From 9c50935511c33bca04a064849f1cd601a9a9673a Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Tue, 7 Apr 2026 13:46:26 +0000 Subject: [PATCH] feat: add reasoning event support to Gemini CLI bridge Add support for emitting REASONING_MESSAGE_START/CONTENT/END AG-UI events in the Gemini CLI adapter by parsing 'thinking' events from the NDJSON stream. This reuses the existing reasoning event models from the Claude SDK adapter. The Gemini CLI internally tracks thinking via GeminiEventType.Thought but does not yet surface it in the stream-json output format. This change prepares the adapter to handle thinking events as soon as upstream support is added, while remaining a no-op for current CLI versions. Closes #1127 Co-Authored-By: Claude Opus 4.6 --- .../ag_ui_gemini_cli/adapter.py | 111 ++++++++ .../ambient-runner/ag_ui_gemini_cli/types.py | 23 +- .../tests/test_gemini_cli_adapter.py | 258 ++++++++++++++++++ 3 files changed, 389 insertions(+), 3 deletions(-) mode change 100644 => 100755 components/runners/ambient-runner/ag_ui_gemini_cli/types.py mode change 100644 => 100755 components/runners/ambient-runner/tests/test_gemini_cli_adapter.py diff --git a/components/runners/ambient-runner/ag_ui_gemini_cli/adapter.py b/components/runners/ambient-runner/ag_ui_gemini_cli/adapter.py index 7be1666d7..315c62b4b 100755 --- a/components/runners/ambient-runner/ag_ui_gemini_cli/adapter.py +++ b/components/runners/ambient-runner/ag_ui_gemini_cli/adapter.py @@ -28,9 +28,18 @@ MessagesSnapshotEvent, ) +from ag_ui_claude_sdk.reasoning_events import ( + ReasoningStartEvent, + ReasoningEndEvent, + ReasoningMessageStartEvent, + ReasoningMessageContentEvent, + ReasoningMessageEndEvent, +) + from .types import ( InitEvent, MessageEvent, + ThinkingEvent, ToolUseEvent, ToolResultEvent, ErrorEvent, @@ -65,6 +74,9 @@ def _summarize_event(event: object) -> str: return f"severity={event.severity} msg={event.message[:80]}" if isinstance(event, ResultEvent): return f"status={event.status} stats={event.stats}" + if isinstance(event, ThinkingEvent): + preview = (event.content or "")[:80] + return f"delta={event.delta} content={preview!r}" return "" @@ -100,6 +112,10 @@ async def run( accumulated_text = "" message_timestamp_ms: int | None = None + # Reasoning/thinking state + reasoning_open = False + reasoning_message_id: str | None = None + # Tool tracking current_tool_call_id: str | None = None @@ -142,12 +158,67 @@ async def run( ) continue + # ── thinking ── + if isinstance(event, ThinkingEvent): + if not reasoning_open: + reasoning_message_id = str(uuid.uuid4()) + yield ReasoningStartEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + yield ReasoningMessageStartEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + reasoning_open = True + + if event.content: + yield ReasoningMessageContentEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + delta=event.content, + ) + + # Non-delta thinking: close immediately + if not event.delta and reasoning_open: + yield ReasoningMessageEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + reasoning_open = False + reasoning_message_id = None + continue + # ── message (assistant, delta) ── if isinstance(event, MessageEvent): # Skip user messages (already in input) if event.role == "user": continue + # Close open reasoning block before text output + if reasoning_open and reasoning_message_id: + yield ReasoningMessageEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + reasoning_open = False + reasoning_message_id = None + if event.role == "assistant" and event.delta: # First text chunk: open a text message if not text_message_open: @@ -218,6 +289,21 @@ async def run( # ── tool_use ── if isinstance(event, ToolUseEvent): + # Close open reasoning block before tool call + if reasoning_open and reasoning_message_id: + yield ReasoningMessageEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + reasoning_open = False + reasoning_message_id = None + # Close any open text message before tool call if text_message_open and current_message_id: yield TextMessageEndEvent( @@ -356,6 +442,19 @@ async def run( except Exception as exc: logger.error("Error in Gemini CLI adapter run: %s", exc) + # Clean up open reasoning block + if reasoning_open and reasoning_message_id: + try: + yield ReasoningMessageEndEvent( + threadId=thread_id, runId=run_id, messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, runId=run_id, messageId=reasoning_message_id, + ) + except Exception: + pass + reasoning_open = False + # Clean up open text message if text_message_open and current_message_id: try: @@ -374,6 +473,18 @@ async def run( message=str(exc), ) finally: + # Safety: close any hanging reasoning block + if reasoning_open and reasoning_message_id: + try: + yield ReasoningMessageEndEvent( + threadId=thread_id, runId=run_id, messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, runId=run_id, messageId=reasoning_message_id, + ) + except Exception: + pass + # Safety: close any hanging text message if text_message_open and current_message_id: try: diff --git a/components/runners/ambient-runner/ag_ui_gemini_cli/types.py b/components/runners/ambient-runner/ag_ui_gemini_cli/types.py old mode 100644 new mode 100755 index 8ba58e938..b0132f12b --- a/components/runners/ambient-runner/ag_ui_gemini_cli/types.py +++ b/components/runners/ambient-runner/ag_ui_gemini_cli/types.py @@ -1,4 +1,4 @@ -"""Dataclasses for the 6 Gemini CLI JSONL event types.""" +"""Dataclasses for Gemini CLI JSONL event types.""" import json import logging @@ -8,7 +8,7 @@ # Type tag used to dispatch parsed events. _EVENT_TYPES = frozenset( - {"init", "message", "tool_use", "tool_result", "error", "result"} + {"init", "message", "tool_use", "tool_result", "error", "result", "thinking"} ) @@ -65,6 +65,22 @@ class ResultEvent: stats: dict | None = None +@dataclass +class ThinkingEvent: + """Gemini CLI thinking/reasoning event. + + Emitted when the model produces reasoning traces (requires the CLI to + expose ``thinking`` events in its ``stream-json`` output). The Gemini + CLI internally tracks thinking via a ``ThoughtSummary`` structure with + ``subject`` and ``description`` fields. + """ + + type: str # "thinking" + timestamp: str + content: str = "" + delta: bool = False + + _TYPE_MAP = { "init": InitEvent, "message": MessageEvent, @@ -72,12 +88,13 @@ class ResultEvent: "tool_result": ToolResultEvent, "error": ErrorEvent, "result": ResultEvent, + "thinking": ThinkingEvent, } def parse_event( line: str, -) -> InitEvent | MessageEvent | ToolUseEvent | ToolResultEvent | ErrorEvent | ResultEvent | None: +) -> InitEvent | MessageEvent | ToolUseEvent | ToolResultEvent | ErrorEvent | ResultEvent | ThinkingEvent | None: """Parse a JSON line into the appropriate event dataclass. Returns ``None`` when the line cannot be parsed or has an unknown type. diff --git a/components/runners/ambient-runner/tests/test_gemini_cli_adapter.py b/components/runners/ambient-runner/tests/test_gemini_cli_adapter.py old mode 100644 new mode 100755 index fd88f0171..7e736547a --- a/components/runners/ambient-runner/tests/test_gemini_cli_adapter.py +++ b/components/runners/ambient-runner/tests/test_gemini_cli_adapter.py @@ -9,6 +9,7 @@ InitEvent, MessageEvent, ResultEvent, + ThinkingEvent, ToolResultEvent, ToolUseEvent, parse_event, @@ -158,6 +159,33 @@ def test_result_event_error(self): assert evt.status == "error" assert evt.error["type"] == "FatalAuthenticationError" + def test_thinking_event(self): + line = json.dumps( + { + "type": "thinking", + "timestamp": "2025-01-01T00:00:01Z", + "content": "Let me reason about this...", + "delta": True, + } + ) + evt = parse_event(line) + assert isinstance(evt, ThinkingEvent) + assert evt.content == "Let me reason about this..." + assert evt.delta is True + + def test_thinking_event_non_delta(self): + line = json.dumps( + { + "type": "thinking", + "timestamp": "2025-01-01T00:00:01Z", + "content": "Full thought.", + } + ) + evt = parse_event(line) + assert isinstance(evt, ThinkingEvent) + assert evt.content == "Full thought." + assert evt.delta is False + def test_invalid_json_returns_none(self): evt = parse_event("not valid json") assert evt is None @@ -301,3 +329,233 @@ async def line_stream(): assert "TOOL_CALL_START" in types assert "TOOL_CALL_ARGS" in types assert "TOOL_CALL_END" in types + + @pytest.mark.asyncio + async def test_thinking_then_text_response(self): + """thinking + assistant message → REASONING events + TEXT events.""" + from ag_ui_gemini_cli.adapter import GeminiCLIAdapter + from ag_ui.core import RunAgentInput + + lines = [ + json.dumps( + { + "type": "init", + "timestamp": "T", + "session_id": "s1", + "model": "gemini-2.5-pro", + } + ), + json.dumps( + { + "type": "thinking", + "timestamp": "T", + "content": "Let me think about this...", + "delta": True, + } + ), + json.dumps( + { + "type": "thinking", + "timestamp": "T", + "content": " I should consider X.", + "delta": True, + } + ), + json.dumps( + { + "type": "message", + "timestamp": "T", + "role": "assistant", + "content": "Here is my answer.", + "delta": True, + } + ), + json.dumps( + { + "type": "result", + "timestamp": "T", + "status": "success", + "stats": {"total_tokens": 20}, + } + ), + ] + + async def line_stream(): + for line in lines: + yield line + + input_data = RunAgentInput( + thread_id="t1", + run_id="r1", + state={}, + messages=[], + tools=[], + context=[], + forwardedProps={}, + ) + adapter = GeminiCLIAdapter() + events = [] + async for event in adapter.run(input_data, line_stream=line_stream()): + events.append(event) + + types = [e.type if isinstance(e.type, str) else e.type for e in events] + assert "RUN_STARTED" in types + assert "REASONING_START" in types + assert "REASONING_MESSAGE_START" in types + assert "REASONING_MESSAGE_CONTENT" in types + assert "REASONING_MESSAGE_END" in types + assert "REASONING_END" in types + assert "TEXT_MESSAGE_START" in types + assert "TEXT_MESSAGE_CONTENT" in types + assert "RUN_FINISHED" in types + + # Reasoning events should come before text events + reasoning_start_idx = types.index("REASONING_START") + reasoning_end_idx = types.index("REASONING_END") + text_start_idx = types.index("TEXT_MESSAGE_START") + assert reasoning_start_idx < reasoning_end_idx < text_start_idx + + # Should have two REASONING_MESSAGE_CONTENT events (two delta chunks) + reasoning_content_events = [ + e for e in events if getattr(e, "type", None) == "REASONING_MESSAGE_CONTENT" + ] + assert len(reasoning_content_events) == 2 + assert reasoning_content_events[0].delta == "Let me think about this..." + assert reasoning_content_events[1].delta == " I should consider X." + + @pytest.mark.asyncio + async def test_non_delta_thinking(self): + """Non-delta thinking event opens and closes reasoning block immediately.""" + from ag_ui_gemini_cli.adapter import GeminiCLIAdapter + from ag_ui.core import RunAgentInput + + lines = [ + json.dumps( + { + "type": "init", + "timestamp": "T", + "session_id": "s1", + "model": "gemini-2.5-pro", + } + ), + json.dumps( + { + "type": "thinking", + "timestamp": "T", + "content": "Full reasoning block.", + } + ), + json.dumps( + { + "type": "message", + "timestamp": "T", + "role": "assistant", + "content": "Answer.", + "delta": True, + } + ), + json.dumps({"type": "result", "timestamp": "T", "status": "success"}), + ] + + async def line_stream(): + for line in lines: + yield line + + input_data = RunAgentInput( + thread_id="t1", + run_id="r1", + state={}, + messages=[], + tools=[], + context=[], + forwardedProps={}, + ) + adapter = GeminiCLIAdapter() + events = [] + async for event in adapter.run(input_data, line_stream=line_stream()): + events.append(event) + + types = [e.type if isinstance(e.type, str) else e.type for e in events] + # Reasoning should be fully closed before text starts + assert "REASONING_START" in types + assert "REASONING_MESSAGE_START" in types + assert "REASONING_MESSAGE_CONTENT" in types + assert "REASONING_MESSAGE_END" in types + assert "REASONING_END" in types + assert "TEXT_MESSAGE_START" in types + + # Non-delta: reasoning block closed immediately (not by the message handler) + reasoning_end_idx = types.index("REASONING_END") + text_start_idx = types.index("TEXT_MESSAGE_START") + assert reasoning_end_idx < text_start_idx + + @pytest.mark.asyncio + async def test_thinking_before_tool_call(self): + """Reasoning block is closed before tool call events are emitted.""" + from ag_ui_gemini_cli.adapter import GeminiCLIAdapter + from ag_ui.core import RunAgentInput + + lines = [ + json.dumps( + { + "type": "init", + "timestamp": "T", + "session_id": "s1", + "model": "gemini-2.5-pro", + } + ), + json.dumps( + { + "type": "thinking", + "timestamp": "T", + "content": "I need to read a file.", + "delta": True, + } + ), + json.dumps( + { + "type": "tool_use", + "timestamp": "T", + "tool_name": "read_file", + "tool_id": "t1", + "parameters": {"path": "a.py"}, + } + ), + json.dumps( + { + "type": "tool_result", + "timestamp": "T", + "tool_id": "t1", + "status": "success", + "output": "data", + } + ), + json.dumps({"type": "result", "timestamp": "T", "status": "success"}), + ] + + async def line_stream(): + for line in lines: + yield line + + input_data = RunAgentInput( + thread_id="t1", + run_id="r1", + state={}, + messages=[], + tools=[], + context=[], + forwardedProps={}, + ) + adapter = GeminiCLIAdapter() + events = [] + async for event in adapter.run(input_data, line_stream=line_stream()): + events.append(event) + + types = [e.type if isinstance(e.type, str) else e.type for e in events] + assert "REASONING_END" in types + assert "TOOL_CALL_START" in types + + # Reasoning must be closed before tool call starts + reasoning_end_idx = types.index("REASONING_END") + tool_start_idx = types.index("TOOL_CALL_START") + assert reasoning_end_idx < tool_start_idx