-
Notifications
You must be signed in to change notification settings - Fork 502
feat(invoke): stream agent execution progress in real-time #196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
83ff15c
6d8dfd1
443e987
2a2069e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,9 +47,115 @@ export async function runCommand(command: string, args: string[], cwd?: string, | |
| }); | ||
| } | ||
|
|
||
/**
 * Spawn a command and hand its stdout to `onLine` one line at a time while the
 * process is still running.
 *
 * Stdout is split on `\n`. Lines that are empty or whitespace-only are not
 * delivered. Stderr is collected but never streamed; it is only used to build
 * the rejection message.
 *
 * @param command - Executable to spawn.
 * @param args - Arguments passed to the executable.
 * @param onLine - Called synchronously for every complete, non-blank stdout line.
 * @param cwd - Working directory for the child; falls back to `SCRIPT_DIR` when falsy.
 * @param envOverrides - Variables layered on top of `process.env` for the child.
 * @returns The full, unmodified stdout once the process exits with code 0.
 * @throws Rejects with the spawn error, or with an `Error` carrying the trimmed
 *   stderr (or a generic exit-code message) when the exit code is not 0.
 */
export async function runCommandStreaming(
  command: string,
  args: string[],
  onLine: (line: string) => void,
  cwd?: string,
  envOverrides?: Record<string, string>,
): Promise<string> {
  return new Promise((resolve, reject) => {
    const env = { ...process.env, ...envOverrides };
    // The child never sees CLAUDECODE, even if the parent or envOverrides set it.
    delete env.CLAUDECODE;

    const child = spawn(command, args, {
      cwd: cwd || SCRIPT_DIR,
      stdio: ['ignore', 'pipe', 'pipe'],
      env,
    });

    let stdout = '';
    let stderr = '';
    // Holds the tail of stdout that has not been terminated by '\n' yet.
    let lineBuffer = '';

    child.stdout.setEncoding('utf8');
    child.stderr.setEncoding('utf8');

    child.stdout.on('data', (chunk: string) => {
      stdout += chunk;
      lineBuffer += chunk;
      const lines = lineBuffer.split('\n');
      // split() always yields at least one element; the last one is the
      // (possibly empty) incomplete line, which stays buffered.
      lineBuffer = lines.pop() ?? '';
      for (const line of lines) {
        if (line.trim()) onLine(line);
      }
    });

    child.stderr.on('data', (chunk: string) => {
      stderr += chunk;
    });

    child.on('error', (error) => {
      reject(error);
    });

    child.on('close', (code) => {
      if (code === 0) {
        // Flush the unterminated last line only on a clean exit. On failure the
        // tail may be a truncated fragment, and dispatching it could surface a
        // spurious progress event just before the rejection reaches the caller.
        if (lineBuffer.trim()) onLine(lineBuffer);
        resolve(stdout);
        return;
      }

      const errorMessage = stderr.trim() || `Command exited with code ${code}`;
      reject(new Error(errorMessage));
    });
  });
}
|
|
||
/**
 * Extract displayable text from a parsed Claude stream-json event.
 *
 * Only events whose `type` is `'assistant'` and whose `message.content` is an
 * array are considered. Within that array:
 * - `text` blocks contribute their non-empty string `text`;
 * - `tool_use` blocks contribute a `[tool: <name>]` summary.
 * The contributions are joined with newlines.
 *
 * Every other event type (including `'result'`, which duplicates the final
 * assistant message already delivered through the normal response pipeline)
 * yields `null`.
 *
 * The input comes straight from `JSON.parse`, so it may be `null`, a primitive,
 * or an object of any shape; malformed payloads return `null` instead of throwing.
 *
 * @param json - A value produced by parsing one line of CLI output.
 * @returns The text to display, or `null` when the event carries none.
 */
