From 7502e62650de1a6ee6a20b0038af885baafd51d8 Mon Sep 17 00:00:00 2001 From: Amirhossein Ghanipour Date: Thu, 12 Feb 2026 04:10:18 +0000 Subject: [PATCH] prevent IndexError on out-of-order stream events --- .../lib/streaming/responses/_responses.py | 35 +++++++++++--- tests/lib/streaming/__init__.py | 0 tests/lib/streaming/responses/__init__.py | 0 .../responses/test_responses_stream.py | 47 +++++++++++++++++++ 4 files changed, 75 insertions(+), 7 deletions(-) create mode 100644 tests/lib/streaming/__init__.py create mode 100644 tests/lib/streaming/responses/__init__.py create mode 100644 tests/lib/streaming/responses/test_responses_stream.py diff --git a/src/openai/lib/streaming/responses/_responses.py b/src/openai/lib/streaming/responses/_responses.py index 6975a9260d..30ab442538 100644 --- a/src/openai/lib/streaming/responses/_responses.py +++ b/src/openai/lib/streaming/responses/_responses.py @@ -250,11 +250,16 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven events: List[ResponseStreamEvent[TextFormatT]] = [] if event.type == "response.output_text.delta": + if event.output_index >= len(snapshot.output): + return events output = snapshot.output[event.output_index] - assert output.type == "message" - + if output.type != "message": + return events + if event.content_index >= len(output.content): + return events content = output.content[event.content_index] - assert content.type == "output_text" + if content.type != "output_text": + return events events.append( build( @@ -270,11 +275,16 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven ) ) elif event.type == "response.output_text.done": + if event.output_index >= len(snapshot.output): + return events output = snapshot.output[event.output_index] - assert output.type == "message" - + if output.type != "message": + return events + if event.content_index >= len(output.content): + return events content = output.content[event.content_index] - assert content.type == "output_text" + if content.type != "output_text": + return events events.append( build( @@ -290,8 +300,11 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven ) ) elif event.type == "response.function_call_arguments.delta": + if event.output_index >= len(snapshot.output): + return events output = snapshot.output[event.output_index] - assert output.type == "function_call" + if output.type != "function_call": + return events events.append( build( @@ -341,18 +354,26 @@ def accumulate_event(self, event: RawResponseStreamEvent) -> ParsedResponseSnaps else: snapshot.output.append(event.item) elif event.type == "response.content_part.added": + if event.output_index >= len(snapshot.output): + return snapshot output = snapshot.output[event.output_index] if output.type == "message": output.content.append( construct_type_unchecked(type_=cast(Any, ParsedContent), value=event.part.to_dict()) ) elif event.type == "response.output_text.delta": + if event.output_index >= len(snapshot.output): + return snapshot output = snapshot.output[event.output_index] if output.type == "message": + if event.content_index >= len(output.content): + return snapshot content = output.content[event.content_index] assert content.type == "output_text" content.text += event.delta elif event.type == "response.function_call_arguments.delta": + if event.output_index >= len(snapshot.output): + return snapshot output = snapshot.output[event.output_index] if output.type == "function_call": output.arguments += event.delta diff --git a/tests/lib/streaming/__init__.py b/tests/lib/streaming/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/lib/streaming/responses/__init__.py b/tests/lib/streaming/responses/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/lib/streaming/responses/test_responses_stream.py b/tests/lib/streaming/responses/test_responses_stream.py new file mode 100644 index 0000000000..d0182d4e41 --- /dev/null +++ b/tests/lib/streaming/responses/test_responses_stream.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from openai._types import omit +from openai.lib.streaming.responses._responses import ResponseStreamState +from openai.types.responses import Response +from openai.types.responses.response_created_event import ResponseCreatedEvent +from openai.types.responses.response_text_delta_event import ResponseTextDeltaEvent + + +def _minimal_response_created_event() -> ResponseCreatedEvent: + response = Response.model_construct( + id="resp-test", + created_at=0.0, + model="gpt-4o", + object="response", + output=[], + parallel_tool_calls=False, + tool_choice="auto", + tools=[], + ) + return ResponseCreatedEvent( + response=response, + sequence_number=0, + type="response.created", + ) + + +def _delta_event_before_output_item_added() -> ResponseTextDeltaEvent: + return ResponseTextDeltaEvent( + content_index=0, + delta="x", + item_id="item-1", + logprobs=[], + output_index=0, + sequence_number=1, + type="response.output_text.delta", + ) + + +def test_responses_stream_accumulate_handles_out_of_range_output_index() -> None: + state = ResponseStreamState(input_tools=omit, text_format=omit) + + state.handle_event(_minimal_response_created_event()) + + events = state.handle_event(_delta_event_before_output_item_added()) + + assert isinstance(events, list)