fix(executor): recover empty codex stream completions#2624
fix(executor): recover empty codex stream completions#2624ckckck wants to merge 1 commit intorouter-for-me:devfrom
Conversation
|
This pull request targeted The base branch has been automatically changed to |
There was a problem hiding this comment.
Code Review
This pull request introduces a mechanism to recover missing output items in the Codex executor's streaming response. It adds codexStreamCompletionState to track events and patch the final response.completed event if the output array is empty. Feedback was provided regarding the logic for assigning indices to fallback items, which could lead to incorrect ordering if existing indices are non-contiguous.
| indexes := make([]int64, 0, len(s.outputItemsByIndex)) | ||
| for idx := range s.outputItemsByIndex { | ||
| indexes = append(indexes, idx) | ||
| } | ||
| sort.Slice(indexes, func(i, j int) bool { return indexes[i] < indexes[j] }) | ||
| for _, idx := range indexes { | ||
| raw := s.outputItemsByIndex[idx] | ||
| recovered = append(recovered, recoveredItem{outputIndex: idx, raw: raw}) | ||
| if callID := strings.TrimSpace(gjson.GetBytes(raw, "call_id").String()); callID != "" { | ||
| seenCallIDs[callID] = struct{}{} | ||
| } | ||
| if itemID := strings.TrimSpace(gjson.GetBytes(raw, "id").String()); itemID != "" { | ||
| seenItemIDs[itemID] = struct{}{} | ||
| } | ||
| } | ||
| for _, raw := range s.outputItemsFallback { | ||
| recovered = append(recovered, recoveredItem{outputIndex: int64(len(indexes) + len(recovered)), raw: raw}) | ||
| if callID := strings.TrimSpace(gjson.GetBytes(raw, "call_id").String()); callID != "" { | ||
| seenCallIDs[callID] = struct{}{} | ||
| } | ||
| if itemID := strings.TrimSpace(gjson.GetBytes(raw, "id").String()); itemID != "" { | ||
| seenItemIDs[itemID] = struct{}{} | ||
| } | ||
| } |
There was a problem hiding this comment.
The logic for calculating the outputIndex for fallback items (line 180) can lead to incorrect reordering of the output array if the existing indices are not contiguous starting from 0. For example, if only index 5 exists, fallback items would be assigned indices starting at 2 (len(indexes)=1 + len(recovered)=1), causing them to be sorted before the item at index 5. To maintain consistency with the non-streaming path and ensure fallback items appear at the end, they should be assigned indices higher than any known index.
indexes := make([]int64, 0, len(s.outputItemsByIndex))
for idx := range s.outputItemsByIndex {
indexes = append(indexes, idx)
}
sort.Slice(indexes, func(i, j int) bool { return indexes[i] < indexes[j] })
var maxIdx int64 = -1
for _, idx := range indexes {
raw := s.outputItemsByIndex[idx]
recovered = append(recovered, recoveredItem{outputIndex: idx, raw: raw})
if idx > maxIdx {
maxIdx = idx
}
if callID := strings.TrimSpace(gjson.GetBytes(raw, "call_id").String()); callID != "" {
seenCallIDs[callID] = struct{}{}
}
if itemID := strings.TrimSpace(gjson.GetBytes(raw, "id").String()); itemID != "" {
seenItemIDs[itemID] = struct{}{}
}
}
for _, state := range s.functionCallsByItem {
if state != nil && state.OutputIndex > maxIdx {
maxIdx = state.OutputIndex
}
}
for i, raw := range s.outputItemsFallback {
recovered = append(recovered, recoveredItem{outputIndex: maxIdx + 1 + int64(i), raw: raw})
if callID := strings.TrimSpace(gjson.GetBytes(raw, "call_id").String()); callID != "" {
seenCallIDs[callID] = struct{}{}
}
if itemID := strings.TrimSpace(gjson.GetBytes(raw, "id").String()); itemID != "" {
seenItemIDs[itemID] = struct{}{}
}
}There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 11643f4490
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| log.Warnf( | ||
| "codex stream completed with empty response.output and no recoverable items; cached_done_items=%d cached_function_calls=%d", |
There was a problem hiding this comment.
Guard empty-output warning with actual empty check
This else branch logs an empty response.output warning whenever recoveredCount == 0, but patchCompletedOutputIfEmpty also returns 0 when the upstream response.completed already contains a non-empty response.output. In normal streams this produces false-positive warnings on successful completions, which can flood logs and hide real recovery cases. Add an explicit empty-output check (or return a second flag from patchCompletedOutputIfEmpty) before emitting this warning.
Useful? React with 👍 / 👎.
luispater
left a comment
There was a problem hiding this comment.
Summary:
This improves Codex streaming robustness by caching response.output_item.done items and function-call argument state, then reconstructing response.output when the terminal response.completed event arrives with response.output: [].
Key findings:
- Recovery is gated to the
response.completedevent and only triggers whenresponse.outputis empty, minimizing impact on normal streams. - Adds regression tests for both message output recovery and function-call recovery in stream mode.
Non-blocking suggestions:
- Consider assigning fallback indexes monotonically after the max observed
output_index(instead oflen(indexes)+len(recovered)) to avoid edge-case reordering if upstream emits sparse/non-zero-based indexes. - The
Warnflogs might be noisy if this happens frequently; consider downgrading/rate-limiting or emitting a metric.
Test plan:
go test ./internal/runtime/executor -run 'TestCodexExecutorExecute(Stream)?_Empty' -count=1
This is an automated Codex review result and still requires manual verification by a human reviewer.
Summary
response.outputwhenresponse.completedarrives with an empty output arrayBackground
Some Codex/OpenAI Responses streams emit
response.output_item.doneor function call argument events, but end withresponse.completed.response.output=[].The non-stream path already had recovery logic for empty completed output, while
ExecuteStreamdid not. That mismatch can leave downstream clients with an empty final response even though the stream already carried recoverable output state.Testing
go test ./internal/runtime/executor -run 'TestCodexExecutorExecute(Stream)?_Empty' -count=1go test ./internal/runtime/executor -count=1