Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/packages/core/agent_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
AgentExecutorResponse,
)
from ._workflows._agent_utils import resolve_agent_id
from ._workflows._background_run import BackgroundRunHandle
from ._workflows._checkpoint import (
CheckpointStorage,
FileCheckpointStorage,
Expand Down Expand Up @@ -205,6 +206,7 @@
"AgentRunInputs",
"AgentSession",
"Annotation",
"BackgroundRunHandle",
"BaseAgent",
"BaseChatClient",
"BaseContextProvider",
Expand Down
116 changes: 116 additions & 0 deletions python/packages/core/agent_framework/_workflows/_background_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright (c) Microsoft. All rights reserved.

from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any

from ._events import WorkflowEvent
from ._runner_context import RunnerContext

logger = logging.getLogger(__name__)
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logger variable is imported but never used in this module. Consider removing the unused import or adding appropriate debug/error logging where it might be useful (e.g., in the respond method when resuming after idle).

Copilot uses AI. Check for mistakes.


class BackgroundRunHandle:
    """Handle for a workflow running in the background.

    Wraps the producer ``asyncio.Task`` and the queue it fills, exposing a
    pull-based API: the caller repeatedly calls :meth:`poll` to drain the
    events buffered so far, and may answer pending ``request_info`` events
    via :meth:`respond` while the workflow is still executing — enabling
    hot-path / cold-path parallelism.

    Example:
        .. code-block:: python

            handle = workflow.run_in_background(message="Hello")
            while not handle.is_idle:
                events = await handle.poll()
                for event in events:
                    if event.type == "request_info":
                        await handle.respond({event.request_id: answer})
    """

    def __init__(
        self,
        task: asyncio.Task[None],
        event_queue: asyncio.Queue[WorkflowEvent[Any]],
        runner_context: RunnerContext,
        resume_fn: Callable[[], Awaitable[asyncio.Task[None]]],
    ) -> None:
        """Initialize the background run handle.

        Args:
            task: The asyncio task running the workflow.
            event_queue: The queue where workflow events are buffered.
            runner_context: The runner context for injecting responses.
            resume_fn: Callback that creates and returns a new producer task
                to resume the workflow after it has converged.
        """
        self._task = task
        self._event_queue = event_queue
        self._runner_context = runner_context
        self._resume_fn = resume_fn

    @property
    def is_idle(self) -> bool:
        """Whether the background task has finished producing events.

        Becomes ``True`` once the producer task completes, which happens
        when the workflow reaches any terminal run state — idle, idle with
        pending requests, or failed. Inspect the status events returned by
        :meth:`poll` to determine which terminal state was reached.
        """
        return self._task.done()

    async def poll(self) -> list[WorkflowEvent[Any]]:
        """Drain all currently queued events without blocking.

        Returns:
            The events produced since the previous poll; an empty list if
            nothing is available.
        """
        # No ``await`` happens between the emptiness check and the
        # non-blocking get, so under asyncio's cooperative scheduling the
        # producer task cannot interleave with this loop: the result is a
        # consistent snapshot of everything queued when poll() was entered.
        drained: list[WorkflowEvent[Any]] = []
        while not self._event_queue.empty():
            drained.append(self._event_queue.get_nowait())
        return drained

    async def respond(self, responses: dict[str, Any]) -> None:
        """Send responses to pending ``request_info`` events.

        The responses are injected into the runner context. A still-running
        workflow picks them up in its next superstep; a workflow that has
        already converged (idle) is automatically resumed.

        Args:
            responses: A mapping of request IDs to response data.

        Raises:
            ValueError: If a request ID is unknown.
            TypeError: If a response type does not match the expected type.
        """
        for req_id, payload in responses.items():
            await self._runner_context.send_request_info_response(req_id, payload)

        # The producer already finished: launch a fresh one so the
        # injected responses are actually processed.
        if self.is_idle:
            self._task = await self._resume_fn()
96 changes: 96 additions & 0 deletions python/packages/core/agent_framework/_workflows/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .._types import ResponseStream
from ..observability import OtelAttr, capture_exception, create_workflow_span
from ._agent import WorkflowAgent
from ._background_run import BackgroundRunHandle
from ._checkpoint import CheckpointStorage
from ._const import DEFAULT_MAX_ITERATIONS, WORKFLOW_RUN_KWARGS_KEY
from ._edge import (
Expand Down Expand Up @@ -537,6 +538,101 @@ def run(
return response_stream
return response_stream.get_final_response()

def run_in_background(
    self,
    message: Any | None = None,
    *,
    responses: dict[str, Any] | None = None,
    checkpoint_id: str | None = None,
    checkpoint_storage: CheckpointStorage | None = None,
    **kwargs: Any,
) -> BackgroundRunHandle:
    """Start the workflow in a background asyncio task and return a polling handle.

    The workflow acts as a producer in a background task, buffering its events
    into an internal queue; the caller is the single consumer and drains them
    via :meth:`BackgroundRunHandle.poll`. Internally the run always uses
    streaming mode.

    Execution is one-shot: the workflow runs until it reaches an idle state
    (or fails), at which point :attr:`BackgroundRunHandle.is_idle` becomes
    ``True``. Calling :meth:`BackgroundRunHandle.respond` restarts the runner
    automatically; alternatively, call ``run_in_background`` again.

    Args:
        message: Initial message for the start executor. Required for new workflow runs.
            Mutually exclusive with responses.
        responses: Responses to send for pending request info events. Mutually
            exclusive with message. Can be combined with checkpoint_id.
        checkpoint_id: ID of checkpoint to restore from.
        checkpoint_storage: Runtime checkpoint storage.
        **kwargs: Additional keyword arguments passed through to agent invocations.

    Returns:
        A :class:`BackgroundRunHandle` for polling events and checking idle state.

    Raises:
        ValueError: If parameter combination is invalid.
        RuntimeError: If the workflow is already running.
    """
    self._validate_run_params(message, responses, checkpoint_id)
    self._ensure_not_running()

    event_queue: asyncio.Queue[WorkflowEvent[Any]] = asyncio.Queue()

    async def _pump(open_stream: Any) -> None:
        """Iterate the event stream returned by *open_stream* into the queue."""
        try:
            async for produced in open_stream():
                await event_queue.put(produced)
        except Exception:
            # _run_workflow_with_tracing yields failed/status events before
            # re-raising, so they are already enqueued above. Suppress here
            # so the task completes cleanly and is_idle becomes True.
            logger.debug("Background workflow run completed with error; events already enqueued.")
        finally:
            await self._run_cleanup(checkpoint_storage)

    def _initial_stream() -> Any:
        """Open the streaming run for a fresh (or checkpoint-restored) execution."""
        return self._run_core(
            message=message,
            responses=responses,
            checkpoint_id=checkpoint_id,
            checkpoint_storage=checkpoint_storage,
            streaming=True,
            **kwargs,
        )

    def _resume_stream() -> Any:
        """Open a streaming run continuing from the already-populated context."""
        return self._run_workflow_with_tracing(
            initial_executor_fn=None,
            reset_context=False,
            streaming=True,
            run_kwargs=kwargs if kwargs else None,
        )

    async def _resume() -> asyncio.Task[None]:  # noqa: RUF029
        """Launch a new producer task to resume the workflow.

        By the time this runs, :meth:`BackgroundRunHandle.respond` has
        already injected the responses into the runner context, so the
        messages are ready for the next ``run_until_convergence`` cycle.
        """
        self._ensure_not_running()
        return asyncio.create_task(_pump(_resume_stream))

    producer = asyncio.create_task(_pump(_initial_stream))
    return BackgroundRunHandle(producer, event_queue, self._runner_context, _resume)

async def _run_core(
self,
message: Any | None = None,
Expand Down
Loading
Loading