From c36038442399c30c4632700238738859332a1040 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Thu, 22 Jan 2026 19:06:00 +0200 Subject: [PATCH] feat: resume runtime on fired triggers --- pyproject.toml | 2 +- src/uipath/runtime/resumable/runtime.py | 73 ++++- tests/test_resumable.py | 381 ++++++++++++++++++------ uv.lock | 2 +- 4 files changed, 352 insertions(+), 106 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2b562f7..8366349 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-runtime" -version = "0.6.0" +version = "0.6.1" description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index ccac3b6..2d98d3d 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -12,11 +12,15 @@ ) from uipath.runtime.debug.breakpoint import UiPathBreakpointResult from uipath.runtime.events import UiPathRuntimeEvent -from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus +from uipath.runtime.result import ( + UiPathRuntimeResult, + UiPathRuntimeStatus, +) from uipath.runtime.resumable.protocols import ( UiPathResumableStorageProtocol, UiPathResumeTriggerProtocol, ) +from uipath.runtime.resumable.trigger import UiPathResumeTrigger from uipath.runtime.schema import UiPathRuntimeSchema logger = logging.getLogger(__name__) @@ -51,6 +55,7 @@ def __init__( self.storage = storage self.trigger_manager = trigger_manager self.runtime_id = runtime_id + self._fired_triggers_map: dict[str, Any] = {} async def execute( self, @@ -66,14 +71,29 @@ async def execute( Returns: Execution result, potentially with resume trigger attached """ - # If resuming, restore trigger from storage + # check if we are resuming if options and options.resume: - input = await self._restore_resume_input(input) + if self._fired_triggers_map: + input = self._fired_triggers_map + self._fired_triggers_map = {} + else: + # restore trigger from storage + input = await self._restore_resume_input(input) # Execute the delegate result = await self.delegate.execute(input, options=options) # If suspended, create and persist trigger - return await self._handle_suspension(result) + suspension_result = await self._handle_suspension(result) + if not self._fired_triggers_map: + return suspension_result + + # some triggers are already fired, runtime can be resumed + resume_options = options or UiPathExecuteOptions(resume=True) + if not resume_options.resume: + resume_options = UiPathExecuteOptions(resume=True) + return await self.execute( + options=resume_options, + ) async def stream( self, @@ -89,9 +109,14 @@ async def stream( Yields: Runtime events during execution, final event is UiPathRuntimeResult """ - # If resuming, restore trigger from storage + # check if we are resuming if options and options.resume: - input = await self._restore_resume_input(input) + if self._fired_triggers_map: + input = self._fired_triggers_map + self._fired_triggers_map = {} + else: + # restore trigger from storage + input = await self._restore_resume_input(input) final_result: UiPathRuntimeResult | None = None async for event in self.delegate.stream(input, options=options): @@ -102,7 +127,20 @@ async def stream( # If suspended, create and persist trigger if final_result: - yield await self._handle_suspension(final_result) + suspension_result = await self._handle_suspension(final_result) + + if not self._fired_triggers_map: + yield suspension_result + return + + # some triggers are already fired, runtime can be resumed + resume_options = options or UiPathStreamOptions(resume=True) + if not resume_options.resume: + resume_options = UiPathStreamOptions(resume=True) + async for event in self.stream( + options=resume_options, + ): + yield event async def _restore_resume_input( self, input: dict[str, Any] | None @@ -142,6 +180,11 @@ async def _restore_resume_input( if not triggers: return None + return await self._build_resume_map(triggers) + + async def _build_resume_map( + self, triggers: list[UiPathResumeTrigger] + ) -> dict[str, Any]: # Build resume map: {interrupt_id: resume_data} resume_map: dict[str, Any] = {} for trigger in triggers: @@ -166,11 +209,10 @@ async def _handle_suspension( Args: result: The execution result to check for suspension """ - # Only handle suspensions - if result.status != UiPathRuntimeStatus.SUSPENDED: - return result - - if isinstance(result, UiPathBreakpointResult): + # Only handle interrupt suspensions + if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance( + result, UiPathBreakpointResult + ): return result suspended_result = UiPathRuntimeResult( @@ -205,6 +247,13 @@ async def _handle_suspension( # Backward compatibility: set single trigger directly suspended_result.trigger = suspended_result.triggers[0] + # check if any trigger can be resumed + # Note: when resuming a job, orchestrator deletes all triggers associated with it, + # thus we can resume the runtime at this point without worrying a trigger may be fired 'twice' + triggers = await self.storage.get_triggers(self.runtime_id) + if triggers: + self._fired_triggers_map = await self._build_resume_map(triggers) + return suspended_result async def get_schema(self) -> UiPathRuntimeSchema: diff --git a/tests/test_resumable.py b/tests/test_resumable.py index ab3398d..fc04e37 100644 --- a/tests/test_resumable.py +++ b/tests/test_resumable.py @@ -131,129 +131,326 @@ def create_trigger_impl(data: dict[str, Any]) -> UiPathResumeTrigger: payload=data, ) + async def read_trigger_default(trigger: UiPathResumeTrigger) -> dict[str, Any]: + # Default behavior: triggers are pending + raise UiPathPendingTriggerError("Trigger not fired yet") + manager.create_trigger = AsyncMock(side_effect=create_trigger_impl) - manager.read_trigger = AsyncMock() + manager.read_trigger = AsyncMock(side_effect=read_trigger_default) return cast(UiPathResumeTriggerProtocol, manager) -@pytest.mark.asyncio -async def test_resumable_creates_multiple_triggers_on_first_suspension(): - """First suspension with parallel branches should create multiple triggers.""" +class TestResumableRuntime: + @pytest.mark.asyncio + async def test_resumable_creates_multiple_triggers_on_first_suspension( + self, + ) -> None: + """First suspension with parallel branches should create multiple triggers.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + result = await resumable.execute({}) + + # Should be suspended with 2 triggers + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert len(result.triggers) == 2 + assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"} + + # Check payloads by interrupt_id (order should be preserved) + assert result.triggers[0].interrupt_id == "int-1" + assert result.triggers[0].payload == {"action": "approve_branch_1"} + assert result.triggers[1].interrupt_id == "int-2" + assert result.triggers[1].payload == {"action": "approve_branch_2"} + + # Both triggers should be created and saved + assert cast(AsyncMock, trigger_manager.create_trigger).await_count == 2 + assert len(storage.triggers) == 2 + + @pytest.mark.asyncio + async def test_resumable_adds_only_new_triggers_on_partial_resume(self) -> None: + """Partial resume should keep pending trigger and add only new ones.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # First execution + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + result1 = await resumable.execute({}) + assert result1.triggers is not None + assert len(result1.triggers) == 2 # int-1, int-2 + + # Create async side effect function for read_trigger + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if trigger.interrupt_id == "int-1": + return {"approved": True} + raise UiPathPendingTriggerError("still pending") - runtime_impl = MultiTriggerMockRuntime() - storage = StatefulStorageMock() - trigger_manager = make_trigger_manager_mock() + # Replace the mock with new side_effect + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore - resumable = UiPathResumableRuntime( - delegate=runtime_impl, - storage=storage, - trigger_manager=trigger_manager, - runtime_id="runtime-1", - ) + # Second execution (resume) + result2 = await resumable.execute( + None, options=UiPathExecuteOptions(resume=True) + ) - result = await resumable.execute({}) + # Should have 2 triggers: int-2 (existing) + int-3 (new) + assert result2.status == UiPathRuntimeStatus.SUSPENDED + assert result2.triggers is not None + assert len(result2.triggers) == 2 + assert {t.interrupt_id for t in result2.triggers} == {"int-2", "int-3"} - # Should be suspended with 2 triggers - assert result.status == UiPathRuntimeStatus.SUSPENDED - assert result.triggers is not None - assert len(result.triggers) == 2 - assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"} + # Only one new trigger created (int-3) - total 3 calls (2 from first + 1 new) + assert cast(AsyncMock, trigger_manager.create_trigger).await_count == 3 - # Check payloads by interrupt_id (order should be preserved) - assert result.triggers[0].interrupt_id == "int-1" - assert result.triggers[0].payload == {"action": "approve_branch_1"} - assert result.triggers[1].interrupt_id == "int-2" - assert result.triggers[1].payload == {"action": "approve_branch_2"} + @pytest.mark.asyncio + async def test_resumable_completes_after_all_triggers_resolved(self) -> None: + """After all triggers resolved, execution should complete successfully.""" - # Both triggers should be created and saved - assert cast(AsyncMock, trigger_manager.create_trigger).await_count == 2 - assert len(storage.triggers) == 2 + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) -@pytest.mark.asyncio -async def test_resumable_adds_only_new_triggers_on_partial_resume(): - """Partial resume should keep pending trigger and add only new ones.""" + # First execution - creates int-1, int-2 + await resumable.execute({}) - runtime_impl = MultiTriggerMockRuntime() - storage = StatefulStorageMock() - trigger_manager = make_trigger_manager_mock() + # Create async side effect for second resume + async def read_trigger_impl_2(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if trigger.interrupt_id == "int-1": + return {"approved": True} + raise UiPathPendingTriggerError("pending") - # First execution - resumable = UiPathResumableRuntime( - delegate=runtime_impl, - storage=storage, - trigger_manager=trigger_manager, - runtime_id="runtime-1", - ) + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl_2) # type: ignore - result1 = await resumable.execute({}) - assert result1.triggers is not None - assert len(result1.triggers) == 2 # int-1, int-2 + # Second execution - int-1 resolved, creates int-3 + await resumable.execute(None, options=UiPathExecuteOptions(resume=True)) - # Create async side effect function for read_trigger - async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: - if trigger.interrupt_id == "int-1": + # Create async side effect for final resume + async def read_trigger_impl_3(trigger: UiPathResumeTrigger) -> dict[str, Any]: return {"approved": True} - raise UiPathPendingTriggerError("still pending") - # Replace the mock with new side_effect - trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl_3) # type: ignore - # Second execution (resume) - result2 = await resumable.execute(None, options=UiPathExecuteOptions(resume=True)) + # Third execution - int-2 and int-3 both resolved + result = await resumable.execute( + None, options=UiPathExecuteOptions(resume=True) + ) + + # Should be successful now + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert isinstance(result.output, dict) + assert result.output["completed"] is True + assert "int-2" in result.output["resume_data"] + assert "int-3" in result.output["resume_data"] + + @pytest.mark.asyncio + async def test_resumable_auto_resumes_when_triggers_already_fired(self) -> None: + """When triggers are already fired during suspension, runtime should auto-resume.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # Configure trigger manager: only int-1 is immediately fired, int-2 stays pending + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + # Only int-1 is immediately available + if trigger.interrupt_id == "int-1": + return {"approved": True} + # All other triggers are pending + raise UiPathPendingTriggerError("pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) - # Should have 2 triggers: int-2 (existing) + int-3 (new) - assert result2.status == UiPathRuntimeStatus.SUSPENDED - assert result2.triggers is not None - assert len(result2.triggers) == 2 - assert {t.interrupt_id for t in result2.triggers} == {"int-2", "int-3"} + # First execution - should suspend with int-1 and int-2, but since int-1 is + # already fired, it should auto-resume and suspend again with int-2 and int-3 + result = await resumable.execute({}) + + # The runtime should have auto-resumed once and suspended again + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert len(result.triggers) == 2 + # After auto-resume with int-1, we should be at second suspension with int-2, int-3 + assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"} + + # Delegate should have been executed twice (initial + auto-resume) + assert runtime_impl.execution_count == 2 + + @pytest.mark.asyncio + async def test_resumable_auto_resumes_partial_fired_triggers(self) -> None: + """When only some triggers are fired during suspension, auto-resume with those.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # Configure trigger manager so int-1 is fired but int-2 is pending + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if trigger.interrupt_id == "int-1": + return {"approved": True} + raise UiPathPendingTriggerError("still pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) - # Only one new trigger created (int-3) - total 3 calls (2 from first + 1 new) - assert cast(AsyncMock, trigger_manager.create_trigger).await_count == 3 + # First execution - int-1 fires immediately, int-2 stays pending + # Should auto-resume with int-1 and suspend with int-2, int-3 + result = await resumable.execute({}) + + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + # After auto-resume with int-1, should have int-2 (still pending) + int-3 (new) + assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"} + + # Verify int-1 was consumed (deleted from storage) + remaining_triggers = await storage.get_triggers("runtime-1") + assert all(t.interrupt_id != "int-1" for t in remaining_triggers) + + @pytest.mark.asyncio + async def test_resumable_auto_resumes_multiple_times(self) -> None: + """When triggers keep being fired immediately, keep auto-resuming until complete.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # Track which phase we're in to fire the right triggers + checked_triggers = set() + + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + # First time we see int-1: fire it immediately + if trigger.interrupt_id == "int-1" and "int-1" not in checked_triggers: + checked_triggers.add("int-1") + return {"approved": True} + # After int-1 fires, we'll see int-2 and int-3 + # Fire them both immediately + if trigger.interrupt_id in ["int-2", "int-3"]: + return {"approved": True} + raise UiPathPendingTriggerError("pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + # Execute once - should auto-resume through all suspensions + result = await resumable.execute({}) + + # Should complete successfully after auto-resuming twice + # 1st exec: suspend with int-1, int-2 -> int-1 fires -> auto-resume + # 2nd exec: suspend with int-2, int-3 -> both fire -> auto-resume + # 3rd exec: complete + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert isinstance(result.output, dict) + assert result.output["completed"] is True + + # Delegate should have been executed 3 times + assert runtime_impl.execution_count == 3 + + @pytest.mark.asyncio + async def test_resumable_stream_auto_resumes_when_triggers_fired(self) -> None: + """Stream should auto-resume when triggers are already fired.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # Configure int-1 to be immediately fired, int-2 pending + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if trigger.interrupt_id == "int-1": + return {"approved": True} + raise UiPathPendingTriggerError("pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) -@pytest.mark.asyncio -async def test_resumable_completes_after_all_triggers_resolved(): - """After all triggers resolved, execution should complete successfully.""" + # Stream should auto-resume and yield final result after auto-resume + events = [] + async for event in resumable.stream({}): + events.append(event) - runtime_impl = MultiTriggerMockRuntime() - storage = StatefulStorageMock() - trigger_manager = make_trigger_manager_mock() + # Should have received exactly one final result (after auto-resume) + assert len(events) == 1 + assert isinstance(events[0], UiPathRuntimeResult) + assert events[0].status == UiPathRuntimeStatus.SUSPENDED - resumable = UiPathResumableRuntime( - delegate=runtime_impl, - storage=storage, - trigger_manager=trigger_manager, - runtime_id="runtime-1", - ) + # Should be at second suspension (after auto-resume with int-1) + assert events[0].triggers is not None + assert {t.interrupt_id for t in events[0].triggers} == {"int-2", "int-3"} - # First execution - creates int-1, int-2 - await resumable.execute({}) + @pytest.mark.asyncio + async def test_resumable_no_auto_resume_when_all_triggers_pending(self) -> None: + """When all triggers are pending, should NOT auto-resume.""" - # Create async side effect for second resume - async def read_trigger_impl_2(trigger: UiPathResumeTrigger) -> dict[str, Any]: - if trigger.interrupt_id == "int-1": - return {"approved": True} - raise UiPathPendingTriggerError("pending") + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() - trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl_2) # type: ignore + # All triggers are pending + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + raise UiPathPendingTriggerError("pending") - # Second execution - int-1 resolved, creates int-3 - await resumable.execute(None, options=UiPathExecuteOptions(resume=True)) + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore - # Create async side effect for final resume - async def read_trigger_impl_3(trigger: UiPathResumeTrigger) -> dict[str, Any]: - return {"approved": True} + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) - trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl_3) # type: ignore + # Execute - should suspend + result = await resumable.execute({}) - # Third execution - int-2 and int-3 both resolved - result = await resumable.execute(None, options=UiPathExecuteOptions(resume=True)) + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"} - # Should be successful now - assert result.status == UiPathRuntimeStatus.SUCCESSFUL - assert isinstance(result.output, dict) - assert result.output["completed"] is True - assert "int-2" in result.output["resume_data"] - assert "int-3" in result.output["resume_data"] + # Delegate should have been executed only once) + assert runtime_impl.execution_count == 1 diff --git a/uv.lock b/uv.lock index 5ad2469..fbdc4c9 100644 --- a/uv.lock +++ b/uv.lock @@ -1005,7 +1005,7 @@ wheels = [ [[package]] name = "uipath-runtime" -version = "0.6.0" +version = "0.6.1" source = { editable = "." } dependencies = [ { name = "uipath-core" },