diff --git a/components/runners/ambient-runner/ag_ui_gemini_cli/adapter.py b/components/runners/ambient-runner/ag_ui_gemini_cli/adapter.py index 7be1666d7..315c62b4b 100755 --- a/components/runners/ambient-runner/ag_ui_gemini_cli/adapter.py +++ b/components/runners/ambient-runner/ag_ui_gemini_cli/adapter.py @@ -28,9 +28,18 @@ MessagesSnapshotEvent, ) +from ag_ui_claude_sdk.reasoning_events import ( + ReasoningStartEvent, + ReasoningEndEvent, + ReasoningMessageStartEvent, + ReasoningMessageContentEvent, + ReasoningMessageEndEvent, +) + from .types import ( InitEvent, MessageEvent, + ThinkingEvent, ToolUseEvent, ToolResultEvent, ErrorEvent, @@ -65,6 +74,9 @@ def _summarize_event(event: object) -> str: return f"severity={event.severity} msg={event.message[:80]}" if isinstance(event, ResultEvent): return f"status={event.status} stats={event.stats}" + if isinstance(event, ThinkingEvent): + preview = (event.content or "")[:80] + return f"delta={event.delta} content={preview!r}" return "" @@ -100,6 +112,10 @@ async def run( accumulated_text = "" message_timestamp_ms: int | None = None + # Reasoning/thinking state + reasoning_open = False + reasoning_message_id: str | None = None + # Tool tracking current_tool_call_id: str | None = None @@ -142,12 +158,67 @@ async def run( ) continue + # ── thinking ── + if isinstance(event, ThinkingEvent): + if not reasoning_open: + reasoning_message_id = str(uuid.uuid4()) + yield ReasoningStartEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + yield ReasoningMessageStartEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + reasoning_open = True + + if event.content: + yield ReasoningMessageContentEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + delta=event.content, + ) + + # Non-delta thinking: close immediately + if not event.delta and reasoning_open: + yield ReasoningMessageEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + reasoning_open = False + reasoning_message_id = None + continue + # ── message (assistant, delta) ── if isinstance(event, MessageEvent): # Skip user messages (already in input) if event.role == "user": continue + # Close open reasoning block before text output + if reasoning_open and reasoning_message_id: + yield ReasoningMessageEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + reasoning_open = False + reasoning_message_id = None + if event.role == "assistant" and event.delta: # First text chunk: open a text message if not text_message_open: @@ -218,6 +289,21 @@ async def run( # ── tool_use ── if isinstance(event, ToolUseEvent): + # Close open reasoning block before tool call + if reasoning_open and reasoning_message_id: + yield ReasoningMessageEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, + runId=run_id, + messageId=reasoning_message_id, + ) + reasoning_open = False + reasoning_message_id = None + # Close any open text message before tool call if text_message_open and current_message_id: yield TextMessageEndEvent( @@ -356,6 +442,19 @@ async def run( except Exception as exc: logger.error("Error in Gemini CLI adapter run: %s", exc) + # Clean up open reasoning block + if reasoning_open and reasoning_message_id: + try: + yield ReasoningMessageEndEvent( + threadId=thread_id, runId=run_id, messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, runId=run_id, messageId=reasoning_message_id, + ) + except Exception: + pass + reasoning_open = False + # Clean up open text message if text_message_open and current_message_id: try: @@ -374,6 +473,18 @@ async def run( message=str(exc), ) finally: + # Safety: close any hanging reasoning block + if reasoning_open and reasoning_message_id: + try: + yield ReasoningMessageEndEvent( + threadId=thread_id, runId=run_id, messageId=reasoning_message_id, + ) + yield ReasoningEndEvent( + threadId=thread_id, runId=run_id, messageId=reasoning_message_id, + ) + except Exception: + pass + # Safety: close any hanging text message if text_message_open and current_message_id: try: diff --git a/components/runners/ambient-runner/ag_ui_gemini_cli/types.py b/components/runners/ambient-runner/ag_ui_gemini_cli/types.py old mode 100644 new mode 100755 index 8ba58e938..b0132f12b --- a/components/runners/ambient-runner/ag_ui_gemini_cli/types.py +++ b/components/runners/ambient-runner/ag_ui_gemini_cli/types.py @@ -1,4 +1,4 @@ -"""Dataclasses for the 6 Gemini CLI JSONL event types.""" +"""Dataclasses for Gemini CLI JSONL event types.""" import json import logging @@ -8,7 +8,7 @@ # Type tag used to dispatch parsed events. _EVENT_TYPES = frozenset( - {"init", "message", "tool_use", "tool_result", "error", "result"} + {"init", "message", "tool_use", "tool_result", "error", "result", "thinking"} ) @@ -65,6 +65,22 @@ class ResultEvent: stats: dict | None = None +@dataclass +class ThinkingEvent: + """Gemini CLI thinking/reasoning event. + + Emitted when the model produces reasoning traces (requires the CLI to + expose ``thinking`` events in its ``stream-json`` output). The Gemini + CLI internally tracks thinking via a ``ThoughtSummary`` structure with + ``subject`` and ``description`` fields. + """ + + type: str # "thinking" + timestamp: str + content: str = "" + delta: bool = False + + _TYPE_MAP = { "init": InitEvent, "message": MessageEvent, @@ -72,12 +88,13 @@ class ResultEvent: "tool_result": ToolResultEvent, "error": ErrorEvent, "result": ResultEvent, + "thinking": ThinkingEvent, } def parse_event( line: str, -) -> InitEvent | MessageEvent | ToolUseEvent | ToolResultEvent | ErrorEvent | ResultEvent | None: +) -> InitEvent | MessageEvent | ToolUseEvent | ToolResultEvent | ErrorEvent | ResultEvent | ThinkingEvent | None: """Parse a JSON line into the appropriate event dataclass. Returns ``None`` when the line cannot be parsed or has an unknown type. diff --git a/components/runners/ambient-runner/tests/test_gemini_cli_adapter.py b/components/runners/ambient-runner/tests/test_gemini_cli_adapter.py old mode 100644 new mode 100755 index fd88f0171..7e736547a --- a/components/runners/ambient-runner/tests/test_gemini_cli_adapter.py +++ b/components/runners/ambient-runner/tests/test_gemini_cli_adapter.py @@ -9,6 +9,7 @@ InitEvent, MessageEvent, ResultEvent, + ThinkingEvent, ToolResultEvent, ToolUseEvent, parse_event, @@ -158,6 +159,33 @@ def test_result_event_error(self): assert evt.status == "error" assert evt.error["type"] == "FatalAuthenticationError" + def test_thinking_event(self): + line = json.dumps( + { + "type": "thinking", + "timestamp": "2025-01-01T00:00:01Z", + "content": "Let me reason about this...", + "delta": True, + } + ) + evt = parse_event(line) + assert isinstance(evt, ThinkingEvent) + assert evt.content == "Let me reason about this..." + assert evt.delta is True + + def test_thinking_event_non_delta(self): + line = json.dumps( + { + "type": "thinking", + "timestamp": "2025-01-01T00:00:01Z", + "content": "Full thought.", + } + ) + evt = parse_event(line) + assert isinstance(evt, ThinkingEvent) + assert evt.content == "Full thought." + assert evt.delta is False + def test_invalid_json_returns_none(self): evt = parse_event("not valid json") assert evt is None @@ -301,3 +329,233 @@ async def line_stream(): assert "TOOL_CALL_START" in types assert "TOOL_CALL_ARGS" in types assert "TOOL_CALL_END" in types + + @pytest.mark.asyncio + async def test_thinking_then_text_response(self): + """thinking + assistant message → REASONING events + TEXT events.""" + from ag_ui_gemini_cli.adapter import GeminiCLIAdapter + from ag_ui.core import RunAgentInput + + lines = [ + json.dumps( + { + "type": "init", + "timestamp": "T", + "session_id": "s1", + "model": "gemini-2.5-pro", + } + ), + json.dumps( + { + "type": "thinking", + "timestamp": "T", + "content": "Let me think about this...", + "delta": True, + } + ), + json.dumps( + { + "type": "thinking", + "timestamp": "T", + "content": " I should consider X.", + "delta": True, + } + ), + json.dumps( + { + "type": "message", + "timestamp": "T", + "role": "assistant", + "content": "Here is my answer.", + "delta": True, + } + ), + json.dumps( + { + "type": "result", + "timestamp": "T", + "status": "success", + "stats": {"total_tokens": 20}, + } + ), + ] + + async def line_stream(): + for line in lines: + yield line + + input_data = RunAgentInput( + thread_id="t1", + run_id="r1", + state={}, + messages=[], + tools=[], + context=[], + forwardedProps={}, + ) + adapter = GeminiCLIAdapter() + events = [] + async for event in adapter.run(input_data, line_stream=line_stream()): + events.append(event) + + types = [e.type if isinstance(e.type, str) else e.type for e in events] + assert "RUN_STARTED" in types + assert "REASONING_START" in types + assert "REASONING_MESSAGE_START" in types + assert "REASONING_MESSAGE_CONTENT" in types + assert "REASONING_MESSAGE_END" in types + assert "REASONING_END" in types + assert "TEXT_MESSAGE_START" in types + assert "TEXT_MESSAGE_CONTENT" in types + assert "RUN_FINISHED" in types + + # Reasoning events should come before text events + reasoning_start_idx = types.index("REASONING_START") + reasoning_end_idx = types.index("REASONING_END") + text_start_idx = types.index("TEXT_MESSAGE_START") + assert reasoning_start_idx < reasoning_end_idx < text_start_idx + + # Should have two REASONING_MESSAGE_CONTENT events (two delta chunks) + reasoning_content_events = [ + e for e in events if getattr(e, "type", None) == "REASONING_MESSAGE_CONTENT" + ] + assert len(reasoning_content_events) == 2 + assert reasoning_content_events[0].delta == "Let me think about this..." + assert reasoning_content_events[1].delta == " I should consider X." + + @pytest.mark.asyncio + async def test_non_delta_thinking(self): + """Non-delta thinking event opens and closes reasoning block immediately.""" + from ag_ui_gemini_cli.adapter import GeminiCLIAdapter + from ag_ui.core import RunAgentInput + + lines = [ + json.dumps( + { + "type": "init", + "timestamp": "T", + "session_id": "s1", + "model": "gemini-2.5-pro", + } + ), + json.dumps( + { + "type": "thinking", + "timestamp": "T", + "content": "Full reasoning block.", + } + ), + json.dumps( + { + "type": "message", + "timestamp": "T", + "role": "assistant", + "content": "Answer.", + "delta": True, + } + ), + json.dumps({"type": "result", "timestamp": "T", "status": "success"}), + ] + + async def line_stream(): + for line in lines: + yield line + + input_data = RunAgentInput( + thread_id="t1", + run_id="r1", + state={}, + messages=[], + tools=[], + context=[], + forwardedProps={}, + ) + adapter = GeminiCLIAdapter() + events = [] + async for event in adapter.run(input_data, line_stream=line_stream()): + events.append(event) + + types = [e.type if isinstance(e.type, str) else e.type for e in events] + # Reasoning should be fully closed before text starts + assert "REASONING_START" in types + assert "REASONING_MESSAGE_START" in types + assert "REASONING_MESSAGE_CONTENT" in types + assert "REASONING_MESSAGE_END" in types + assert "REASONING_END" in types + assert "TEXT_MESSAGE_START" in types + + # Non-delta: reasoning block closed immediately (not by the message handler) + reasoning_end_idx = types.index("REASONING_END") + text_start_idx = types.index("TEXT_MESSAGE_START") + assert reasoning_end_idx < text_start_idx + + @pytest.mark.asyncio + async def test_thinking_before_tool_call(self): + """Reasoning block is closed before tool call events are emitted.""" + from ag_ui_gemini_cli.adapter import GeminiCLIAdapter + from ag_ui.core import RunAgentInput + + lines = [ + json.dumps( + { + "type": "init", + "timestamp": "T", + "session_id": "s1", + "model": "gemini-2.5-pro", + } + ), + json.dumps( + { + "type": "thinking", + "timestamp": "T", + "content": "I need to read a file.", + "delta": True, + } + ), + json.dumps( + { + "type": "tool_use", + "timestamp": "T", + "tool_name": "read_file", + "tool_id": "t1", + "parameters": {"path": "a.py"}, + } + ), + json.dumps( + { + "type": "tool_result", + "timestamp": "T", + "tool_id": "t1", + "status": "success", + "output": "data", + } + ), + json.dumps({"type": "result", "timestamp": "T", "status": "success"}), + ] + + async def line_stream(): + for line in lines: + yield line + + input_data = RunAgentInput( + thread_id="t1", + run_id="r1", + state={}, + messages=[], + tools=[], + context=[], + forwardedProps={}, + ) + adapter = GeminiCLIAdapter() + events = [] + async for event in adapter.run(input_data, line_stream=line_stream()): + events.append(event) + + types = [e.type if isinstance(e.type, str) else e.type for e in events] + assert "REASONING_END" in types + assert "TOOL_CALL_START" in types + + # Reasoning must be closed before tool call starts + reasoning_end_idx = types.index("REASONING_END") + tool_start_idx = types.index("TOOL_CALL_START") + assert reasoning_end_idx < tool_start_idx