Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions python/CODING_STANDARD.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,39 @@ user_msg = UserMessage(content="Hello, world!")
asst_msg = AssistantMessage(content="Hello, world!")
```

### Empty Method Bodies

Use `pass` for empty method bodies rather than `...` (Ellipsis). The `...` literal should be reserved for `@overload` stubs and `= ...` default argument sentinels, where it carries idiomatic meaning.

```python
# ✅ Preferred - concrete no-op method body
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
"""Hook called when the workflow is restored from a checkpoint."""
pass

# ✅ Preferred - Protocol method stub
class CheckpointStorage(Protocol):
async def save(self, checkpoint: WorkflowCheckpoint) -> CheckpointID:
"""Save a checkpoint and return its ID."""
pass

# ✅ Preferred - abstract method
@abstractmethod
async def plan(self, context: Context) -> Message:
"""Create a plan for the task."""
raise NotImplementedError

# ✅ Acceptable - @overload stubs and default argument sentinels use ...
@overload
def run(self, stream: Literal[True]) -> ResponseStream: ...
@overload
def run(self, stream: Literal[False] = ...) -> Response: ...

# ❌ Avoid - ... in concrete or Protocol method bodies
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
...
```

### Import Structure

The package follows a flat import structure:
Expand Down
12 changes: 6 additions & 6 deletions python/packages/core/agent_framework/_workflows/_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def save(self, checkpoint: WorkflowCheckpoint) -> CheckpointID:
Returns:
The unique ID of the saved checkpoint.
"""
...
pass

async def load(self, checkpoint_id: CheckpointID) -> WorkflowCheckpoint:
"""Load a checkpoint by ID.
Expand All @@ -142,7 +142,7 @@ async def load(self, checkpoint_id: CheckpointID) -> WorkflowCheckpoint:
Raises:
WorkflowCheckpointException: If no checkpoint with the given ID exists.
"""
...
pass

async def list_checkpoints(self, *, workflow_name: str) -> list[WorkflowCheckpoint]:
"""List checkpoint objects for a given workflow name.
Expand All @@ -153,7 +153,7 @@ async def list_checkpoints(self, *, workflow_name: str) -> list[WorkflowCheckpoi
Returns:
A list of WorkflowCheckpoint objects for the specified workflow name.
"""
...
pass

async def delete(self, checkpoint_id: CheckpointID) -> bool:
"""Delete a checkpoint by ID.
Expand All @@ -164,7 +164,7 @@ async def delete(self, checkpoint_id: CheckpointID) -> bool:
Returns:
True if the checkpoint was successfully deleted, False if no checkpoint with the given ID exists.
"""
...
pass

async def get_latest(self, *, workflow_name: str) -> WorkflowCheckpoint | None:
"""Get the latest checkpoint for a given workflow name.
Expand All @@ -175,7 +175,7 @@ async def get_latest(self, *, workflow_name: str) -> WorkflowCheckpoint | None:
Returns:
The latest WorkflowCheckpoint object for the specified workflow name, or None if no checkpoints exist.
"""
...
pass

async def list_checkpoint_ids(self, *, workflow_name: str) -> list[CheckpointID]:
"""List checkpoint IDs for a given workflow name.
Expand All @@ -186,7 +186,7 @@ async def list_checkpoint_ids(self, *, workflow_name: str) -> list[CheckpointID]
Returns:
A list of checkpoint IDs for the specified workflow name.
"""
...
pass


class InMemoryCheckpointStorage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
Args:
state: The state dictionary that was saved during checkpointing.
"""
...
pass


# endregion: Executor
Expand Down
38 changes: 19 additions & 19 deletions python/packages/core/agent_framework/_workflows/_runner_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,51 +105,51 @@ async def send_message(self, WorkflowMessage: WorkflowMessage) -> None:
Args:
WorkflowMessage: The WorkflowMessage to be sent.
"""
...
pass

async def drain_messages(self) -> dict[str, list[WorkflowMessage]]:
"""Drain all messages from the context.

Returns:
A dictionary mapping executor IDs to lists of messages.
"""
...
pass

async def has_messages(self) -> bool:
"""Check if there are any messages in the context.

Returns:
True if there are messages, False otherwise.
"""
...
pass

async def add_event(self, event: WorkflowEvent) -> None:
"""Add an event to the execution context.

Args:
event: The event to be added.
"""
...
pass

async def drain_events(self) -> list[WorkflowEvent]:
"""Drain all events from the context.

Returns:
A list of events that were added to the context.
"""
...
pass

async def has_events(self) -> bool:
"""Check if there are any events in the context.

Returns:
True if there are events, False otherwise.
"""
...
pass

async def next_event(self) -> WorkflowEvent: # pragma: no cover - interface only
"""Wait for and return the next event emitted by the workflow run."""
...
pass

# Checkpointing capability
def has_checkpointing(self) -> bool:
Expand All @@ -158,39 +158,39 @@ def has_checkpointing(self) -> bool:
Returns:
True if checkpointing is supported, False otherwise.
"""
...
pass

def set_runtime_checkpoint_storage(self, storage: CheckpointStorage) -> None:
"""Set runtime checkpoint storage to override build-time configuration.

Args:
storage: The checkpoint storage to use for this run.
"""
...
pass

def clear_runtime_checkpoint_storage(self) -> None:
"""Clear runtime checkpoint storage override."""
...
pass

def reset_for_new_run(self) -> None:
"""Reset the context for a new workflow run."""
...
pass

def set_streaming(self, streaming: bool) -> None:
"""Set whether agents should stream incremental updates.

Args:
streaming: True for streaming mode (stream=True), False for non-streaming (stream=False).
"""
...
pass

def is_streaming(self) -> bool:
"""Check if the workflow is in streaming mode.

Returns:
True if streaming mode is enabled, False otherwise.
"""
...
pass

async def create_checkpoint(
self,
Expand All @@ -217,7 +217,7 @@ async def create_checkpoint(
Returns:
The ID of the created checkpoint.
"""
...
pass

async def load_checkpoint(self, checkpoint_id: CheckpointID) -> WorkflowCheckpoint | None:
"""Load a checkpoint without mutating the current context state.
Expand All @@ -228,23 +228,23 @@ async def load_checkpoint(self, checkpoint_id: CheckpointID) -> WorkflowCheckpoi
Returns:
The loaded checkpoint, or None if it does not exist.
"""
...
pass

async def apply_checkpoint(self, checkpoint: WorkflowCheckpoint) -> None:
"""Apply a checkpoint to the current context, mutating its state.

Args:
checkpoint: The checkpoint whose state is to be applied.
"""
...
pass

async def add_request_info_event(self, event: WorkflowEvent[Any]) -> None:
"""Add a request_info event to the context and track it for correlation.

Args:
event: The WorkflowEvent with type='request_info' to be added.
"""
...
pass

async def send_request_info_response(self, request_id: str, response: Any) -> None:
"""Send a response correlated to a pending request.
Expand All @@ -253,15 +253,15 @@ async def send_request_info_response(self, request_id: str, response: Any) -> No
request_id: The ID of the original request.
response: The response data to be sent.
"""
...
pass

async def get_pending_request_info_events(self) -> dict[str, WorkflowEvent[Any]]:
"""Get the mapping of request IDs to their corresponding request_info events.

Returns:
A dictionary mapping request IDs to their corresponding WorkflowEvent (type='request_info').
"""
...
pass


class InProcRunnerContext:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,22 +474,22 @@ def __init__(
@abstractmethod
async def plan(self, magentic_context: MagenticContext) -> Message:
"""Create a plan for the task."""
...
raise NotImplementedError

@abstractmethod
async def replan(self, magentic_context: MagenticContext) -> Message:
"""Replan for the task."""
...
raise NotImplementedError

@abstractmethod
async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger:
"""Create a progress ledger."""
...
raise NotImplementedError

@abstractmethod
async def prepare_final_answer(self, magentic_context: MagenticContext) -> Message:
"""Prepare the final answer."""
...
raise NotImplementedError

def on_checkpoint_save(self) -> dict[str, Any]:
"""Serialize runtime state for checkpointing."""
Expand Down