Python: Workflows background run #4274
@@ -0,0 +1,116 @@

```python
# Copyright (c) Microsoft. All rights reserved.

from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any

from ._events import WorkflowEvent
from ._runner_context import RunnerContext

logger = logging.getLogger(__name__)


class BackgroundRunHandle:
```
> **Contributor:** It feels like this class is essentially a wrapper around asyncio primitives: `create_task` and an `asyncio.Queue`. Why do we need to wrap these well-known constructs?
>
> **Member:** And this also adds another concept that people have to learn; I do not think it is needed.
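For context on this thread, here is a minimal sketch of the bare-asyncio pattern the reviewer alludes to. The `run_stream` method name is an assumption standing in for a streaming entry point; it is not an API this PR defines:

```python
import asyncio
from typing import Any


async def run_with_plain_asyncio(workflow: Any, message: str) -> list[Any]:
    """Bare-asyncio equivalent of BackgroundRunHandle, per the review thread."""
    queue: asyncio.Queue[Any] = asyncio.Queue()

    async def producer() -> None:
        # `run_stream` is an assumed name for a streaming run entry point.
        async for event in workflow.run_stream(message):
            await queue.put(event)

    task = asyncio.create_task(producer())

    events: list[Any] = []
    while not task.done() or not queue.empty():
        try:
            events.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            await asyncio.sleep(0)  # yield control so the producer can run
    await task  # re-raise any exception from the producer
    return events
```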
```python
    """Handle for a workflow running in the background.

    Provides a polling interface to consume events produced by a background
    workflow execution. The workflow runs in an ``asyncio.Task`` and events
    are buffered in an internal queue until the caller drains them via
    :meth:`poll`.

    Responses to ``request_info`` events can be sent while the workflow is
    still running via :meth:`respond`, enabling hot-path / cold-path
    parallelism.

    Example:
        .. code-block:: python

            handle = workflow.run_in_background(message="Hello")
            while not handle.is_idle:
                events = await handle.poll()
                for event in events:
                    if event.type == "request_info":
                        await handle.respond({event.request_id: answer})
    """

    def __init__(
        self,
        task: asyncio.Task[None],
        event_queue: asyncio.Queue[WorkflowEvent[Any]],
        runner_context: RunnerContext,
        resume_fn: Callable[[], Awaitable[asyncio.Task[None]]],
    ) -> None:
        """Initialize the background run handle.

        Args:
            task: The asyncio task running the workflow.
            event_queue: The queue where workflow events are buffered.
            runner_context: The runner context for injecting responses.
            resume_fn: Callback that creates and returns a new producer task
                to resume the workflow after it has converged.
        """
        self._task = task
        self._event_queue = event_queue
        self._runner_context = runner_context
        self._resume_fn = resume_fn

    @property
    def is_idle(self) -> bool:
        """Whether the background task has finished producing events.

        This becomes ``True`` when the background task completes, which happens
        when the workflow reaches any terminal run state — including
        :attr:`~WorkflowRunState.IDLE`,
        :attr:`~WorkflowRunState.IDLE_WITH_PENDING_REQUESTS`, or
        :attr:`~WorkflowRunState.FAILED`. To determine which state the workflow
        ended in, inspect the status events returned by :meth:`poll`.
        """
        return self._task.done()

    async def poll(self) -> list[WorkflowEvent[Any]]:
        """Drain all currently queued events without blocking.

        Returns:
            A list of events produced since the last poll. Returns an empty
            list if no events are available.
        """
        events: list[WorkflowEvent[Any]] = []
        # Use get_nowait() in a loop to drain all available events.
        # This is safe from race conditions because asyncio uses cooperative
        # multitasking: since we never await inside this loop, no other
        # coroutine (including the background task producing events) can
        # execute until we finish. This guarantees we get a consistent
        # snapshot of all events queued at the moment poll() was called.
        while True:
            try:
                events.append(self._event_queue.get_nowait())
            except asyncio.QueueEmpty:
                # Queue is empty — we've drained all currently available events.
                # Any events added after this point will be picked up in the
                # next poll() call.
                break
        return events

    async def respond(self, responses: dict[str, Any]) -> None:
        """Send responses to pending ``request_info`` events.

        If the workflow is still running, the responses are injected into the
        runner context and picked up in the next superstep. If the workflow
        has already converged (idle), the responses are injected and the
        runner is automatically resumed.

        Args:
            responses: A mapping of request IDs to response data.

        Raises:
            ValueError: If a request ID is unknown.
            TypeError: If a response type does not match the expected type.
        """
        for request_id, response in responses.items():
            await self._runner_context.send_request_info_response(request_id, response)

        if self.is_idle:
            self._task = await self._resume_fn()
```
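To make the docstring's "hot-path / cold-path parallelism" claim concrete, here is a hedged usage sketch. The `handle.poll`, `handle.is_idle`, `handle.respond`, `event.type`, and `event.request_id` names come from the diff above; `consume`, `respond_to`, and `compute_answer` are illustrative names of my own:

```python
import asyncio
from typing import Any


async def compute_answer(event: Any) -> str:
    await asyncio.sleep(1.0)  # stand-in for a slow external call
    return "answer"


async def respond_to(handle: Any, event: Any) -> None:
    answer = await compute_answer(event)
    await handle.respond({event.request_id: answer})


async def consume(handle: Any) -> None:
    pending: set[asyncio.Task[None]] = set()
    # Hot path: keep draining events even while slow answers are in flight.
    while not handle.is_idle or pending:
        for event in await handle.poll():
            if event.type == "request_info":
                # Cold path: compute the answer in its own task so the
                # polling loop never stalls on a single request.
                task = asyncio.create_task(respond_to(handle, event))
                pending.add(task)
                task.add_done_callback(pending.discard)
        await asyncio.sleep(0.05)  # let the producer task make progress
```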
@@ -15,6 +15,7 @@
```python
from .._types import ResponseStream
from ..observability import OtelAttr, capture_exception, create_workflow_span
from ._agent import WorkflowAgent
from ._background_run import BackgroundRunHandle
from ._checkpoint import CheckpointStorage
from ._const import DEFAULT_MAX_ITERATIONS, WORKFLOW_RUN_KWARGS_KEY
from ._edge import (
```
@@ -537,6 +538,101 @@ def run(

```python
            return response_stream
        return response_stream.get_final_response()

    def run_in_background(
```
> **Contributor:** Do we genuinely need a polling-based consumption pattern? Are we getting feedback that this is missing today? We're now pushing more concerns onto the caller: every consumer has to drive the poll loop, check for idleness, and route responses themselves.
>
> **Member:** I very much agree; this is not needed and leads to un-Pythonic code.
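For comparison, the style the reviewers favor keeps the caller in a single `async for` loop. A sketch under stated assumptions: `streaming=True` mirrors this PR's internal call into `_run_core`, while `send_responses` is a hypothetical helper, not an API this diff introduces:

```python
from typing import Any


async def consume_stream(workflow: Any, answer: Any) -> None:
    # Hypothetical consumer in the async-iterator style the reviewers prefer.
    async for event in workflow.run("Hello", streaming=True):
        if event.type == "request_info":
            await workflow.send_responses({event.request_id: answer})
```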
```python
        self,
        message: Any | None = None,
        *,
        responses: dict[str, Any] | None = None,
        checkpoint_id: str | None = None,
        checkpoint_storage: CheckpointStorage | None = None,
        **kwargs: Any,
    ) -> BackgroundRunHandle:
        """Start the workflow in a background asyncio task and return a polling handle.

        The workflow runs as a producer in a background task, buffering events into
        an internal queue. The caller acts as the single consumer, draining events
        via :meth:`BackgroundRunHandle.poll`. The workflow always runs in streaming
        mode internally.

        This is a one-shot execution: the workflow runs until it reaches an idle
        state (or fails), after which :attr:`BackgroundRunHandle.is_idle` becomes
        ``True``. To resume, call :meth:`BackgroundRunHandle.respond`, which
        automatically restarts the runner, or call ``run_in_background`` again.

        Args:
            message: Initial message for the start executor. Required for new
                workflow runs. Mutually exclusive with ``responses``.
            responses: Responses to send for pending request info events. Mutually
                exclusive with ``message``. Can be combined with ``checkpoint_id``.
            checkpoint_id: ID of the checkpoint to restore from.
            checkpoint_storage: Runtime checkpoint storage.
            **kwargs: Additional keyword arguments passed through to agent invocations.

        Returns:
            A :class:`BackgroundRunHandle` for polling events and checking idle state.

        Raises:
            ValueError: If the parameter combination is invalid.
            RuntimeError: If the workflow is already running.
        """
        self._validate_run_params(message, responses, checkpoint_id)
        self._ensure_not_running()

        event_queue: asyncio.Queue[WorkflowEvent[Any]] = asyncio.Queue()

        async def _background_producer(
            message: Any | None = None,
            responses: dict[str, Any] | None = None,
            checkpoint_id: str | None = None,
        ) -> None:
            try:
                async for event in self._run_core(
                    message=message,
                    responses=responses,
                    checkpoint_id=checkpoint_id,
                    checkpoint_storage=checkpoint_storage,
                    streaming=True,
                    **kwargs,
                ):
                    await event_queue.put(event)
            except Exception:
                # _run_workflow_with_tracing yields failed/status events before
                # re-raising, so they are already enqueued above. Suppress here
                # so the task completes cleanly and is_idle becomes True.
                logger.debug("Background workflow run completed with error; events already enqueued.")
            finally:
                await self._run_cleanup(checkpoint_storage)

        async def _resume_producer() -> None:
            """Producer for resuming after respond() injects messages into the context."""
            try:
                async for event in self._run_workflow_with_tracing(
                    initial_executor_fn=None,
                    reset_context=False,
                    streaming=True,
                    run_kwargs=kwargs if kwargs else None,
                ):
                    await event_queue.put(event)
            except Exception:
                logger.debug("Background workflow run completed with error; events already enqueued.")
            finally:
                await self._run_cleanup(checkpoint_storage)

        async def _resume() -> asyncio.Task[None]:  # noqa: RUF029
```
**Suggested change:**

```diff
-        async def _resume() -> asyncio.Task[None]:  # noqa: RUF029
+        async def _resume() -> asyncio.Task[None]:
```
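For context, Ruff's RUF029 flags `async` functions that never `await`, so taking this suggestion as-is would reintroduce the lint warning. An alternative is to make `_resume` synchronous; a hedged sketch, assuming its body only wraps `asyncio.create_task` around `_resume_producer` (the `resume_fn` annotation in `BackgroundRunHandle.__init__` would need to change accordingly):

```python
        def _resume() -> asyncio.Task[None]:
            # Creating a task requires a running event loop but no `await`,
            # so the wrapper can be a plain function.
            return asyncio.create_task(_resume_producer())
```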
> **Reviewer:** The `logger` variable is imported but never used in this module. Consider removing the unused import or adding appropriate debug/error logging where it might be useful (e.g., in the `respond` method when resuming after idle).
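One way to act on that suggestion, sketched as a small amendment to `BackgroundRunHandle.respond` from the first file in this diff (the log message wording is mine, not the PR's):

```python
    async def respond(self, responses: dict[str, Any]) -> None:
        for request_id, response in responses.items():
            await self._runner_context.send_request_info_response(request_id, response)

        if self.is_idle:
            # Gives the module-level logger a purpose, per the review note.
            logger.debug("Workflow idle after respond(); resuming background runner.")
            self._task = await self._resume_fn()
```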