
Add WebSocket transport for OpenAI Responses API streaming#2186

Merged
dgageot merged 5 commits into docker:main from dgageot:ws
Mar 20, 2026

Conversation

@dgageot
Member

@dgageot dgageot commented Mar 19, 2026

Introduce an optional WebSocket transport as an alternative to SSE for the OpenAI Responses API. Users can enable it via provider_opts:

models:
  gpt-ws:
    provider: openai
    model: gpt-4.1
    provider_opts:
      transport: websocket  # Use WebSocket instead of SSE

Key changes:

  • Add responseEventStream interface to abstract SSE and WebSocket transports
  • Refactor ResponseStreamAdapter to accept any responseEventStream
  • Implement wsStream (WebSocket transport) and wsPool (connection pool with 55-min TTL, auto-reconnect, and lastResponseID tracking)
  • Integrate WebSocket path in CreateResponseStream with automatic SSE fallback on connection failure
  • No new dependencies (reuses existing gorilla/websocket)

The existing ResponseStreamAdapter.Recv() logic is fully reused since WebSocket events use the same JSON schema as SSE events.

Assisted-By: docker-agent
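
To illustrate the abstraction described above, here is a minimal sketch of a transport-agnostic event stream. The method set of responseEventStream (Next, Current, Err, Close) is an assumption modeled on a typical SSE iterator, not the PR's actual definition. The event type, frameStream, and collectText are hypothetical names used only for illustration, with collectText standing in for the role of ResponseStreamAdapter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical stand-in for a decoded Responses API event.
type event struct {
	Type  string `json:"type"`
	Delta string `json:"delta,omitempty"`
}

// responseEventStream is the iterator shape both transports would satisfy.
// The method set is an assumption, not taken from the PR.
type responseEventStream interface {
	Next() bool
	Current() event
	Err() error
	Close() error
}

// frameStream decodes one JSON event per raw frame. A WebSocket transport
// would read frames from the socket; here they come from a slice.
type frameStream struct {
	frames [][]byte
	cur    event
	err    error
}

func (s *frameStream) Next() bool {
	if s.err != nil || len(s.frames) == 0 {
		return false
	}
	frame := s.frames[0]
	s.frames = s.frames[1:]
	var ev event // decode into a fresh value so fields never leak between events
	if err := json.Unmarshal(frame, &ev); err != nil {
		s.err = err
		return false
	}
	s.cur = ev
	return true
}

func (s *frameStream) Current() event { return s.cur }
func (s *frameStream) Err() error     { return s.err }
func (s *frameStream) Close() error   { return nil }

// collectText consumes any responseEventStream without knowing the transport.
func collectText(s responseEventStream) (string, error) {
	defer s.Close()
	var out string
	for s.Next() {
		if ev := s.Current(); ev.Type == "response.output_text.delta" {
			out += ev.Delta
		}
	}
	return out, s.Err()
}

func main() {
	s := &frameStream{frames: [][]byte{
		[]byte(`{"type":"response.output_text.delta","delta":"Hel"}`),
		[]byte(`{"type":"response.output_text.delta","delta":"lo"}`),
		[]byte(`{"type":"response.completed"}`),
	}}
	text, err := collectText(s)
	fmt.Println(text, err) // prints "Hello <nil>"
}
```

Because the consumer depends only on the interface, the same decoding loop can serve both transports as long as the events share one JSON schema, which is the property the description relies on.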

@dgageot dgageot requested a review from a team as a code owner March 19, 2026 20:09
docker-agent[bot]

This comment was marked as outdated.

dgageot added a commit to dgageot/cagent that referenced this pull request Mar 19, 2026
- Add Client.Close() to release pooled WebSocket connections
- Invalidate broken connections in pooledStream.Close() instead of
  returning dead sockets to the pool
- Preserve lastResponseID across reconnections (expired + broken) so
  server-side context caching survives connection resets
- Add wsMaxReconnectAttempts constant with bounded retry loop to prevent
  unbounded reconnection attempts
- Replace os.Getenv("OPENAI_API_KEY") with c.Env.Get() for consistent
  secret resolution via the environment provider
- Treat websocket.CloseNoStatusReceived as a normal close condition

Assisted-By: docker-agent
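
The last bullet above can be sketched as a small classifier over close codes. This version is dependency-free: closeError is a hypothetical stand-in for a WebSocket library's close error type, and the constants carry the RFC 6455 values (1000, 1005, 1006). With gorilla/websocket the equivalent check would presumably go through websocket.IsCloseError, but that wiring is an assumption and is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// Close codes as defined by RFC 6455, section 7.4.1.
const (
	closeNormalClosure    = 1000
	closeNoStatusReceived = 1005
	closeAbnormalClosure  = 1006
)

// closeError is a hypothetical stand-in for a library's close error type.
type closeError struct{ Code int }

func (e *closeError) Error() string {
	return fmt.Sprintf("websocket: close %d", e.Code)
}

// isNormalClose reports whether err signals an orderly end of the stream.
// A close frame without a status code (1005) is treated like a normal
// closure (1000); every other error is reported to the caller.
func isNormalClose(err error) bool {
	var ce *closeError
	if errors.As(err, &ce) {
		return ce.Code == closeNormalClosure || ce.Code == closeNoStatusReceived
	}
	return false
}

func main() {
	wrapped := fmt.Errorf("read frame: %w", &closeError{Code: closeNoStatusReceived})
	fmt.Println(isNormalClose(wrapped))                                  // true
	fmt.Println(isNormalClose(&closeError{Code: closeNormalClosure}))    // true
	fmt.Println(isNormalClose(&closeError{Code: closeAbnormalClosure}))  // false
}
```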
@dgageot
Member Author

dgageot commented Mar 20, 2026

/review

@docker-agent docker-agent bot left a comment

Assessment: 🟢 APPROVE

This PR introduces WebSocket transport for the OpenAI Responses API with solid implementation and comprehensive tests. The code is well-structured with proper connection pooling, error handling, and automatic SSE fallback.

One minor issue was found: the lastResponseID field is tracked but never used for server-side context caching as intended by its documentation.

// this pool. It can be passed as previous_response_id in subsequent
// requests to enable server-side context caching.
// It lives on the pool (not wsConnection) so it survives reconnections.
lastResponseID string

ISSUE: Incomplete Feature Implementation

The lastResponseID field is tracked but never used for server-side context caching.

Details:
The field is declared with documentation stating "It can be passed as previous_response_id in subsequent requests to enable server-side context caching" (lines 42-43), and it is correctly updated when terminal events are received (line 184 in pooledStream.Next()). However, no code reads lastResponseID and passes it as previous_response_id in the request params.

Impact:
The server-side context caching feature mentioned in the documentation is non-functional. Subsequent requests on the same pooled connection won't benefit from cached context.

Recommendation:
Either:

  1. Implement the feature by reading p.lastResponseID in wsPool.Stream() and injecting it into the params before sending the request
  2. Remove the field and its documentation if context caching is not needed yet

dgageot added 5 commits March 20, 2026 12:34
Introduce an optional WebSocket transport as an alternative to SSE for
the OpenAI Responses API. Users can enable it via provider_opts:

  provider_opts:
    transport: websocket

Key changes:
- Add responseEventStream interface to abstract SSE and WebSocket transports
- Refactor ResponseStreamAdapter to accept any responseEventStream
- Implement wsStream (WebSocket transport) and wsPool (connection pool
  with 55-min TTL, auto-reconnect, and lastResponseID tracking)
- Integrate WebSocket path in CreateResponseStream with automatic SSE
  fallback on connection failure
- No new dependencies (reuses existing gorilla/websocket)

The existing ResponseStreamAdapter.Recv() logic is fully reused since
WebSocket events use the same JSON schema as SSE events.

Assisted-By: docker-agent
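
The fallback behavior in this commit could look roughly like the sketch below. Everything here is hypothetical scaffolding: the stream type, the streamOpener function type, and the helper names are invented for illustration, and the real CreateResponseStream signature is not shown in this conversation. The sketch logs at Warn level, matching the level a later commit in this PR settles on.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
)

// stream is a hypothetical handle that records which transport served it.
type stream struct{ transport string }

// streamOpener opens a response stream over one transport (assumed shape).
type streamOpener func(ctx context.Context) (*stream, error)

// openWithFallback tries the WebSocket opener only when the configured
// transport asks for it, and degrades to SSE if the connection fails.
func openWithFallback(ctx context.Context, transport string, ws, sse streamOpener) (*stream, error) {
	if transport == "websocket" {
		s, err := ws(ctx)
		if err == nil {
			return s, nil
		}
		// Graceful degradation, so this is a warning rather than an error.
		slog.Warn("WebSocket stream failed, falling back to SSE", "error", err)
	}
	return sse(ctx)
}

// Demo openers standing in for real transports.
func dialFails(context.Context) (*stream, error) { return nil, errors.New("handshake failed") }
func wsOK(context.Context) (*stream, error)      { return &stream{transport: "websocket"}, nil }
func sseOK(context.Context) (*stream, error)     { return &stream{transport: "sse"}, nil }

// pick reports which transport ends up serving the request.
func pick(transport string, ws streamOpener) string {
	s, err := openWithFallback(context.Background(), transport, ws, sseOK)
	if err != nil {
		return "error"
	}
	return s.transport
}

func main() {
	fmt.Println(pick("websocket", wsOK))      // websocket
	fmt.Println(pick("websocket", dialFails)) // sse (after a warning on stderr)
	fmt.Println(pick("sse", wsOK))            // sse
}
```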
- Add Client.Close() to release pooled WebSocket connections
- Invalidate broken connections in pooledStream.Close() instead of
  returning dead sockets to the pool
- Preserve lastResponseID across reconnections (expired + broken) so
  server-side context caching survives connection resets
- Add wsMaxReconnectAttempts constant with bounded retry loop to prevent
  unbounded reconnection attempts
- Replace os.Getenv("OPENAI_API_KEY") with c.Env.Get() for consistent
  secret resolution via the environment provider
- Treat websocket.CloseNoStatusReceived as a normal close condition

Assisted-By: docker-agent
- Promote lastResponseID from wsConnection to wsPool so it naturally
  survives all connection transitions without manual threading
- Extract closeLocked(), dialLocked(), invalidateConn() helpers to
  eliminate duplicated connection lifecycle logic in Stream()
- Replace loop-of-one reconnect with a single dialLocked() call
- Extract sendResponseCreate() to deduplicate marshal+send between
  dialWebSocket() and sendOnExisting()
- Remove wsMaxReconnectAttempts constant (was always 1)
- Simplify wsConnection struct to just conn + createdAt

Net result: -18 lines, fewer code paths, same behavior.

Assisted-By: docker-agent
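
A minimal sketch of the pool shape this refactor describes, assuming a single cached connection. The names wsPool, wsConnection, closeLocked, dialLocked, invalidateConn, and the 55-minute TTL come from the commit messages in this PR. The acquire method, the dial and now hooks, fakeConn, and newDemoPool are hypothetical additions for illustration, and io.Closer stands in for the real socket type.

```go
package main

import (
	"fmt"
	"io"
	"sync"
	"time"
)

const connMaxAge = 55 * time.Minute // TTL mentioned in the PR description

// wsConnection is just the connection plus its creation time.
type wsConnection struct {
	conn      io.Closer
	createdAt time.Time
}

type wsPool struct {
	mu             sync.Mutex
	conn           *wsConnection
	lastResponseID string                    // on the pool, so it survives reconnects
	dial           func() (io.Closer, error) // hypothetical dial hook
	now            func() time.Time          // hypothetical clock hook
}

// closeLocked drops the current connection. The caller must hold p.mu.
func (p *wsPool) closeLocked() {
	if p.conn != nil {
		_ = p.conn.conn.Close()
		p.conn = nil
	}
}

// dialLocked opens a fresh connection. The caller must hold p.mu.
func (p *wsPool) dialLocked() error {
	c, err := p.dial()
	if err != nil {
		return err
	}
	p.conn = &wsConnection{conn: c, createdAt: p.now()}
	return nil
}

// acquire returns a usable connection, redialing once if the cached one
// is missing or has reached connMaxAge.
func (p *wsPool) acquire() (*wsConnection, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.conn != nil && p.now().Sub(p.conn.createdAt) >= connMaxAge {
		p.closeLocked()
	}
	if p.conn == nil {
		if err := p.dialLocked(); err != nil {
			return nil, err
		}
	}
	return p.conn, nil
}

// invalidateConn discards c if it is still the pooled connection.
func (p *wsPool) invalidateConn(c *wsConnection) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.conn == c {
		p.closeLocked()
	}
}

type fakeConn struct{}

func (fakeConn) Close() error { return nil }

// newDemoPool builds a pool with a counting dialer and a manual clock.
func newDemoPool() (*wsPool, *int, func(time.Duration)) {
	clock, dials := time.Unix(0, 0), 0
	p := &wsPool{
		dial: func() (io.Closer, error) { dials++; return fakeConn{}, nil },
		now:  func() time.Time { return clock },
	}
	return p, &dials, func(d time.Duration) { clock = clock.Add(d) }
}

func main() {
	p, dials, advance := newDemoPool()
	p.lastResponseID = "resp_123"
	_, _ = p.acquire()
	_, _ = p.acquire() // reuses the cached connection
	advance(connMaxAge)
	_, _ = p.acquire()                    // expired, so the pool redials
	fmt.Println(*dials, p.lastResponseID) // prints "2 resp_123"
}
```

Keeping lastResponseID on the pool rather than on wsConnection means neither closeLocked nor dialLocked has to copy it, which is the simplification this commit calls out.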
- Initialize wsPool eagerly in NewClient instead of lazily in
  createWebSocketStream to eliminate a potential data race when
  concurrent goroutines both see wsPool==nil
- Downgrade WebSocket→SSE fallback log from Error to Warn since this
  is an intentional graceful degradation, not an unexpected error
- Close HTTP response body defensively in dialWebSocket on handshake
  failure to prevent a potential resource leak

Assisted-By: docker-agent
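
The eager-initialization point can be sketched as follows. Client, NewClient, and wsPool are names from this PR, but the struct fields and the poolsSeen helper are hypothetical and exist only to make the example runnable. The idea is that a field written once in the constructor and only read afterwards needs no lock, whereas an unsynchronized "if c.wsPool == nil { c.wsPool = ... }" check inside a request path lets two goroutines both observe nil.

```go
package main

import (
	"fmt"
	"sync"
)

// wsPool is a placeholder for the real pool type.
type wsPool struct{ name string }

// Client holds a pool that is set once in NewClient and never reassigned.
type Client struct {
	wsPool *wsPool
}

// NewClient builds the pool eagerly, so no goroutine can observe a nil
// pool and race to create one.
func NewClient() *Client {
	return &Client{wsPool: &wsPool{name: "default"}}
}

// poolsSeen starts n goroutines that each read c.wsPool and returns how
// many distinct pools they observed.
func poolsSeen(c *Client, n int) int {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		seen = map[*wsPool]struct{}{}
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p := c.wsPool // plain read: the field is immutable after construction
			mu.Lock()
			seen[p] = struct{}{}
			mu.Unlock()
		}()
	}
	wg.Wait()
	return len(seen)
}

func main() {
	fmt.Println(poolsSeen(NewClient(), 8)) // prints 1
}
```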
The wsPool already tracked lastResponseID from completed responses but
never forwarded it to subsequent requests. Now, wsPool.Stream() injects
it as previous_response_id when the caller hasn't already set one,
enabling server-side context caching across multi-turn exchanges.

Add tests covering automatic injection, caller override preservation,
and survival across reconnections.

Assisted-By: docker-agent
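
The injection rule described in this commit can be sketched as a pure function. The request is modeled as a map[string]any for brevity, which is an assumption: the real code presumably works on the SDK's typed params. The function name injectPreviousResponseID is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// injectPreviousResponseID returns a copy of params with
// previous_response_id set to lastID, unless lastID is empty or the
// caller already supplied a non-empty value of its own.
func injectPreviousResponseID(params map[string]any, lastID string) map[string]any {
	out := make(map[string]any, len(params)+1)
	for k, v := range params {
		out[k] = v
	}
	if lastID == "" {
		return out // nothing tracked yet, e.g. the first request
	}
	if prev, ok := out["previous_response_id"].(string); ok && prev != "" {
		return out // the caller's explicit choice wins
	}
	out["previous_response_id"] = lastID
	return out
}

func main() {
	req := injectPreviousResponseID(map[string]any{"model": "gpt-4.1"}, "resp_abc")
	b, _ := json.Marshal(req)
	fmt.Println(string(b)) // {"model":"gpt-4.1","previous_response_id":"resp_abc"}
}
```

Returning a copy instead of mutating the caller's params keeps the injection invisible to callers that reuse the same request value across turns.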
@dgageot dgageot merged commit 53a35dc into docker:main Mar 20, 2026
4 checks passed