fix(executor): recover empty codex stream completions#2624

Open
ckckck wants to merge 1 commit into router-for-me:dev from ckckck:fix/codex-stream-empty-output-recovery

Conversation


@ckckck ckckck commented Apr 9, 2026

Summary

  • cache Codex stream output items and function call state while consuming SSE events
  • recover response.output when response.completed arrives with an empty output array
  • add regression coverage for message output recovery and function call recovery in stream mode

Background

Some Codex/OpenAI Responses streams emit response.output_item.done or function-call argument events, but still end with a response.completed event whose response.output is an empty array ([]).

The non-stream path already had recovery logic for empty completed output, while ExecuteStream did not. That mismatch can leave downstream clients with an empty final response even though the stream already carried recoverable output state.
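The recovery pattern described above can be sketched as follows. This is a minimal illustration, not the executor's actual code: the type and method names (`streamState`, `onItemDone`, `recoverOutput`) are hypothetical, and the real implementation additionally tracks raw JSON payloads and function-call argument state.

```go
package main

import (
	"fmt"
	"sort"
)

// streamState caches output items seen while consuming SSE events so that a
// terminal response.completed carrying output: [] can be reconstructed.
type streamState struct {
	itemsByIndex map[int64]string // output_item.done payloads keyed by output_index
}

func newStreamState() *streamState {
	return &streamState{itemsByIndex: map[int64]string{}}
}

// onItemDone records an output item as it streams past.
func (s *streamState) onItemDone(index int64, raw string) {
	s.itemsByIndex[index] = raw
}

// recoverOutput trusts a non-empty upstream output; otherwise it rebuilds
// the output array from the cached items, in output_index order.
func (s *streamState) recoverOutput(completedOutput []string) []string {
	if len(completedOutput) > 0 {
		return completedOutput // normal stream: nothing to patch
	}
	indexes := make([]int64, 0, len(s.itemsByIndex))
	for idx := range s.itemsByIndex {
		indexes = append(indexes, idx)
	}
	sort.Slice(indexes, func(i, j int) bool { return indexes[i] < indexes[j] })
	out := make([]string, 0, len(indexes))
	for _, idx := range indexes {
		out = append(out, s.itemsByIndex[idx])
	}
	return out
}

func main() {
	s := newStreamState()
	s.onItemDone(1, `{"type":"message"}`)
	s.onItemDone(0, `{"type":"function_call"}`)
	fmt.Println(s.recoverOutput(nil)) // cached items, restored in index order
}
```

The key design point is that recovery is gated on the terminal event: normal streams with a populated response.output pass through untouched.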

Testing

  • go test ./internal/runtime/executor -run 'TestCodexExecutorExecute(Stream)?_Empty' -count=1
  • go test ./internal/runtime/executor -count=1

@github-actions github-actions bot changed the base branch from main to dev April 9, 2026 02:24

github-actions bot commented Apr 9, 2026

This pull request targeted main.

The base branch has been automatically changed to dev.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a mechanism to recover missing output items in the Codex executor's streaming response. It adds codexStreamCompletionState to track events and patch the final response.completed event if the output array is empty. Feedback was provided regarding the logic for assigning indices to fallback items, which could lead to incorrect ordering if existing indices are non-contiguous.

Comment on lines +164 to +187
	indexes := make([]int64, 0, len(s.outputItemsByIndex))
	for idx := range s.outputItemsByIndex {
		indexes = append(indexes, idx)
	}
	sort.Slice(indexes, func(i, j int) bool { return indexes[i] < indexes[j] })
	for _, idx := range indexes {
		raw := s.outputItemsByIndex[idx]
		recovered = append(recovered, recoveredItem{outputIndex: idx, raw: raw})
		if callID := strings.TrimSpace(gjson.GetBytes(raw, "call_id").String()); callID != "" {
			seenCallIDs[callID] = struct{}{}
		}
		if itemID := strings.TrimSpace(gjson.GetBytes(raw, "id").String()); itemID != "" {
			seenItemIDs[itemID] = struct{}{}
		}
	}
	for _, raw := range s.outputItemsFallback {
		recovered = append(recovered, recoveredItem{outputIndex: int64(len(indexes) + len(recovered)), raw: raw})
		if callID := strings.TrimSpace(gjson.GetBytes(raw, "call_id").String()); callID != "" {
			seenCallIDs[callID] = struct{}{}
		}
		if itemID := strings.TrimSpace(gjson.GetBytes(raw, "id").String()); itemID != "" {
			seenItemIDs[itemID] = struct{}{}
		}
	}

Severity: medium

The logic for calculating the outputIndex for fallback items (line 180) can lead to incorrect reordering of the output array if the existing indices are not contiguous starting from 0. For example, if only index 5 exists, fallback items would be assigned indices starting at 2 (len(indexes)=1 + len(recovered)=1), causing them to be sorted before the item at index 5. To maintain consistency with the non-streaming path and ensure fallback items appear at the end, they should be assigned indices higher than any known index.

	indexes := make([]int64, 0, len(s.outputItemsByIndex))
	for idx := range s.outputItemsByIndex {
		indexes = append(indexes, idx)
	}
	sort.Slice(indexes, func(i, j int) bool { return indexes[i] < indexes[j] })

	var maxIdx int64 = -1
	for _, idx := range indexes {
		raw := s.outputItemsByIndex[idx]
		recovered = append(recovered, recoveredItem{outputIndex: idx, raw: raw})
		if idx > maxIdx {
			maxIdx = idx
		}
		if callID := strings.TrimSpace(gjson.GetBytes(raw, "call_id").String()); callID != "" {
			seenCallIDs[callID] = struct{}{}
		}
		if itemID := strings.TrimSpace(gjson.GetBytes(raw, "id").String()); itemID != "" {
			seenItemIDs[itemID] = struct{}{}
		}
	}
	for _, state := range s.functionCallsByItem {
		if state != nil && state.OutputIndex > maxIdx {
			maxIdx = state.OutputIndex
		}
	}

	for i, raw := range s.outputItemsFallback {
		recovered = append(recovered, recoveredItem{outputIndex: maxIdx + 1 + int64(i), raw: raw})
		if callID := strings.TrimSpace(gjson.GetBytes(raw, "call_id").String()); callID != "" {
			seenCallIDs[callID] = struct{}{}
		}
		if itemID := strings.TrimSpace(gjson.GetBytes(raw, "id").String()); itemID != "" {
			seenItemIDs[itemID] = struct{}{}
		}
	}


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 11643f4490


Comment on lines +652 to +653
log.Warnf(
"codex stream completed with empty response.output and no recoverable items; cached_done_items=%d cached_function_calls=%d",

P2: Guard the empty-output warning with an actual empty check

This else branch logs an empty response.output warning whenever recoveredCount == 0, but patchCompletedOutputIfEmpty also returns 0 when the upstream response.completed already contains a non-empty response.output. In normal streams this produces false-positive warnings on successful completions, which can flood logs and hide real recovery cases. Add an explicit empty-output check (or return a second flag from patchCompletedOutputIfEmpty) before emitting this warning.
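The guard the reviewer asks for can be sketched like this. The function name and signature below are illustrative, modeled on the suggestion to return a second flag from `patchCompletedOutputIfEmpty`; they are not the PR's actual API.

```go
package main

import "fmt"

// patchCompletedOutput reports both how many items were recovered and
// whether the upstream response.completed actually arrived with an empty
// output array. Returning the second flag lets the caller distinguish
// "nothing to patch" from "empty and unrecoverable".
func patchCompletedOutput(upstreamOutput, cached []string) (recovered int, wasEmpty bool) {
	if len(upstreamOutput) > 0 {
		return 0, false // upstream output intact: no patching, no warning
	}
	return len(cached), true
}

func main() {
	// Genuine failure case: empty upstream output and nothing cached.
	if recovered, wasEmpty := patchCompletedOutput(nil, nil); wasEmpty && recovered == 0 {
		fmt.Println("warn: codex stream completed with empty response.output and no recoverable items")
	}
	// Normal completion: recovered == 0 but wasEmpty == false, so no warning.
	if recovered, wasEmpty := patchCompletedOutput([]string{"item"}, nil); wasEmpty && recovered == 0 {
		fmt.Println("false-positive warning")
	}
}
```

Warning only when `wasEmpty && recovered == 0` avoids flooding logs on every successful stream while still surfacing the genuinely unrecoverable case.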


Collaborator

@luispater luispater left a comment


Summary:
This improves Codex streaming robustness by caching response.output_item.done items and function-call argument state, then reconstructing response.output when the terminal response.completed event arrives with response.output: [].

Key findings:

  • Recovery is gated to the response.completed event and only triggers when response.output is empty, minimizing impact on normal streams.
  • Adds regression tests for both message output recovery and function-call recovery in stream mode.

Non-blocking suggestions:

  • Consider assigning fallback indexes monotonically after the max observed output_index (instead of len(indexes)+len(recovered)) to avoid edge-case reordering if upstream emits sparse/non-zero-based indexes.
  • The Warnf logs might be noisy if this happens frequently; consider downgrading/rate-limiting or emitting a metric.
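The first suggestion (monotonic fallback indexes after the max observed output_index) can be sketched in isolation. The helper name and shape below are hypothetical, reduced to just the index-assignment logic:

```go
package main

import "fmt"

// assignFallbackIndexes gives fallback items indexes strictly above the
// highest known output_index, so sparse or non-zero-based upstream indexes
// (e.g. only index 5 present) never sort fallback items ahead of known ones.
func assignFallbackIndexes(known []int64, fallbackCount int) []int64 {
	var maxIdx int64 = -1
	for _, idx := range known {
		if idx > maxIdx {
			maxIdx = idx
		}
	}
	out := make([]int64, fallbackCount)
	for i := range out {
		out[i] = maxIdx + 1 + int64(i)
	}
	return out
}

func main() {
	// With only index 5 known, fallbacks land at 6 and 7 rather than at
	// 2 and 3 as the len(indexes)+len(recovered) formula would place them.
	fmt.Println(assignFallbackIndexes([]int64{5}, 2)) // [6 7]
}
```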

Test plan:

  • go test ./internal/runtime/executor -run 'TestCodexExecutorExecute(Stream)?_Empty' -count=1

This is an automated Codex review result and still requires manual verification by a human reviewer.

@luispater luispater added the codex label Apr 9, 2026