Skip to content

feat(invoke): stream agent execution progress in real-time#196

Merged
jlia0 merged 4 commits intomainfrom
jlia0/stream-agent-output
Mar 18, 2026
Merged

feat(invoke): stream agent execution progress in real-time#196
jlia0 merged 4 commits intomainfrom
jlia0/stream-agent-output

Conversation

@jlia0
Copy link
Collaborator

@jlia0 jlia0 commented Mar 12, 2026

Summary

Add real-time streaming of agent execution progress. The invokeAgent function now accepts an optional onEvent callback that streams extracted text from each CLI event as it arrives, instead of waiting for the full response. The main package emits agent_progress SSE events so subscribers (e.g., the web UI) can display live agent activity.

Changes

  • runCommandStreaming(): New helper that spawns a process and calls a callback for each stdout line as they arrive (properly line-buffered)
  • Event extractors: Added extractClaudeEventText(), extractCodexEventText(), and extractOpenCodeEventText() to parse JSON events from each provider
  • invokeAgent streaming: When onEvent callback provided, uses runCommandStreaming() instead of buffering all output. Claude adds --output-format stream-json --verbose flags
  • Progress events: processMessage() now passes a callback that emits agent_progress SSE events with agent name, text chunk, and messageId

Testing

  • Verified TypeScript compilation passes
  • No behavioral changes when onEvent is not provided (default case unchanged)
  • Streaming properly handles line buffering and final partial lines

🤖 Generated with Claude Code

Add optional streaming mode to invokeAgent that processes CLI output line-by-line and emits intermediate text events as they arrive. When onEvent callback is provided, extracts JSON events from Claude (stream-json format), Codex (JSONL), and OpenCode (JSONL) outputs and streams extracted text in real-time. Main package now emits agent_progress SSE events during execution, allowing UI to display live agent activity.

- New runCommandStreaming() spawns processes and calls callback for each stdout line
- New extractClaudeEventText/extractCodexEventText/extractOpenCodeEventText parse provider events
- invokeAgent accepts optional onEvent callback for streaming mode
- Claude CLI uses --output-format stream-json --verbose when streaming
- processMessage emits agent_progress events during agent execution

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
@greptile-apps
Copy link

greptile-apps bot commented Mar 12, 2026

Greptile Summary

This PR adds real-time streaming of agent execution progress by introducing runCommandStreaming(), per-provider JSON event extractors, and wiring an onEvent callback through invokeAgent(). The main entry point (processMessage) now always passes this callback, so every agent invocation emits live agent_progress SSE events to connected subscribers (e.g., the web UI).

Key changes:

  • runCommandStreaming() — line-buffered stdout processor; mirrors runCommand() but calls onLine per complete line
  • extractClaudeEventText() / extractCodexEventText() / extractOpenCodeEventText() — thin parsers for each provider's JSONL format
  • Claude path now uses --output-format stream-json --verbose whenever streaming is enabled
  • processMessage() unconditionally supplies an onEvent callback, meaning streaming mode is always active for all three providers (not opt-in per call)

Issues found:

  • runCommandStreaming flushes the remaining line buffer unconditionally in the close handler, including on non-zero exit. This can dispatch partial/error output as progress events before the rejection propagates.
  • extractClaudeEventText returns json.result for result-type events without checking json.is_error. If Claude emits a result event with is_error: true (and the process exits cleanly), the error description would be returned as the successful agent response instead of triggering the fallback message.
  • response is set from both intermediate assistant events and the final result event in the Claude streaming path; while the result event is typically last and overwrites correctly, if it is absent (abnormal but zero-exit scenario) an intermediate fragment would silently become the return value.

Confidence Score: 3/5

  • Safe to merge with minor fixes — two logic issues in invoke.ts should be addressed before production use
  • The overall architecture is sound and the non-streaming path is unchanged. However, the unconditional buffer flush on process failure and the missing is_error check on Claude result events are real logic bugs that could surface as misleading UI state or incorrect responses in error scenarios. The streaming-always-on change is intentional but represents a behavioral shift worth acknowledging explicitly.
  • packages/core/src/invoke.ts — specifically the runCommandStreaming close handler and extractClaudeEventText result handling

Important Files Changed

Filename Overview
packages/core/src/invoke.ts Adds runCommandStreaming(), three event-extractor helpers, and wires an optional onEvent callback into all three provider paths (Claude, Codex, OpenCode). Contains a logic issue where the remaining line buffer is flushed unconditionally on process close (even on non-zero exit), and the Claude result extractor does not check is_error, potentially treating error descriptions as valid responses.
packages/main/src/index.ts Small change: processMessage now always passes an onEvent callback to invokeAgent, emitting agent_progress SSE events with agentId, agentName, text chunk, and messageId. Change is clean and consistent with the existing emitEvent pattern.

