feat(invoke): stream agent execution progress in real-time#196
Conversation
Add optional streaming mode to invokeAgent that processes CLI output line-by-line and emits intermediate text events as they arrive. When onEvent callback is provided, extracts JSON events from Claude (stream-json format), Codex (JSONL), and OpenCode (JSONL) outputs and streams extracted text in real-time. Main package now emits agent_progress SSE events during execution, allowing UI to display live agent activity. - New runCommandStreaming() spawns processes and calls callback for each stdout line - New extractClaudeEventText/extractCodexEventText/extractOpenCodeEventText parse provider events - invokeAgent accepts optional onEvent callback for streaming mode - Claude CLI uses --output-format stream-json --verbose when streaming - processMessage emits agent_progress events during agent execution Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Greptile SummaryThis PR adds real-time streaming of agent execution progress by introducing Key changes:
Issues found:
Confidence Score: 3/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant UI as Web UI (SSE subscriber)
participant Main as processMessage()
participant Core as invokeAgent()
participant Streaming as runCommandStreaming()
participant CLI as Claude / Codex / OpenCode CLI
Main->>Core: invokeAgent(..., onEvent)
Core->>Streaming: runCommandStreaming(cmd, args, onLine)
Streaming->>CLI: spawn process
loop stdout data events
CLI-->>Streaming: stdout chunk
Streaming->>Streaming: buffer lines
Streaming->>Core: onLine(completeLine)
Core->>Core: JSON.parse + extractXxxEventText()
alt text extracted
Core->>Main: onEvent(text)
Main->>UI: emitEvent('agent_progress', {agentId, text, messageId})
end
end
CLI-->>Streaming: close (exit code)
alt exit code == 0
Streaming-->>Core: resolve(stdout)
Core-->>Main: return response
Main->>UI: emitEvent('chain_step_done', {...})
else exit code != 0
Streaming-->>Core: reject(Error)
Core-->>Main: throw
Main->>UI: (error logged, fallback response)
end
Last reviewed commit: 83ff15c |
| child.on('close', (code) => { | ||
| // Flush remaining buffer | ||
| if (lineBuffer.trim()) onLine(lineBuffer); | ||
|
|
||
| if (code === 0) { | ||
| resolve(stdout); | ||
| return; | ||
| } | ||
|
|
||
| const errorMessage = stderr.trim() || `Command exited with code ${code}`; | ||
| reject(new Error(errorMessage)); | ||
| }); |
There was a problem hiding this comment.
Partial buffer flushed even on failed exit
onLine(lineBuffer) is called unconditionally before the exit-code check. If the child process exits with a non-zero code the last incomplete line (which may be a partial JSON fragment from an error diagnostic) is still dispatched through onLine. Callers' try/catch around JSON.parse will silently swallow it, but it can still trigger a spurious onEvent / agent_progress SSE event in the UI before the reject path propagates the real error. The flush should only happen on a clean exit:
| child.on('close', (code) => { | |
| // Flush remaining buffer | |
| if (lineBuffer.trim()) onLine(lineBuffer); | |
| if (code === 0) { | |
| resolve(stdout); | |
| return; | |
| } | |
| const errorMessage = stderr.trim() || `Command exited with code ${code}`; | |
| reject(new Error(errorMessage)); | |
| }); | |
| child.on('close', (code) => { | |
| if (code === 0) { | |
| // Flush remaining buffer only on success | |
| if (lineBuffer.trim()) onLine(lineBuffer); | |
| resolve(stdout); | |
| return; | |
| } | |
| const errorMessage = stderr.trim() || `Command exited with code ${code}`; | |
| reject(new Error(errorMessage)); | |
| }); |
packages/core/src/invoke.ts
Outdated
| if (json.type === 'result' && json.result) { | ||
| return json.result; | ||
| } | ||
| return null; |
There was a problem hiding this comment.
Error result text returned as successful response
Claude's stream-json result event carries an is_error boolean. When is_error: true, json.result holds an error description string, not the agent's answer. The current code returns that error string as if it were a successful response (and the process may still exit with code 0 in some scenarios). This means callers receive an error message in the normal response field instead of a fallback.
Add an is_error guard:
| if (json.type === 'result' && json.result) { | |
| return json.result; | |
| } | |
| return null; | |
| if (json.type === 'result' && json.result && !json.is_error) { | |
| return json.result; | |
| } |
| if (onEvent) { | ||
| claudeArgs.push('--output-format', 'stream-json', '--verbose'); | ||
| claudeArgs.push('-p', message); | ||
|
|
||
| let response = ''; | ||
| await runCommandStreaming('claude', claudeArgs, (line) => { | ||
| try { | ||
| const json = JSON.parse(line); | ||
| const text = extractClaudeEventText(json); | ||
| if (text) { | ||
| response = text; | ||
| onEvent(text); | ||
| } | ||
| } catch (e) { | ||
| // Ignore lines that aren't valid JSON | ||
| } | ||
| }, workingDir, envOverrides); | ||
|
|
||
| return response || 'Sorry, I could not generate a response from Claude.'; |
There was a problem hiding this comment.
response accumulates intermediate text, not just the final result
extractClaudeEventText returns text for both intermediate assistant events (partial turns, [tool: …] labels) and the final result event. Since each call does response = text (assignment, not append), the final return value should be the result event text in the happy path. However, if the result event is never emitted (e.g. the CLI exits 0 but without a result line), response will silently hold the last intermediate assistant fragment and be returned as the full answer — without the fallback message triggering.
Consider only setting response from result-typed events and treating assistant events as progress-only:
child.stdout: on('data') → onLine
└─ extractClaudeEventText:
assistant event → onEvent(text) // progress only
result event → response = text + onEvent(text)
The Claude 'result' event duplicates the final assistant message text. Skip emitting it via onEvent while still capturing it as the return value, so the final response goes through streamResponse exactly once. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ents Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
Add real-time streaming of agent execution progress. The
invokeAgentfunction now accepts an optionalonEventcallback that streams extracted text from each CLI event as it arrives, instead of waiting for the full response. The main package emitsagent_progressSSE events so subscribers (e.g., the web UI) can display live agent activity.Changes
extractClaudeEventText(),extractCodexEventText(), andextractOpenCodeEventText()to parse JSON events from each provideronEventcallback provided, usesrunCommandStreaming()instead of buffering all output. Claude adds--output-format stream-json --verboseflagsprocessMessage()now passes a callback that emitsagent_progressSSE events with agent name, text chunk, and messageIdTesting
onEventis not provided (default case unchanged)🤖 Generated with Claude Code