function extractClaudeEventText(json: unknown): string | null {
  if (typeof json !== 'object' || json === null) return null;

  const event = json as { type?: unknown; message?: unknown };
  if (event.type !== 'assistant') return null;

  const message = event.message;
  if (typeof message !== 'object' || message === null) return null;

  const content = (message as { content?: unknown }).content;
  // A non-array content has no blocks to summarise.
  if (!Array.isArray(content)) return null;

  const parts: string[] = [];
  for (const block of content as unknown[]) {
    if (typeof block !== 'object' || block === null) continue;
    const { type, text, name } = block as { type?: unknown; text?: unknown; name?: unknown };
    if (type === 'text' && typeof text === 'string' && text !== '') {
      parts.push(text);
    } else if (type === 'tool_use' && typeof name === 'string' && name !== '') {
      parts.push(`[tool: ${name}]`);
    }
  }
  return parts.length > 0 ? parts.join('\n') : null;
}
|
|
||
/**
 * Extract displayable text from a parsed Codex JSONL event.
 *
 * Only `item.completed` events whose `item.type` is `'agent_message'` carry
 * text; everything else yields `null`. The input comes straight from
 * `JSON.parse`, so `null`, primitives and unexpected shapes are tolerated and
 * return `null` rather than throwing.
 *
 * @param json - A value produced by parsing one line of CLI output.
 * @returns The non-empty message text, or `null` when there is none.
 */
function extractCodexEventText(json: unknown): string | null {
  if (typeof json !== 'object' || json === null) return null;

  const event = json as { type?: unknown; item?: unknown };
  if (event.type !== 'item.completed') return null;

  const item = event.item;
  if (typeof item !== 'object' || item === null) return null;

  const { type, text } = item as { type?: unknown; text?: unknown };
  if (type !== 'agent_message') return null;

  // Only a non-empty string honours the declared `string | null` return type.
  return typeof text === 'string' && text !== '' ? text : null;
}
|
|
||
/**
 * Extract displayable text from a parsed OpenCode JSONL event.
 *
 * Only events whose `type` is `'text'` and whose `part.text` is a non-empty
 * string carry text; everything else yields `null`. The input comes straight
 * from `JSON.parse`, so `null`, primitives and unexpected shapes are tolerated
 * and return `null` rather than throwing.
 *
 * @param json - A value produced by parsing one line of CLI output.
 * @returns The text of the part, or `null` when there is none.
 */
function extractOpenCodeEventText(json: unknown): string | null {
  if (typeof json !== 'object' || json === null) return null;

  const event = json as { type?: unknown; part?: unknown };
  if (event.type !== 'text') return null;

  const part = event.part;
  if (typeof part !== 'object' || part === null) return null;

  const text = (part as { text?: unknown }).text;
  // Only a non-empty string honours the declared `string | null` return type.
  return typeof text === 'string' && text !== '' ? text : null;
}
|
|
||
| /** | ||
| * Invoke a single agent with a message. Contains all Claude/Codex invocation logic. | ||
| * Returns the raw response text. | ||
| * | ||
| * When `onEvent` is provided, streams intermediate text events as they arrive | ||
| * from the CLI subprocess (verbose/streaming mode). | ||
| */ | ||
| export async function invokeAgent( | ||
| agent: AgentConfig, | ||
|
|
@@ -58,7 +164,8 @@ export async function invokeAgent( | |
| workspacePath: string, | ||
| shouldReset: boolean, | ||
| agents: Record<string, AgentConfig> = {}, | ||
| teams: Record<string, TeamConfig> = {} | ||
| teams: Record<string, TeamConfig> = {}, | ||
| onEvent?: (text: string) => void, | ||
| ): Promise<string> { | ||
| // Ensure agent directory exists with config files | ||
| const agentDir = path.join(workspacePath, agentId); | ||
|
|
@@ -140,19 +247,33 @@ export async function invokeAgent( | |
| } | ||
| codexArgs.push('--skip-git-repo-check', '--dangerously-bypass-approvals-and-sandbox', '--json', message); | ||
|
|
||
| const codexOutput = await runCommand('codex', codexArgs, workingDir, envOverrides); | ||
|
|
||
| // Parse JSONL output and extract final agent_message | ||
| let response = ''; | ||
| const lines = codexOutput.trim().split('\n'); | ||
| for (const line of lines) { | ||
| try { | ||
| const json = JSON.parse(line); | ||
| if (json.type === 'item.completed' && json.item?.type === 'agent_message') { | ||
| response = json.item.text; | ||
|
|
||
| if (onEvent) { | ||
| await runCommandStreaming('codex', codexArgs, (line) => { | ||
| try { | ||
| const json = JSON.parse(line); | ||
| const text = extractCodexEventText(json); | ||
| if (text) { | ||
| response = text; | ||
| onEvent(text); | ||
| } | ||
| } catch (e) { | ||
| // Ignore lines that aren't valid JSON | ||
| } | ||
| }, workingDir, envOverrides); | ||
| } else { | ||
| const codexOutput = await runCommand('codex', codexArgs, workingDir, envOverrides); | ||
| const lines = codexOutput.trim().split('\n'); | ||
| for (const line of lines) { | ||
| try { | ||
| const json = JSON.parse(line); | ||
| if (json.type === 'item.completed' && json.item?.type === 'agent_message') { | ||
| response = json.item.text; | ||
| } | ||
| } catch (e) { | ||
| // Ignore lines that aren't valid JSON | ||
| } | ||
| } catch (e) { | ||
| // Ignore lines that aren't valid JSON | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -180,19 +301,33 @@ export async function invokeAgent( | |
| } | ||
| opencodeArgs.push(message); | ||
|
|
||
| const opencodeOutput = await runCommand('opencode', opencodeArgs, workingDir, envOverrides); | ||
|
|
||
| // Parse JSONL output and collect all text parts | ||
| let response = ''; | ||
| const lines = opencodeOutput.trim().split('\n'); | ||
| for (const line of lines) { | ||
| try { | ||
| const json = JSON.parse(line); | ||
| if (json.type === 'text' && json.part?.text) { | ||
| response = json.part.text; | ||
|
|
||
| if (onEvent) { | ||
| await runCommandStreaming('opencode', opencodeArgs, (line) => { | ||
| try { | ||
| const json = JSON.parse(line); | ||
| const text = extractOpenCodeEventText(json); | ||
| if (text) { | ||
| response = text; | ||
| onEvent(text); | ||
| } | ||
| } catch (e) { | ||
| // Ignore lines that aren't valid JSON | ||
| } | ||
| }, workingDir, envOverrides); | ||
| } else { | ||
| const opencodeOutput = await runCommand('opencode', opencodeArgs, workingDir, envOverrides); | ||
| const lines = opencodeOutput.trim().split('\n'); | ||
| for (const line of lines) { | ||
| try { | ||
| const json = JSON.parse(line); | ||
| if (json.type === 'text' && json.part?.text) { | ||
| response = json.part.text; | ||
| } | ||
| } catch (e) { | ||
| // Ignore lines that aren't valid JSON | ||
| } | ||
| } catch (e) { | ||
| // Ignore lines that aren't valid JSON | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -215,8 +350,37 @@ export async function invokeAgent( | |
| if (continueConversation) { | ||
| claudeArgs.push('-c'); | ||
| } | ||
| claudeArgs.push('-p', message); | ||
|
|
||
| if (onEvent) { | ||
| claudeArgs.push('--output-format', 'stream-json', '--verbose'); | ||
| claudeArgs.push('-p', message); | ||
|
|
||
| let response = ''; | ||
| await runCommandStreaming('claude', claudeArgs, (line) => { | ||
| try { | ||
| const json = JSON.parse(line); | ||
| // Use result event for the return value (not emitted as progress) | ||
| if (json.type === 'result') { | ||
| if (json.result) response = json.result; | ||
| // Log raw usage stats from the result event | ||
| if (json.usage) log('INFO', `Claude usage (${agentId}): ${JSON.stringify(json.usage)}`); | ||
| if (json.modelUsage) log('INFO', `Claude model usage (${agentId}): ${JSON.stringify(json.modelUsage)}`); | ||
| return; | ||
| } | ||
| const text = extractClaudeEventText(json); | ||
| if (text) { | ||
| response = text; | ||
| onEvent(text); | ||
| } | ||
| } catch (e) { | ||
| // Ignore lines that aren't valid JSON | ||
| } | ||
| }, workingDir, envOverrides); | ||
|
|
||
| return response || 'Sorry, I could not generate a response from Claude.'; | ||
|
Comment on lines
+354
to
+380
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Consider only setting |
||
| } | ||
|
|
||
| claudeArgs.push('-p', message); | ||
| return await runCommand('claude', claudeArgs, workingDir, envOverrides); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial buffer flushed even on failed exit
`onLine(lineBuffer)` is called unconditionally before the exit-code check. If the child process exits with a non-zero code, the last incomplete line (which may be a partial JSON fragment from an error diagnostic) is still dispatched through `onLine`. Callers' `try`/`catch` around `JSON.parse` will silently swallow it, but it can still trigger a spurious `onEvent`/`agent_progress` SSE event in the UI before the `reject` path propagates the real error. The flush should only happen on a clean exit: