diff --git a/python/packages/core/agent_framework/__init__.py b/python/packages/core/agent_framework/__init__.py index bfa684b469..0867b7eb00 100644 --- a/python/packages/core/agent_framework/__init__.py +++ b/python/packages/core/agent_framework/__init__.py @@ -120,6 +120,7 @@ AgentExecutorResponse, ) from ._workflows._agent_utils import resolve_agent_id +from ._workflows._background_run import BackgroundRunHandle from ._workflows._checkpoint import ( CheckpointStorage, FileCheckpointStorage, @@ -205,6 +206,7 @@ "AgentRunInputs", "AgentSession", "Annotation", + "BackgroundRunHandle", "BaseAgent", "BaseChatClient", "BaseContextProvider", diff --git a/python/packages/core/agent_framework/_workflows/_background_run.py b/python/packages/core/agent_framework/_workflows/_background_run.py new file mode 100644 index 0000000000..7c8d1abea6 --- /dev/null +++ b/python/packages/core/agent_framework/_workflows/_background_run.py @@ -0,0 +1,116 @@ +# Copyright (c) Microsoft. All rights reserved. + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable +from typing import Any + +from ._events import WorkflowEvent +from ._runner_context import RunnerContext + +logger = logging.getLogger(__name__) + + +class BackgroundRunHandle: + """Handle for a workflow running in the background. + + Provides a polling interface to consume events produced by a background + workflow execution. The workflow runs in an ``asyncio.Task`` and events + are buffered in an internal queue until the caller drains them via + :meth:`poll`. + + Responses to ``request_info`` events can be sent while the workflow is + still running via :meth:`respond`, enabling hot-path / cold-path + parallelism. + + Example: + .. 
code-block:: python + + handle = workflow.run_in_background(message="Hello") + while not handle.is_idle: + events = await handle.poll() + for event in events: + if event.type == "request_info": + await handle.respond({event.request_id: answer}) + """ + + def __init__( + self, + task: asyncio.Task[None], + event_queue: asyncio.Queue[WorkflowEvent[Any]], + runner_context: RunnerContext, + resume_fn: Callable[[], Awaitable[asyncio.Task[None]]], + ) -> None: + """Initialize the background run handle. + + Args: + task: The asyncio task running the workflow. + event_queue: The queue where workflow events are buffered. + runner_context: The runner context for injecting responses. + resume_fn: Callback that creates and returns a new producer task + to resume the workflow after it has converged. + """ + self._task = task + self._event_queue = event_queue + self._runner_context = runner_context + self._resume_fn = resume_fn + + @property + def is_idle(self) -> bool: + """Whether the background task has finished producing events. + + This becomes ``True`` when the background task completes, which happens + when the workflow reaches any terminal run state — including + :attr:`~WorkflowRunState.IDLE`, + :attr:`~WorkflowRunState.IDLE_WITH_PENDING_REQUESTS`, or + :attr:`~WorkflowRunState.FAILED`. To determine which state the workflow + ended in, inspect the status events returned by :meth:`poll`. + """ + return self._task.done() + + async def poll(self) -> list[WorkflowEvent[Any]]: + """Drain all currently queued events without blocking. + + Returns: + A list of events produced since the last poll. Returns an empty + list if no events are available. + """ + events: list[WorkflowEvent[Any]] = [] + # Use get_nowait() in a loop to drain all available events. 
+ # This is safe from race conditions because asyncio uses cooperative + # multitasking: since we never await inside this loop, no other + # coroutine (including the background task producing events) can + # execute until we finish. This guarantees we get a consistent + # snapshot of all events queued at the moment poll() was called. + while True: + try: + events.append(self._event_queue.get_nowait()) + except asyncio.QueueEmpty: + # Queue is empty — we've drained all currently available events. + # Any events added after this point will be picked up in the + # next poll() call. + break + return events + + async def respond(self, responses: dict[str, Any]) -> None: + """Send responses to pending ``request_info`` events. + + If the workflow is still running, the responses are injected into the + runner context and picked up in the next superstep. If the workflow + has already converged (idle), the responses are injected and the + runner is automatically resumed. + + Args: + responses: A mapping of request IDs to response data. + + Raises: + ValueError: If a request ID is unknown. + TypeError: If a response type does not match the expected type. 
+ """ + for request_id, response in responses.items(): + await self._runner_context.send_request_info_response(request_id, response) + + if self.is_idle: + self._task = await self._resume_fn() diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index cd7dbb4a68..e1f2e81b53 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -15,6 +15,7 @@ from .._types import ResponseStream from ..observability import OtelAttr, capture_exception, create_workflow_span from ._agent import WorkflowAgent +from ._background_run import BackgroundRunHandle from ._checkpoint import CheckpointStorage from ._const import DEFAULT_MAX_ITERATIONS, WORKFLOW_RUN_KWARGS_KEY from ._edge import ( @@ -537,6 +538,101 @@ def run( return response_stream return response_stream.get_final_response() + def run_in_background( + self, + message: Any | None = None, + *, + responses: dict[str, Any] | None = None, + checkpoint_id: str | None = None, + checkpoint_storage: CheckpointStorage | None = None, + **kwargs: Any, + ) -> BackgroundRunHandle: + """Start the workflow in a background asyncio task and return a polling handle. + + The workflow runs as a producer in a background task, buffering events into + an internal queue. The caller acts as the single consumer, draining events + via :meth:`BackgroundRunHandle.poll`. The workflow always runs in streaming + mode internally. + + This is a one-shot execution: the workflow runs until it reaches an idle + state (or fails), after which :attr:`BackgroundRunHandle.is_idle` becomes + ``True``. To resume, call :meth:`BackgroundRunHandle.respond` which + automatically restarts the runner, or call ``run_in_background`` again. + + Args: + message: Initial message for the start executor. Required for new workflow runs. + Mutually exclusive with responses. 
+ responses: Responses to send for pending request info events. Mutually + exclusive with message. Can be combined with checkpoint_id. + checkpoint_id: ID of checkpoint to restore from. + checkpoint_storage: Runtime checkpoint storage. + **kwargs: Additional keyword arguments passed through to agent invocations. + + Returns: + A :class:`BackgroundRunHandle` for polling events and checking idle state. + + Raises: + ValueError: If parameter combination is invalid. + RuntimeError: If the workflow is already running. + """ + self._validate_run_params(message, responses, checkpoint_id) + self._ensure_not_running() + + event_queue: asyncio.Queue[WorkflowEvent[Any]] = asyncio.Queue() + + async def _background_producer( + message: Any | None = None, + responses: dict[str, Any] | None = None, + checkpoint_id: str | None = None, + ) -> None: + try: + async for event in self._run_core( + message=message, + responses=responses, + checkpoint_id=checkpoint_id, + checkpoint_storage=checkpoint_storage, + streaming=True, + **kwargs, + ): + await event_queue.put(event) + except Exception: + # _run_workflow_with_tracing yields failed/status events before + # re-raising, so they are already enqueued above. Suppress here + # so the task completes cleanly and is_idle becomes True. 
+ logger.debug("Background workflow run completed with error; events already enqueued.") + finally: + await self._run_cleanup(checkpoint_storage) + + async def _resume_producer() -> None: + """Producer for resuming after respond() injects messages into the context.""" + try: + async for event in self._run_workflow_with_tracing( + initial_executor_fn=None, + reset_context=False, + streaming=True, + run_kwargs=kwargs if kwargs else None, + ): + await event_queue.put(event) + except Exception: + logger.debug("Background workflow run completed with error; events already enqueued.") + finally: + await self._run_cleanup(checkpoint_storage) + + async def _resume() -> asyncio.Task[None]: # noqa: RUF029 + """Resume the workflow by launching a new producer task. + + The responses have already been injected into the runner context + by :meth:`BackgroundRunHandle.respond`, so the messages are ready + for the next ``run_until_convergence`` cycle. + """ + self._ensure_not_running() + return asyncio.create_task(_resume_producer()) + + task = asyncio.create_task( + _background_producer(message=message, responses=responses, checkpoint_id=checkpoint_id) + ) + return BackgroundRunHandle(task, event_queue, self._runner_context, _resume) + async def _run_core( self, message: Any | None = None, diff --git a/python/packages/core/tests/workflow/test_background_run.py b/python/packages/core/tests/workflow/test_background_run.py new file mode 100644 index 0000000000..d94bc99dab --- /dev/null +++ b/python/packages/core/tests/workflow/test_background_run.py @@ -0,0 +1,389 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +import asyncio +from dataclasses import dataclass +from typing import Any + +import pytest + +from agent_framework import ( + BackgroundRunHandle, + Executor, + WorkflowBuilder, + WorkflowContext, + WorkflowEvent, + WorkflowRunState, + handler, + response_handler, +) + + +@dataclass +class NumberMessage: + """A message carrying an integer value for testing.""" + + data: int + + +class IncrementExecutor(Executor): + """An executor that increments until a limit, then yields output.""" + + def __init__(self, id: str, *, limit: int = 10, increment: int = 1) -> None: + super().__init__(id=id) + self.limit = limit + self.increment = increment + + @handler + async def handle(self, message: NumberMessage, ctx: WorkflowContext[NumberMessage, int]) -> None: + if message.data < self.limit: + await ctx.send_message(NumberMessage(data=message.data + self.increment)) + else: + await ctx.yield_output(message.data) + + +class FailingExecutor(Executor): + """An executor that always raises an exception.""" + + @handler + async def handle(self, message: NumberMessage, ctx: WorkflowContext[NumberMessage, int]) -> None: + raise RuntimeError("Intentional failure") + + +def _build_simple_workflow(*, limit: int = 10) -> Any: + """Build a two-executor ping-pong workflow for testing.""" + executor_a = IncrementExecutor(id="a", limit=limit) + executor_b = IncrementExecutor(id="b", limit=limit) + return ( + WorkflowBuilder(start_executor=executor_a) + .add_edge(executor_a, executor_b) + .add_edge(executor_b, executor_a) + .build() + ) + + +async def _wait_and_collect(handle: BackgroundRunHandle) -> list[WorkflowEvent[Any]]: + """Wait for a background handle to become idle and collect all events.""" + await handle._task # noqa: SLF001 + return await handle.poll() + + +# --- Basic functionality --- + + +async def test_run_in_background_returns_handle() -> None: + """run_in_background returns a BackgroundRunHandle.""" + workflow = _build_simple_workflow() + handle = 
workflow.run_in_background(NumberMessage(data=0)) + assert isinstance(handle, BackgroundRunHandle) + await _wait_and_collect(handle) + + +async def test_run_in_background_produces_events() -> None: + """Polling the handle returns workflow events including output.""" + workflow = _build_simple_workflow() + handle = workflow.run_in_background(NumberMessage(data=0)) + + all_events = await _wait_and_collect(handle) + + assert len(all_events) > 0 + output_events = [e for e in all_events if e.type == "output"] + assert len(output_events) == 1 + assert output_events[0].data == 10 + + +async def test_run_in_background_status_events() -> None: + """Background run emits started and status events.""" + workflow = _build_simple_workflow(limit=3) + handle = workflow.run_in_background(NumberMessage(data=0)) + + all_events = await _wait_and_collect(handle) + + types = [e.type for e in all_events] + assert "started" in types + assert "status" in types + + status_events = [e for e in all_events if e.type == "status"] + final_states = [e.state for e in status_events] + assert WorkflowRunState.IDLE in final_states + + +# --- is_idle property --- + + +async def test_is_idle_false_while_running() -> None: + """is_idle is False while the workflow is still executing.""" + workflow = _build_simple_workflow() + handle = workflow.run_in_background(NumberMessage(data=0)) + # Immediately after creation, the property shouldn't raise. 
+ _ = handle.is_idle + await _wait_and_collect(handle) + assert handle.is_idle is True + + +# --- Empty poll --- + + +async def test_poll_returns_empty_when_no_events() -> None: + """poll() returns an empty list when no events are queued.""" + workflow = _build_simple_workflow() + handle = workflow.run_in_background(NumberMessage(data=0)) + await _wait_and_collect(handle) + # Now poll again — should be empty + events = await handle.poll() + assert events == [] + + +# --- Error handling --- + + +async def test_run_in_background_error_produces_failed_event() -> None: + """When the workflow fails, poll returns a failed event and is_idle becomes True.""" + failing = FailingExecutor(id="failing") + workflow = WorkflowBuilder(start_executor=failing).build() + + handle = workflow.run_in_background(NumberMessage(data=0)) + + all_events = await _wait_and_collect(handle) + + assert handle.is_idle + failed_events = [e for e in all_events if e.type == "failed"] + assert len(failed_events) == 1 + assert "Intentional failure" in failed_events[0].details.message + + +# --- Concurrency guard --- + + +async def test_run_in_background_prevents_concurrent_run() -> None: + """Cannot start a second run while background run is in progress.""" + workflow = _build_simple_workflow() + handle = workflow.run_in_background(NumberMessage(data=0)) + + # Give it a moment to start + await asyncio.sleep(0.01) + + with pytest.raises( + RuntimeError, + match="Workflow is already running. Concurrent executions are not allowed.", + ): + workflow.run_in_background(NumberMessage(data=0)) + + await _wait_and_collect(handle) + + +async def test_run_in_background_prevents_concurrent_regular_run() -> None: + """Cannot call run() while a background run is in progress.""" + workflow = _build_simple_workflow() + handle = workflow.run_in_background(NumberMessage(data=0)) + + await asyncio.sleep(0.01) + + with pytest.raises( + RuntimeError, + match="Workflow is already running. 
Concurrent executions are not allowed.", + ): + await workflow.run(NumberMessage(data=0)) + + await _wait_and_collect(handle) + + +async def test_run_in_background_allows_rerun_after_completion() -> None: + """After background run completes, the workflow can be run again.""" + workflow = _build_simple_workflow(limit=3) + handle = workflow.run_in_background(NumberMessage(data=0)) + await _wait_and_collect(handle) + + # Should succeed — workflow is no longer running + result = await workflow.run(NumberMessage(data=0)) + assert result.get_final_state() == WorkflowRunState.IDLE + + +async def test_run_in_background_allows_rerun_after_failure() -> None: + """After a failed background run, the workflow can be run again.""" + failing = FailingExecutor(id="failing") + workflow_fail = WorkflowBuilder(start_executor=failing).build() + + handle = workflow_fail.run_in_background(NumberMessage(data=0)) + await _wait_and_collect(handle) + + # Reusing same instance should work after failure + handle2 = workflow_fail.run_in_background(NumberMessage(data=0)) + await _wait_and_collect(handle2) + # It will fail again, but the point is it doesn't raise "already running" + assert handle2.is_idle + + +# --- Parameter validation --- + + +async def test_run_in_background_rejects_invalid_params() -> None: + """run_in_background validates parameters the same as run().""" + workflow = _build_simple_workflow() + + with pytest.raises(ValueError, match="Must provide at least one of"): + workflow.run_in_background() + + with pytest.raises(ValueError, match="Cannot provide both"): + workflow.run_in_background(NumberMessage(data=0), responses={"r1": "yes"}) + + +# --- respond() --- + + +@dataclass +class ApprovalRequest: + """Request payload for approval.""" + + prompt: str + + +@dataclass +class ApprovalResponse: + """Response payload for approval.""" + + approved: bool + + +class RequestApprovalExecutor(Executor): + """Executor that requests approval, then yields output based on the response.""" 
+ + @handler + async def on_message(self, message: NumberMessage, ctx: WorkflowContext) -> None: + ctx.set_state(self.id, message.data) + await ctx.request_info(ApprovalRequest(prompt="Approve?"), ApprovalResponse) + + @response_handler + async def on_response( + self, + original_request: ApprovalRequest, + response: ApprovalResponse, + ctx: WorkflowContext[NumberMessage, int], + ) -> None: + data = ctx.get_state(self.id) + assert isinstance(data, int) + if response.approved: + await ctx.yield_output(data) + else: + await ctx.yield_output(-1) + + +class PassthroughExecutor(Executor): + """Executor that forwards the input message downstream.""" + + @handler + async def handle(self, message: NumberMessage, ctx: WorkflowContext[NumberMessage]) -> None: + await ctx.send_message(message) + + +class SlowExecutor(Executor): + """Executor that sleeps before yielding output, simulating a hot path.""" + + def __init__(self, id: str, *, delay: float = 0.1) -> None: + super().__init__(id=id) + self.delay = delay + + @handler + async def handle(self, message: NumberMessage, ctx: WorkflowContext[NumberMessage, int]) -> None: + await asyncio.sleep(self.delay) + await ctx.yield_output(message.data * 100) + + +async def test_respond_after_idle_auto_resumes() -> None: + """respond() after idle injects the response and auto-resumes the runner.""" + approval = RequestApprovalExecutor(id="approval") + workflow = WorkflowBuilder(start_executor=approval).build() + + handle = workflow.run_in_background(NumberMessage(data=42)) + all_events = await _wait_and_collect(handle) + + # Workflow should be idle with a pending request + assert handle.is_idle + request_events = [e for e in all_events if e.type == "request_info"] + assert len(request_events) == 1 + + # Respond — this should auto-resume + await handle.respond({request_events[0].request_id: ApprovalResponse(approved=True)}) + assert not handle.is_idle # Runner restarted + + resumed_events = await _wait_and_collect(handle) + assert 
handle.is_idle + + output_events = [e for e in resumed_events if e.type == "output"] + assert len(output_events) == 1 + assert output_events[0].data == 42 + + +async def test_respond_while_running() -> None: + """respond() while the runner is still executing injects into the next superstep.""" + approval = RequestApprovalExecutor(id="approval") + slow = SlowExecutor(id="slow", delay=0.2) + + # Fan-out: start sends to both approval and slow in parallel. + # approval will request_info quickly; slow will keep running. + start = PassthroughExecutor(id="start") + + workflow = ( + WorkflowBuilder(start_executor=start) + .add_fan_out_edges(start, [approval, slow]) + .build() + ) + + handle = workflow.run_in_background(NumberMessage(data=7)) + + # Poll until we see a request_info event (approval path) + request_event = None + for _ in range(100): + events = await handle.poll() + for e in events: + if e.type == "request_info": + request_event = e + break + if request_event is not None: + break + await asyncio.sleep(0.01) + + assert request_event is not None + # The slow executor is still running, so handle should not be idle + assert not handle.is_idle + + # Respond while running + await handle.respond({request_event.request_id: ApprovalResponse(approved=True)}) + + # Wait for completion + remaining = await _wait_and_collect(handle) + all_output = [e for e in remaining if e.type == "output"] + + # We should have outputs from both paths + output_values = sorted([e.data for e in all_output]) + assert 7 in output_values # approval path + assert 700 in output_values # slow path (7 * 100) + + +async def test_respond_with_invalid_request_id() -> None: + """respond() raises ValueError for unknown request IDs.""" + approval = RequestApprovalExecutor(id="approval") + workflow = WorkflowBuilder(start_executor=approval).build() + + handle = workflow.run_in_background(NumberMessage(data=1)) + await _wait_and_collect(handle) + + with pytest.raises(ValueError, match="No pending request 
found"): + await handle.respond({"nonexistent_id": ApprovalResponse(approved=True)}) + + +async def test_respond_multiple_sequential() -> None: + """Multiple sequential respond() calls work correctly.""" + approval = RequestApprovalExecutor(id="approval") + workflow = WorkflowBuilder(start_executor=approval).build() + + # First run: request approval + handle = workflow.run_in_background(NumberMessage(data=10)) + events1 = await _wait_and_collect(handle) + req1 = [e for e in events1 if e.type == "request_info"] + assert len(req1) == 1 + + # Deny — the executor yields -1 + await handle.respond({req1[0].request_id: ApprovalResponse(approved=False)}) + events2 = await _wait_and_collect(handle) + outputs = [e.data for e in events2 if e.type == "output"] + assert outputs == [-1] diff --git a/python/samples/03-workflows/README.md b/python/samples/03-workflows/README.md index b72cdce54d..416103115f 100644 --- a/python/samples/03-workflows/README.md +++ b/python/samples/03-workflows/README.md @@ -43,6 +43,14 @@ Once comfortable with these, explore the rest of the samples below. 
| Workflow as Agent with Session | [agents/workflow_as_agent_with_session.py](./agents/workflow_as_agent_with_session.py) | Use AgentSession to maintain conversation history across workflow-as-agent invocations | | Workflow as Agent kwargs | [agents/workflow_as_agent_kwargs.py](./agents/workflow_as_agent_kwargs.py) | Pass custom context (data, user tokens) via kwargs through workflow.as_agent() to @ai_function tools | +### background-run + +| Sample | File | Concepts | +| -------------------- | -------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------- | +| Basic Polling | [background-run/basic_polling.py](./background-run/basic_polling.py) | Start a background run, poll for events with `handle.poll()`, check `is_idle` | +| Error Handling | [background-run/error_handling.py](./background-run/error_handling.py) | Inspect failed events via polling, re-run workflow after failure | +| Respond While Running | [background-run/respond_while_running.py](./background-run/respond_while_running.py) | Fan-out hot/cold paths, `handle.respond()` while workflow is still running | + ### checkpoint | Sample | File | Concepts | diff --git a/python/samples/03-workflows/background-run/basic_polling.py b/python/samples/03-workflows/background-run/basic_polling.py new file mode 100644 index 0000000000..f24ff80254 --- /dev/null +++ b/python/samples/03-workflows/background-run/basic_polling.py @@ -0,0 +1,106 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio + +from agent_framework import ( + Executor, + WorkflowBuilder, + WorkflowContext, + WorkflowEvent, + handler, +) + +""" +Sample: Background run with polling + +What this example shows +- Starting a workflow in the background with `workflow.run_in_background()`. +- Polling for events at your own pace with `handle.poll()`. +- Checking when the workflow reaches an idle state with `handle.is_idle`. 
+ +This pattern is useful when a consumer needs to pull events on demand rather +than being pushed events via streaming. The workflow (producer) runs in a +background asyncio task and buffers events into an internal queue. The caller +(consumer) drains that queue whenever it is ready. + +Prerequisites +- No external services required. +""" + + +class UpperCase(Executor): + """Convert text to uppercase and forward it.""" + + @handler + async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None: + result = text.upper() + await ctx.send_message(result) + + +class ReverseText(Executor): + """Reverse text and yield it as workflow output.""" + + @handler + async def reverse(self, text: str, ctx: WorkflowContext[str, str]) -> None: + result = text[::-1] + await ctx.yield_output(result) + + +async def main(): + """Run a simple workflow in the background and poll for events.""" + upper_case = UpperCase(id="upper_case") + reverse_text = ReverseText(id="reverse_text") + + workflow = WorkflowBuilder(start_executor=upper_case).add_edge(upper_case, reverse_text).build() + + # Start the workflow in the background. This returns immediately with a + # handle that lets the caller pull events at its own pace. + handle = workflow.run_in_background("hello world") + + # Poll for events until the workflow becomes idle. + all_events: list[WorkflowEvent] = [] + while not handle.is_idle: + # The workflow continues running in the background while we process events. + events = await handle.poll() + all_events.extend(events) + await asyncio.sleep(0.01) + + # Drain any remaining events produced just before idle was detected. + all_events.extend(await handle.poll()) + + # Print all collected events. + print("Events received:") + for event in all_events: + print(f" type={event.type}", end="") + if event.type == "output": + print(f" data={event.data!r}", end="") + print() + + # Extract outputs. 
+ outputs = [e.data for e in all_events if e.type == "output"]
+ print(f"\nOutputs: {outputs}")
+ print(f"Handle is idle: {handle.is_idle}")
+
+ """
+ Sample Output:
+
+ Events received:
+ type=started
+ type=status
+ type=superstep_started
+ type=executor_invoked
+ type=executor_completed
+ type=superstep_completed
+ type=superstep_started
+ type=executor_invoked
+ type=executor_completed
+ type=output data='DLROW OLLEH'
+ type=superstep_completed
+ type=status
+ Outputs: ['DLROW OLLEH']
+ Handle is idle: True
+ """
+
+
+if __name__ == "__main__":
+ asyncio.run(main()) diff --git a/python/samples/03-workflows/background-run/error_handling.py b/python/samples/03-workflows/background-run/error_handling.py new file mode 100644 index 0000000000..cf1c63895e --- /dev/null +++ b/python/samples/03-workflows/background-run/error_handling.py @@ -0,0 +1,126 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio + +from agent_framework import ( + Executor, + WorkflowBuilder, + WorkflowContext, + WorkflowEvent, + handler, +) + +""" +Sample: Background run error handling + +What this example shows +- How exceptions raised inside executors are surfaced as failed events + through the polling interface. +- After a failure, `handle.is_idle` becomes True and the caller can inspect + the error details from the polled events. +- The workflow can be re-used after a failed background run. + +Prerequisites +- No external services required.
+""" + + +class ValidatingExecutor(Executor): + """Validate input and forward it, or raise if invalid.""" + + @handler + async def validate(self, text: str, ctx: WorkflowContext[str]) -> None: + if not text.strip(): + raise ValueError("Input must not be empty or whitespace.") + await ctx.send_message(text.upper()) + + +class OutputExecutor(Executor): + """Yield the received text as workflow output.""" + + @handler + async def output(self, text: str, ctx: WorkflowContext[str, str]) -> None: + await ctx.yield_output(text) + + +def create_workflow(): + """Create a fresh workflow instance.""" + validator = ValidatingExecutor(id="validator") + output = OutputExecutor(id="output") + return WorkflowBuilder(start_executor=validator).add_edge(validator, output).build() + + +async def main(): + """Demonstrate error handling with background run polling.""" + + # --- Scenario 1: A successful run --- + print("=== Scenario 1: Successful run ===") + workflow = create_workflow() + handle = workflow.run_in_background("hello") + + all_events: list[WorkflowEvent] = [] + while not handle.is_idle: + # The workflow continues running in the background while we process events. + all_events.extend(await handle.poll()) + await asyncio.sleep(0.01) + all_events.extend(await handle.poll()) + + outputs = [e.data for e in all_events if e.type == "output"] + print(f"Outputs: {outputs}") + print(f"Handle is idle: {handle.is_idle}") + + # --- Scenario 2: A failing run --- + print("\n=== Scenario 2: Failing run ===") + workflow = create_workflow() + handle = workflow.run_in_background(" ") # Whitespace triggers the error + + all_events = [] + while not handle.is_idle: + # The workflow continues running in the background while we process events. + all_events.extend(await handle.poll()) + # Throttle polling; poll() is non-blocking and returns immediately. + await asyncio.sleep(0.01) + all_events.extend(await handle.poll()) + + # The handle becomes idle even after a failure. 
+ print(f"Handle is idle: {handle.is_idle}") + + # Inspect the failed event. + for event in all_events: + if event.type == "failed": + print(f"Error type: {event.details.error_type}") + print(f"Error message: {event.details.message}") + + # --- Scenario 3: Re-use the workflow after failure --- + print("\n=== Scenario 3: Re-run after failure ===") + handle = workflow.run_in_background("world") + + all_events = [] + while not handle.is_idle: + # The workflow continues running in the background while we process events. + all_events.extend(await handle.poll()) + await asyncio.sleep(0.01) + all_events.extend(await handle.poll()) + + outputs = [e.data for e in all_events if e.type == "output"] + print(f"Outputs: {outputs}") + + """ + Sample Output: + + === Scenario 1: Successful run === + Outputs: ['HELLO'] + Handle is idle: True + + === Scenario 2: Failing run === + Handle is idle: True + Error type: ValueError + Error message: Input must not be empty or whitespace. + + === Scenario 3: Re-run after failure === + Outputs: ['WORLD'] + """ + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/03-workflows/background-run/respond_while_running.py b/python/samples/03-workflows/background-run/respond_while_running.py new file mode 100644 index 0000000000..8934527497 --- /dev/null +++ b/python/samples/03-workflows/background-run/respond_while_running.py @@ -0,0 +1,173 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from dataclasses import dataclass + +from agent_framework import ( + Executor, + WorkflowBuilder, + WorkflowContext, + handler, + response_handler, +) + +""" +Sample: Responding while the workflow is still running (hot path / cold path) + +What this example shows +- A workflow with two parallel paths after a fan-out: + 1) A "hot path" that loops, incrementing a counter each superstep. + 2) A "cold path" that pauses to request human approval via `ctx.request_info()`. 
+- Using `handle.respond()` inside the poll loop to send the approval while the + hot path is still iterating, so both paths make progress concurrently. +- If the workflow has already converged when `respond()` is called, the runner + is automatically resumed — no need to call `run_in_background()` again. + +This pattern is useful for workflows where one branch can proceed independently +while another branch waits for external input. With the traditional `run()` API, +the caller must wait for the entire workflow to become idle before sending +responses. With `run_in_background()` + `respond()`, responses can be injected +at any time. + +Prerequisites +- No external services required. +""" + + +# --- Messages --- + + +@dataclass +class TaskInput: + """Input message carrying a numeric value for both paths.""" + + value: int + + +@dataclass +class ApprovalRequest: + """Request sent to the caller for approval.""" + + prompt: str + + +@dataclass +class ApprovalResponse: + """Response from the caller.""" + + approved: bool + + +# --- Executors --- + + +class Dispatcher(Executor): + """Forwards the input to both the hot path and the cold path.""" + + @handler + async def dispatch(self, message: TaskInput, ctx: WorkflowContext[TaskInput]) -> None: + await ctx.send_message(message) + + +class HotPathExecutor(Executor): + """Increments a counter in a self-loop until it reaches a limit, then yields output. + + Each iteration is a separate superstep, so the workflow stays active while + this executor loops — giving the caller time to respond to requests from + the cold path. 
+ """ + + def __init__(self, id: str, *, limit: int = 10) -> None: + super().__init__(id=id) + self.limit = limit + + @handler + async def compute(self, message: TaskInput, ctx: WorkflowContext[TaskInput, str]) -> None: + if message.value < self.limit: + await asyncio.sleep(0.05) # Simulate work each iteration + await ctx.send_message(TaskInput(value=message.value + 1)) + else: + await ctx.yield_output(f"Hot path done (counted to {message.value})") + + +class ColdPathExecutor(Executor): + """Requests human approval before producing output.""" + + @handler + async def request_approval(self, message: TaskInput, ctx: WorkflowContext) -> None: + ctx.set_state(self.id, message.value) + await ctx.request_info( + ApprovalRequest(prompt=f"Approve processing value {message.value}?"), + ApprovalResponse, + ) + + @response_handler + async def on_approval( + self, + original_request: ApprovalRequest, + response: ApprovalResponse, + ctx: WorkflowContext[TaskInput, str], + ) -> None: + value = ctx.get_state(self.id) + if response.approved: + await ctx.yield_output(f"Cold path approved (value={value})") + else: + await ctx.yield_output(f"Cold path rejected (value={value})") + + +async def main(): + """Run a fan-out workflow and respond to a request inside the poll loop.""" + dispatcher = Dispatcher(id="dispatcher") + hot_path = HotPathExecutor(id="hot_path", limit=10) + cold_path = ColdPathExecutor(id="cold_path") + + workflow = ( + WorkflowBuilder(start_executor=dispatcher) + .add_fan_out_edges(dispatcher, [hot_path, cold_path]) + .add_edge(hot_path, hot_path) # Self-loop for the hot path + .build() + ) + + handle = workflow.run_in_background(TaskInput(value=0)) + + # Single poll loop: process all events and respond to requests inline. + # The workflow continues running in the background while we process events. 
+ outputs: list[str] = [] + while not handle.is_idle: + for event in await handle.poll(): + if event.type == "request_info" and isinstance(event.data, ApprovalRequest): + print(f" Request: {event.data.prompt}") + print(f" (hot path still running: is_idle={handle.is_idle})") + + # Respond immediately inside the poll loop. + await handle.respond({event.request_id: ApprovalResponse(approved=True)}) + + elif event.type == "output": + outputs.append(event.data) + print(f" Output: {event.data}") + + # Throttle polling; poll() is non-blocking and returns immediately. + await asyncio.sleep(0.01) + + # Drain any final events after idle. + for event in await handle.poll(): + if event.type == "output": + outputs.append(event.data) + print(f" Output: {event.data}") + + print(f"\nAll outputs: {outputs}") + + """ + Sample Output: + + Request: Approve processing value 0? + (hot path still running: is_idle=False) + Output: Cold path approved (value=0) + Output: Hot path done (counted to 10) + + All outputs: ['Cold path approved (value=0)', 'Hot path done (counted to 10)'] + """ + + +if __name__ == "__main__": + asyncio.run(main())