Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions python/packages/ag-ui/agent_framework_ag_ui/_run_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,134 @@ def _emit_usage(content: Content) -> list[BaseEvent]:
return [CustomEvent(name="usage", value=usage_details)]


def _emit_mcp_tool_call(content: Content, flow: FlowState) -> list[BaseEvent]:
"""Emit ToolCall start/args events for MCP server tool call content.

MCP tool calls arrive as complete items (not streamed deltas), so we emit a
``ToolCallStartEvent`` (and, when arguments are present, a ``ToolCallArgsEvent``)
immediately. This maps MCP-specific fields (tool_name, server_name) to the
same AG-UI ToolCall* events used by regular function calls, making MCP tool
execution visible to AG-UI consumers. Completion/end events are handled
separately by ``_emit_mcp_tool_result``.
"""
events: list[BaseEvent] = []

tool_call_id = content.call_id or generate_event_id()
tool_name = content.tool_name or "mcp_tool"

# Prefix with server name for disambiguation when available
display_name = f"{content.server_name}/{tool_name}" if content.server_name else tool_name

events.append(
ToolCallStartEvent(
tool_call_id=tool_call_id,
tool_call_name=display_name,
parent_message_id=flow.message_id,
)
)

# Serialize arguments
args_str = ""
if content.arguments:
args_str = (
content.arguments
if isinstance(content.arguments, str)
else json.dumps(make_json_safe(content.arguments))
)
events.append(ToolCallArgsEvent(tool_call_id=tool_call_id, delta=args_str))

# Track in flow state for MESSAGES_SNAPSHOT
tool_entry = {
"id": tool_call_id,
"type": "function",
"function": {"name": display_name, "arguments": args_str},
}
flow.pending_tool_calls.append(tool_entry)
flow.tool_calls_by_id[tool_call_id] = tool_entry

return events


def _emit_mcp_tool_result(content: Content, flow: FlowState) -> list[BaseEvent]:
"""Emit ToolCallResult events for MCP server tool result content.

Maps MCP tool results to the same AG-UI ToolCallEnd + ToolCallResult events
used by regular function results. Uses ``content.output`` (the MCP-specific
result field) instead of ``content.result``.

Mirrors the FlowState cleanup performed by ``_emit_tool_result`` (resetting
tool_call_id/tool_call_name, closing any open text message) so MCP results
behave consistently with standard tool results.
"""
events: list[BaseEvent] = []

if not content.call_id:
logger.warning("MCP tool result content missing call_id, skipping")
return events
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the event that call_id is missing, we simply return. Should we log anything?


events.append(ToolCallEndEvent(tool_call_id=content.call_id))
flow.tool_calls_ended.add(content.call_id)

raw_output = content.output if content.output is not None else ""
result_content = raw_output if isinstance(raw_output, str) else json.dumps(make_json_safe(raw_output))
message_id = generate_event_id()
events.append(
ToolCallResultEvent(
message_id=message_id,
tool_call_id=content.call_id,
content=result_content,
role="tool",
)
)

flow.tool_results.append(
{
"id": message_id,
"role": "tool",
"toolCallId": content.call_id,
"content": result_content,
}
)

# Mirror _emit_tool_result cleanup so MCP results behave consistently
flow.tool_call_id = None
flow.tool_call_name = None

if flow.message_id:
logger.debug("Closing text message for MCP tool result: message_id=%s", flow.message_id)
events.append(TextMessageEndEvent(message_id=flow.message_id))
flow.message_id = None
flow.accumulated_text = ""

return events


def _emit_text_reasoning(content: Content) -> list[BaseEvent]:
"""Emit a custom event for text_reasoning content.

AG-UI protocol does not define a dedicated reasoning event type, so we emit
a ``CustomEvent`` with ``name="text_reasoning"``. This makes reasoning /
chain-of-thought progress visible to frontends that listen for custom events,
following the same pattern used by ``_emit_usage``.
"""
# Only emit user-visible text from content.text. Do not fall back to
# protected_data as text, since protected_data may contain non-display
# payloads such as provider-specific reasoning metadata.
text = content.text or ""
if not text and content.protected_data is None:
return []

value: dict[str, Any] = {"text": text}
# Expose protected_data under a separate key so consumers can decide
# whether/how to render it, without conflating it with display text.
if content.protected_data is not None:
value["protected_data"] = content.protected_data
if content.id:
value["id"] = content.id

return [CustomEvent(name="text_reasoning", value=value)]


def _emit_content(
content: Any,
flow: FlowState,
Expand All @@ -374,5 +502,11 @@ def _emit_content(
return _emit_approval_request(content, flow, predictive_handler, require_confirmation)
if content_type == "usage":
return _emit_usage(content)
if content_type == "mcp_server_tool_call":
return _emit_mcp_tool_call(content, flow)
if content_type == "mcp_server_tool_result":
return _emit_mcp_tool_result(content, flow)
if content_type == "text_reasoning":
return _emit_text_reasoning(content)
logger.debug("Skipping unsupported content type in AG-UI emitter: %s", content_type)
return []
Loading