Sequence Diagram

sequenceDiagram
    participant UI as Web UI (SSE subscriber)
    participant Main as processMessage()
    participant Core as invokeAgent()
    participant Streaming as runCommandStreaming()
    participant CLI as Claude / Codex / OpenCode CLI

    Main->>Core: invokeAgent(..., onEvent)
    Core->>Streaming: runCommandStreaming(cmd, args, onLine)
    Streaming->>CLI: spawn process
    loop stdout data events
        CLI-->>Streaming: stdout chunk
        Streaming->>Streaming: buffer lines
        Streaming->>Core: onLine(completeLine)
        Core->>Core: JSON.parse + extractXxxEventText()
        alt text extracted
            Core->>Main: onEvent(text)
            Main->>UI: emitEvent('agent_progress', {agentId, text, messageId})
        end
    end
    CLI-->>Streaming: close (exit code)
    alt exit code == 0
        Streaming-->>Core: resolve(stdout)
        Core-->>Main: return response
        Main->>UI: emitEvent('chain_step_done', {...})
    else exit code != 0
        Streaming-->>Core: reject(Error)
        Core-->>Main: throw
        Main->>UI: (error logged, fallback response)
    end
Loading

Last reviewed commit: 83ff15c

Comment on lines +97 to +108
child.on('close', (code) => {
// Flush remaining buffer
if (lineBuffer.trim()) onLine(lineBuffer);

if (code === 0) {
resolve(stdout);
return;
}

const errorMessage = stderr.trim() || `Command exited with code ${code}`;
reject(new Error(errorMessage));
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial buffer flushed even on failed exit

onLine(lineBuffer) is called unconditionally before the exit-code check. If the child process exits with a non-zero code the last incomplete line (which may be a partial JSON fragment from an error diagnostic) is still dispatched through onLine. Callers' try/catch around JSON.parse will silently swallow it, but it can still trigger a spurious onEvent / agent_progress SSE event in the UI before the reject path propagates the real error. The flush should only happen on a clean exit:

Suggested change
child.on('close', (code) => {
// Flush remaining buffer
if (lineBuffer.trim()) onLine(lineBuffer);
if (code === 0) {
resolve(stdout);
return;
}
const errorMessage = stderr.trim() || `Command exited with code ${code}`;
reject(new Error(errorMessage));
});
child.on('close', (code) => {
if (code === 0) {
// Flush remaining buffer only on success
if (lineBuffer.trim()) onLine(lineBuffer);
resolve(stdout);
return;
}
const errorMessage = stderr.trim() || `Command exited with code ${code}`;
reject(new Error(errorMessage));
});

Comment on lines +128 to +131
if (json.type === 'result' && json.result) {
return json.result;
}
return null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error result text returned as successful response

Claude's stream-json result event carries an is_error boolean. When is_error: true, json.result holds an error description string, not the agent's answer. The current code returns that error string as if it were a successful response (and the process may still exit with code 0 in some scenarios). This means callers receive an error message in the normal response field instead of a fallback.

Add an is_error guard:

Suggested change
if (json.type === 'result' && json.result) {
return json.result;
}
return null;
if (json.type === 'result' && json.result && !json.is_error) {
return json.result;
}

Comment on lines +355 to +373
if (onEvent) {
claudeArgs.push('--output-format', 'stream-json', '--verbose');
claudeArgs.push('-p', message);

let response = '';
await runCommandStreaming('claude', claudeArgs, (line) => {
try {
const json = JSON.parse(line);
const text = extractClaudeEventText(json);
if (text) {
response = text;
onEvent(text);
}
} catch (e) {
// Ignore lines that aren't valid JSON
}
}, workingDir, envOverrides);

return response || 'Sorry, I could not generate a response from Claude.';
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response accumulates intermediate text, not just the final result

extractClaudeEventText returns text for both intermediate assistant events (partial turns, [tool: …] labels) and the final result event. Since each call does response = text (assignment, not append), the final return value should be the result event text in the happy path. However, if the result event is never emitted (e.g. the CLI exits 0 but without a result line), response will silently hold the last intermediate assistant fragment and be returned as the full answer — without the fallback message triggering.

Consider only setting response from result-typed events and treating assistant events as progress-only:

child.stdout: on('data') → onLine
  └─ extractClaudeEventText:
       assistant event  → onEvent(text)  // progress only
       result   event  → response = text + onEvent(text)

The Claude 'result' event duplicates the final assistant message text.
Skip emitting it via onEvent while still capturing it as the return value,
so the final response goes through streamResponse exactly once.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
jlia0 and others added 2 commits March 18, 2026 18:45
…ents

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jlia0 jlia0 merged commit 993fdce into main Mar 18, 2026
@jlia0 jlia0 deleted the jlia0/stream-agent-output branch March 18, 2026 10:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant