Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 188 additions & 24 deletions packages/core/src/invoke.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,115 @@ export async function runCommand(command: string, args: string[], cwd?: string,
});
}

/**
* Spawn a command and process stdout line-by-line as they arrive.
* Calls `onLine` for each complete line. Returns the full stdout when done.
*/
export async function runCommandStreaming(
command: string,
args: string[],
onLine: (line: string) => void,
cwd?: string,
envOverrides?: Record<string, string>,
): Promise<string> {
return new Promise((resolve, reject) => {
const env = { ...process.env, ...envOverrides };
delete env.CLAUDECODE;

const child = spawn(command, args, {
cwd: cwd || SCRIPT_DIR,
stdio: ['ignore', 'pipe', 'pipe'],
env,
});

let stdout = '';
let stderr = '';
let lineBuffer = '';

child.stdout.setEncoding('utf8');
child.stderr.setEncoding('utf8');

child.stdout.on('data', (chunk: string) => {
stdout += chunk;
lineBuffer += chunk;
const lines = lineBuffer.split('\n');
// Keep the last incomplete line in the buffer
lineBuffer = lines.pop()!;
for (const line of lines) {
if (line.trim()) onLine(line);
}
});

child.stderr.on('data', (chunk: string) => {
stderr += chunk;
});

child.on('error', (error) => {
reject(error);
});

child.on('close', (code) => {
// Flush remaining buffer
if (lineBuffer.trim()) onLine(lineBuffer);

if (code === 0) {
resolve(stdout);
return;
}

const errorMessage = stderr.trim() || `Command exited with code ${code}`;
reject(new Error(errorMessage));
});
Comment on lines +97 to +108
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial buffer flushed even on failed exit

onLine(lineBuffer) is called unconditionally before the exit-code check. If the child process exits with a non-zero code the last incomplete line (which may be a partial JSON fragment from an error diagnostic) is still dispatched through onLine. Callers' try/catch around JSON.parse will silently swallow it, but it can still trigger a spurious onEvent / agent_progress SSE event in the UI before the reject path propagates the real error. The flush should only happen on a clean exit:

Suggested change
child.on('close', (code) => {
// Flush remaining buffer
if (lineBuffer.trim()) onLine(lineBuffer);
if (code === 0) {
resolve(stdout);
return;
}
const errorMessage = stderr.trim() || `Command exited with code ${code}`;
reject(new Error(errorMessage));
});
child.on('close', (code) => {
if (code === 0) {
// Flush remaining buffer only on success
if (lineBuffer.trim()) onLine(lineBuffer);
resolve(stdout);
return;
}
const errorMessage = stderr.trim() || `Command exited with code ${code}`;
reject(new Error(errorMessage));
});

});
}

/**
* Extract displayable text from a Claude stream-json event.
* Returns text content from assistant messages and tool use summaries.
* Skips 'result' events — those duplicate the final assistant message
* which is already delivered through the normal response pipeline.
*/
function extractClaudeEventText(json: any): string | null {
if (json.type === 'assistant' && json.message?.content) {
const parts: string[] = [];
for (const block of json.message.content) {
if (block.type === 'text' && block.text) {
parts.push(block.text);
} else if (block.type === 'tool_use' && block.name) {
parts.push(`[tool: ${block.name}]`);
}
}
return parts.length > 0 ? parts.join('\n') : null;
}
return null;
}

/**
* Extract displayable text from a Codex JSONL event.
*/
function extractCodexEventText(json: any): string | null {
if (json.type === 'item.completed' && json.item?.type === 'agent_message') {
return json.item.text || null;
}
return null;
}

/**
* Extract displayable text from an OpenCode JSONL event.
*/
function extractOpenCodeEventText(json: any): string | null {
if (json.type === 'text' && json.part?.text) {
return json.part.text;
}
return null;
}

/**
* Invoke a single agent with a message. Contains all Claude/Codex invocation logic.
* Returns the raw response text.
*
* When `onEvent` is provided, streams intermediate text events as they arrive
* from the CLI subprocess (verbose/streaming mode).
*/
export async function invokeAgent(
agent: AgentConfig,
Expand All @@ -58,7 +164,8 @@ export async function invokeAgent(
workspacePath: string,
shouldReset: boolean,
agents: Record<string, AgentConfig> = {},
teams: Record<string, TeamConfig> = {}
teams: Record<string, TeamConfig> = {},
onEvent?: (text: string) => void,
): Promise<string> {
// Ensure agent directory exists with config files
const agentDir = path.join(workspacePath, agentId);
Expand Down Expand Up @@ -140,19 +247,33 @@ export async function invokeAgent(
}
codexArgs.push('--skip-git-repo-check', '--dangerously-bypass-approvals-and-sandbox', '--json', message);

const codexOutput = await runCommand('codex', codexArgs, workingDir, envOverrides);

// Parse JSONL output and extract final agent_message
let response = '';
const lines = codexOutput.trim().split('\n');
for (const line of lines) {
try {
const json = JSON.parse(line);
if (json.type === 'item.completed' && json.item?.type === 'agent_message') {
response = json.item.text;

if (onEvent) {
await runCommandStreaming('codex', codexArgs, (line) => {
try {
const json = JSON.parse(line);
const text = extractCodexEventText(json);
if (text) {
response = text;
onEvent(text);
}
} catch (e) {
// Ignore lines that aren't valid JSON
}
}, workingDir, envOverrides);
} else {
const codexOutput = await runCommand('codex', codexArgs, workingDir, envOverrides);
const lines = codexOutput.trim().split('\n');
for (const line of lines) {
try {
const json = JSON.parse(line);
if (json.type === 'item.completed' && json.item?.type === 'agent_message') {
response = json.item.text;
}
} catch (e) {
// Ignore lines that aren't valid JSON
}
} catch (e) {
// Ignore lines that aren't valid JSON
}
}

Expand Down Expand Up @@ -180,19 +301,33 @@ export async function invokeAgent(
}
opencodeArgs.push(message);

const opencodeOutput = await runCommand('opencode', opencodeArgs, workingDir, envOverrides);

// Parse JSONL output and collect all text parts
let response = '';
const lines = opencodeOutput.trim().split('\n');
for (const line of lines) {
try {
const json = JSON.parse(line);
if (json.type === 'text' && json.part?.text) {
response = json.part.text;

if (onEvent) {
await runCommandStreaming('opencode', opencodeArgs, (line) => {
try {
const json = JSON.parse(line);
const text = extractOpenCodeEventText(json);
if (text) {
response = text;
onEvent(text);
}
} catch (e) {
// Ignore lines that aren't valid JSON
}
}, workingDir, envOverrides);
} else {
const opencodeOutput = await runCommand('opencode', opencodeArgs, workingDir, envOverrides);
const lines = opencodeOutput.trim().split('\n');
for (const line of lines) {
try {
const json = JSON.parse(line);
if (json.type === 'text' && json.part?.text) {
response = json.part.text;
}
} catch (e) {
// Ignore lines that aren't valid JSON
}
} catch (e) {
// Ignore lines that aren't valid JSON
}
}

Expand All @@ -215,8 +350,37 @@ export async function invokeAgent(
if (continueConversation) {
claudeArgs.push('-c');
}
claudeArgs.push('-p', message);

if (onEvent) {
claudeArgs.push('--output-format', 'stream-json', '--verbose');
claudeArgs.push('-p', message);

let response = '';
await runCommandStreaming('claude', claudeArgs, (line) => {
try {
const json = JSON.parse(line);
// Use result event for the return value (not emitted as progress)
if (json.type === 'result') {
if (json.result) response = json.result;
// Log raw usage stats from the result event
if (json.usage) log('INFO', `Claude usage (${agentId}): ${JSON.stringify(json.usage)}`);
if (json.modelUsage) log('INFO', `Claude model usage (${agentId}): ${JSON.stringify(json.modelUsage)}`);
return;
}
const text = extractClaudeEventText(json);
if (text) {
response = text;
onEvent(text);
}
} catch (e) {
// Ignore lines that aren't valid JSON
}
}, workingDir, envOverrides);

return response || 'Sorry, I could not generate a response from Claude.';
Comment on lines +354 to +380
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response accumulates intermediate text, not just the final result

extractClaudeEventText returns text for both intermediate assistant events (partial turns, [tool: …] labels) and the final result event. Since each call does response = text (assignment, not append), the final return value should be the result event text in the happy path. However, if the result event is never emitted (e.g. the CLI exits 0 but without a result line), response will silently hold the last intermediate assistant fragment and be returned as the full answer — without the fallback message triggering.

Consider only setting response from result-typed events and treating assistant events as progress-only:

child.stdout: on('data') → onLine
  └─ extractClaudeEventText:
       assistant event  → onEvent(text)  // progress only
       result   event  → response = text + onEvent(text)

}

claudeArgs.push('-p', message);
return await runCommand('claude', claudeArgs, workingDir, envOverrides);
}
}
5 changes: 4 additions & 1 deletion packages/main/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ async function processMessage(dbMsg: any): Promise<void> {
emitEvent('chain_step_start', { agentId, agentName: agent.name, fromAgent: data.fromAgent || null });
let response: string;
try {
response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams);
response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams, (text) => {
log('INFO', `Agent ${agentId}: ${text}`);
emitEvent('agent_progress', { agentId, agentName: agent.name, text, messageId });
});
} catch (error) {
const provider = agent.provider || 'anthropic';
const providerLabel = provider === 'openai' ? 'Codex' : provider === 'opencode' ? 'OpenCode' : 'Claude';
Expand Down