Add WebSocket transport for OpenAI Responses API streaming#2186
Add WebSocket transport for OpenAI Responses API streaming#2186dgageot merged 5 commits intodocker:mainfrom
Conversation
- Add Client.Close() to release pooled WebSocket connections
- Invalidate broken connections in pooledStream.Close() instead of
returning dead sockets to the pool
- Preserve lastResponseID across reconnections (expired + broken) so
server-side context caching survives connection resets
- Add wsMaxReconnectAttempts constant with bounded retry loop to prevent
unbounded reconnection attempts
- Replace os.Getenv("OPENAI_API_KEY") with c.Env.Get() for consistent
secret resolution via the environment provider
- Treat websocket.CloseNoStatusReceived as a normal close condition
Assisted-By: docker-agent
|
/review |
There was a problem hiding this comment.
Assessment: 🟢 APPROVE
This PR introduces WebSocket transport for the OpenAI Responses API with solid implementation and comprehensive tests. The code is well-structured with proper connection pooling, error handling, and automatic SSE fallback.
One minor issue was found: the lastResponseID field is tracked but never used for server-side context caching as intended by its documentation.
| // this pool. It can be passed as previous_response_id in subsequent | ||
| // requests to enable server-side context caching. | ||
| // It lives on the pool (not wsConnection) so it survives reconnections. | ||
| lastResponseID string |
There was a problem hiding this comment.
ISSUE: Incomplete Feature Implementation
The lastResponseID field is tracked but never used for server-side context caching.
Details:
The field is declared with documentation stating "It can be passed as previous_response_id in subsequent requests to enable server-side context caching" (line 42-43), and it's correctly updated when terminal events are received (line 184 in pooledStream.Next()). However, there's no code that reads lastResponseID and passes it as previous_response_id in the request params.
Impact:
The server-side context caching feature mentioned in the documentation is non-functional. Subsequent requests on the same pooled connection won't benefit from cached context.
Recommendation:
Either:
- Implement the feature by reading
p.lastResponseIDinwsPool.Stream()and injecting it into the params before sending the request - Remove the field and its documentation if context caching is not needed yet
Introduce an optional WebSocket transport as an alternative to SSE for
the OpenAI Responses API. Users can enable it via provider_opts:
provider_opts:
transport: websocket
Key changes:
- Add responseEventStream interface to abstract SSE and WebSocket transports
- Refactor ResponseStreamAdapter to accept any responseEventStream
- Implement wsStream (WebSocket transport) and wsPool (connection pool
with 55-min TTL, auto-reconnect, and lastResponseID tracking)
- Integrate WebSocket path in CreateResponseStream with automatic SSE
fallback on connection failure
- No new dependencies (reuses existing gorilla/websocket)
The existing ResponseStreamAdapter.Recv() logic is fully reused since
WebSocket events use the same JSON schema as SSE events.
Assisted-By: docker-agent
- Add Client.Close() to release pooled WebSocket connections
- Invalidate broken connections in pooledStream.Close() instead of
returning dead sockets to the pool
- Preserve lastResponseID across reconnections (expired + broken) so
server-side context caching survives connection resets
- Add wsMaxReconnectAttempts constant with bounded retry loop to prevent
unbounded reconnection attempts
- Replace os.Getenv("OPENAI_API_KEY") with c.Env.Get() for consistent
secret resolution via the environment provider
- Treat websocket.CloseNoStatusReceived as a normal close condition
Assisted-By: docker-agent
- Promote lastResponseID from wsConnection to wsPool so it naturally survives all connection transitions without manual threading - Extract closeLocked(), dialLocked(), invalidateConn() helpers to eliminate duplicated connection lifecycle logic in Stream() - Replace loop-of-one reconnect with a single dialLocked() call - Extract sendResponseCreate() to deduplicate marshal+send between dialWebSocket() and sendOnExisting() - Remove wsMaxReconnectAttempts constant (was always 1) - Simplify wsConnection struct to just conn + createdAt Net result: -18 lines, fewer code paths, same behavior. Assisted-By: docker-agent
- Initialize wsPool eagerly in NewClient instead of lazily in createWebSocketStream to eliminate a potential data race when concurrent goroutines both see wsPool==nil - Downgrade WebSocket→SSE fallback log from Error to Warn since this is an intentional graceful degradation, not an unexpected error - Close HTTP response body defensively in dialWebSocket on handshake failure to prevent a potential resource leak Assisted-By: docker-agent
The wsPool already tracked lastResponseID from completed responses but never forwarded it to subsequent requests. Now, wsPool.Stream() injects it as previous_response_id when the caller hasn't already set one, enabling server-side context caching across multi-turn exchanges. Add tests covering automatic injection, caller override preservation, and survival across reconnections. Assisted-By: docker-agent
Introduce an optional WebSocket transport as an alternative to SSE for the OpenAI Responses API. Users can enable it via provider_opts:
Key changes:
The existing ResponseStreamAdapter.Recv() logic is fully reused since WebSocket events use the same JSON schema as SSE events.
Assisted-By: docker-agent