
fix: fix openai asr reconnection issue #2163

Open
YiminW wants to merge 2 commits into main from dev/openai_asr_reconnect

Conversation

@YiminW (Contributor) commented May 13, 2026

No description provided.

@YiminW YiminW requested review from halajohn and plutoless as code owners May 13, 2026 11:09
@github-actions

Review

Nice diagnosis — combining a priority queue at the transport layer with client-side buffering of audio/commit until transcription_session.updated is a reasonable fix for the reconnection race.

Bugs / correctness

  1. Unbounded _pending_audio_messages (client.py:290-293). If the server stalls on reconnect, or the first transcription_session.update takes a while to be acknowledged, audio keeps accumulating in memory with no cap. At 24 kHz PCM16 + base64, a few minutes of speech is ~5–10 MB. Recommend either a bounded collections.deque(maxlen=N) (dropping oldest is acceptable for real-time ASR) or a max-byte cap with a warning when the cap is hit; a minimal sketch follows this list.

  2. _pending_commit_requested collapsed to a bool. If a caller expects send_end_of_stream() to produce a committed chunk per call, coalescing multiple pre-ready commits into one silently changes the transcription boundaries. In practice send_end_of_stream seems to be a once-per-stream call so this is likely fine, but worth a one-line comment or an assert.

  3. Pending buffer is not cleared on stop(). If stop() runs before the session becomes ready, the buffer just leaks. Minor, but consider clearing in on_disconnect when the shutdown flag is set.

  4. _message_sequence is not reset on reconnect (ws_client.py:70). Harmless today because it's only used as a tiebreaker inside the priority queue, but if you ever swap to a fresh queue on reconnect, make sure the monotonicity is preserved or reset together.

  5. Pre-reconnect audio still bypasses the client buffer. Between the socket closing and on_reconnect() clearing params_ready_event, send_pcm_data still sees is_set() == True and routes audio straight into the priority queue. That audio then sits in the queue through the reconnect. The priority-queue layer keeps it ordered correctly relative to the new transcription_session.update (priority 0 wins), but these messages are sent before the server acknowledges the update — the exact condition the buffering is trying to avoid. Either clear params_ready_event earlier (e.g. on on_close / on_disconnect), or re-enqueue unacked audio into _pending_audio_messages when a disconnect fires.
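
To make item 1 concrete, here is a minimal sketch of a drop-oldest, byte-capped pending buffer. `PendingAudioBuffer`, `MAX_PENDING_BYTES`, and the logging behavior are illustrative, not taken from the PR:

```python
import collections
import logging

logger = logging.getLogger(__name__)

MAX_PENDING_BYTES = 8 * 1024 * 1024  # hypothetical cap; tune to the reconnect window


class PendingAudioBuffer:
    """Drop-oldest buffer with a byte cap for pre-ready audio frames."""

    def __init__(self, max_bytes: int = MAX_PENDING_BYTES) -> None:
        self._frames: collections.deque[str] = collections.deque()
        self._bytes = 0
        self._max_bytes = max_bytes
        self._dropped = 0

    def append(self, frame: str) -> None:
        self._frames.append(frame)
        self._bytes += len(frame)
        first_overflow = self._dropped == 0 and self._bytes > self._max_bytes
        # Evict oldest frames once over the cap; losing the oldest audio
        # is acceptable for real-time ASR.
        while self._bytes > self._max_bytes and self._frames:
            self._bytes -= len(self._frames.popleft())
            self._dropped += 1
        if first_overflow:
            logger.warning("pending audio buffer full, dropping oldest frames")

    def drain(self) -> list[str]:
        """Return buffered frames in order and reset all counters."""
        frames, dropped = list(self._frames), self._dropped
        self._frames.clear()
        self._bytes = 0
        self._dropped = 0
        if dropped:
            logger.warning("flushed %d frames, dropped %d while buffering",
                           len(frames), dropped)
        return frames
```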

Code quality

  • The double-buffering (priority queue + pending list) works but is subtle. A short comment in _update_session / _flush_pending_audio explaining why both layers are needed (the priority queue alone can't hold messages back until the server ACKs the session update) would save the next reader 15 minutes.
  • priority magic numbers 0 / 10 / 20 are repeated across both files. Consider module-level constants (PRIORITY_SESSION = 0, PRIORITY_AUDIO = 10, PRIORITY_COMMIT = 20).
  • The asyncio.PriorityQueue item type tuple[int, int, str | bytes] works, but a dataclass(order=True) with a field(compare=False) payload is more self-documenting; see the sketch after this list.
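
A sketch of both suggestions together; the constant names and the QueuedMessage dataclass are proposals, not code from the PR (the `str | bytes` runtime annotation needs Python 3.10+):

```python
from dataclasses import dataclass, field

PRIORITY_SESSION = 0   # transcription_session.update must precede audio
PRIORITY_AUDIO = 10
PRIORITY_COMMIT = 20


@dataclass(order=True)
class QueuedMessage:
    priority: int
    sequence: int                                # monotonic tiebreaker
    payload: str | bytes = field(compare=False)  # excluded from ordering
```

An asyncio.PriorityQueue of QueuedMessage then orders solely on (priority, sequence), so two payloads are never compared; with a unique sequence per message that tie cannot occur anyway.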

Tests

No tests added. The reconnection race and the flush-on-ready paths are exactly the kind of logic that silently regresses. Even a small unit test around OpenAIAsrClient with a mocked send that asserts (a) pre-ready audio is buffered, (b) flush happens only after transcription_session.updated, and (c) flush ordering = audio-then-commit would lock this behavior in.
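
Something like the following would do. It is only a skeleton: the real constructor arguments and import path are not shown in this PR, and it assumes pytest-asyncio plus the method names used above (send_pcm_data, send_end_of_stream, _handle_event):

```python
import json
from unittest.mock import AsyncMock

import pytest

# from ...client import OpenAIAsrClient  # placeholder import path


@pytest.mark.asyncio
async def test_pre_ready_audio_is_buffered_then_flushed_in_order():
    client = OpenAIAsrClient(config=...)   # placeholder construction
    client.send = AsyncMock()              # capture outbound messages

    await client.send_pcm_data(b"\x00" * 640)  # (a) pre-ready: buffered,
    await client.send_end_of_stream()          #     commit coalesced
    client.send.assert_not_awaited()

    # (b) flush fires only on session-ready
    await client._handle_event({"type": "transcription_session.updated"})

    types = [json.loads(c.args[0])["type"] for c in client.send.await_args_list]
    # (c) flush ordering: buffered audio first, then the coalesced commit
    assert types.index("input_audio_buffer.append") \
        < types.index("input_audio_buffer.commit")
```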

Nits

  • PR title `fix: fix openai asr reconnection issue` has a redundant "fix"; `fix(openai_asr): buffer audio until session ready on reconnect` would be closer to the repo's conventional-commit style.
  • client.py:160 uses priority=0 for the session update but the comment above says "Queue transcription_session.update" — consider "Priority-enqueue transcription_session.update (must precede audio)".

Overall the approach is sound; the main thing I'd ask for before merging is a bound on _pending_audio_messages and a basic test that exercises the pre-ready buffering + flush path.

@github-actions

Review

The fix addresses a real problem (audio sent before transcription_session.updated is silently dropped or arrives out of order vs. the session config) and the priority queue + pre-ready buffering approach is reasonable. A few items worth a closer look before merging:

Potential issues

1. Ordering race during flush in _flush_pending_audio (client.py:163-181). When transcription_session.updated arrives, the handler does:

self.params_ready_event.set()
await self._flush_pending_audio()

Between set() and the first await self.send(...) for the buffered messages, any concurrent send_pcm_data call (e.g. from the audio task) sees params_ready_event.is_set() == True and enqueues its new chunk directly. Both paths use priority=10, so ordering falls to the monotonic sequence counter — and since the sequence is assigned at self.send() call time (not at buffer time), a live chunk can get a lower sequence than older buffered chunks still being re-enqueued. Net effect: older buffered audio can land after newer live audio in the WS sender queue.

Two options: (a) drain _pending_audio_messages and enqueue the frames onto the priority queue synchronously (no await in between), or (b) hold a lock so send_pcm_data waits until the flush completes. Given the audio chunks are small, the user-visible impact is probably tolerable, but it is worth a comment at minimum; a sketch of option (a) follows this list.

2. _pending_audio_drop_count is dead state (client.py:135, 167-168, 247-251, 254). The counter is incremented on each drop and reset in _flush_pending_audio / _clear_pending_audio, but it is never logged or read. Either log it (e.g. on flush: "flushed N pending, dropped M during buffering") or remove it. Right now you only get a single one-shot warning regardless of how much was dropped.

3. _pending_audio_drop_warned reset on flush vs. silent re-fill. It is reset on _flush_pending_audio, so the next overflow during the same session will re-warn — good. But on on_close (which fires on reconnect) params_ready_event is cleared and the buffer can fill again; the flag is not reset there, so a second overflow will be silent. Suggest resetting it in on_close as well, or simply log every drop above a threshold rather than a one-shot.

4. Hardcoded 8 MB cap (client.py:136). For 16-bit PCM at 24 kHz that is ~175 s of audio. Probably fine as a default, but consider plumbing it through the manifest config so deployments with longer reconnect windows can tune it without a code change.

5. on_close and on_disconnect both clear params_ready_event. Functionally fine (idempotent), but a brief comment about why both is helpful since the call paths differ (_receiver_handler invokes on_close, _run invokes on_disconnect).

6. send_heartbeat uses default priority 10. During a backlog of pending audio, heartbeats sit behind audio. If the heartbeat is meant to keep the socket alive under load, consider giving it priority 0 or 5.

7. Clearing pending state on shutdown only (on_disconnect). Correct — preserves audio across reconnects, drops on real shutdown. The condition if self._shutdown_event.is_set() reads the base class's private state; consider exposing a small is_shutting_down() accessor on WebSocketClient to keep the abstraction clean.
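
For item 1, option (a) could look roughly like this. The attribute names (params_ready_event, _pending_audio_messages, _message_queue, _message_sequence) follow this review's description of the PR and are assumptions, as are the priority constants standing in for the bare 10 / 20 in the diff:

```python
import json

PRIORITY_AUDIO = 10
PRIORITY_COMMIT = 20


def flush_pending_audio_sync(client) -> None:
    """Drain the pre-ready buffer onto the priority queue with no awaits
    in between, so a concurrent send_pcm_data cannot slot a live chunk
    ahead of older buffered ones."""
    client.params_ready_event.set()
    while client._pending_audio_messages:
        # Assuming a deque; a list-based buffer would use pop(0).
        payload = client._pending_audio_messages.popleft()
        client._message_sequence += 1  # sequence assigned at enqueue time
        # put_nowait never yields to the event loop on an unbounded queue,
        # so the whole drain is atomic from the loop's perspective.
        client._message_queue.put_nowait(
            (PRIORITY_AUDIO, client._message_sequence, payload)
        )
    if client._pending_commit_requested:
        client._pending_commit_requested = False
        client._message_sequence += 1
        client._message_queue.put_nowait(
            (PRIORITY_COMMIT, client._message_sequence,
             json.dumps({"type": "input_audio_buffer.commit"}))
        )
```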

Test coverage

The PR adds non-trivial state machine logic (priority ordering, drop policy, buffer/flush across reconnects, commit coalescing) but no new tests under tests/. At minimum it would be valuable to cover:

  • Audio sent before transcription_session.updated is buffered, then flushed in order on session-ready.
  • Buffer overflow drops oldest and emits the warning once.
  • send_end_of_stream before ready is coalesced and fires once after flush.
  • Reconnect preserves buffered audio; explicit stop() clears it.

Minor / style

  • manifest.json version bump to 0.3.1 — good.
  • Mixed log formatting styles in the new code (f-strings vs %s/%d lazy formatting). The lazy form is preferred for debug-level logs in hot paths so the message isn't materialized when the level is disabled, and the audio path's f"..." debug logs run per chunk; see the snippet after this list.
  • Type annotation on the deque uses tuple[str, int] but the second element is recorded as the byte length only — a tiny dataclass or named tuple might document intent better than the bare tuple, especially since the bytes count is what drives the eviction policy.
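
On the logging point, the difference is only whether the format string is rendered when DEBUG is off:

```python
import logging

logger = logging.getLogger(__name__)
chunk = b"\x00" * 640

# f-string: the message is built even when DEBUG is disabled
logger.debug(f"buffered audio chunk of {len(chunk)} bytes")

# %-style: logging skips formatting unless a handler wants DEBUG
logger.debug("buffered audio chunk of %d bytes", len(chunk))
```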

Overall the direction is correct; the ordering race in (1) is the one I'd most want addressed (or explicitly acknowledged as acceptable) before merge.

@YiminW YiminW force-pushed the dev/openai_asr_reconnect branch from 5ad8c92 to f5ad6fc on May 14, 2026 02:40
@github-actions

PR #2163 Review — fix: fix openai asr reconnection issue

Thanks for tackling this — gating audio on transcription_session.updated and using priorities to ensure the session update jumps the queue is the right shape of fix. A few things worth a closer look.

Potential issues

1. Unbounded buffer in _pending_audio_messages (client.py)

If the server is slow to reply with transcription_session.updated, or the client is stuck in a reconnect loop, _pending_audio_messages grows without limit. At 16 kHz / pcm16 / 50 ms chunks, each base64-encoded JSON message is ~2–3 KB; a 10-second stall is hundreds of frames, and a stuck reconnect could OOM a long-running agent. Consider:

  • A bounded deque (drop-oldest), e.g. collections.deque(maxlen=N), with a debug log when frames are dropped, or
  • A hard upper bound plus on_asr_client_error so the caller can decide.

The same concern applies to the existing _message_queue (asyncio.PriorityQueue is also unbounded).

2. Stale audio in _message_queue survives reconnect (ws_client.py)

_pending_audio_messages correctly gates new audio after on_reconnect clears params_ready_event. But audio that was already pushed to _message_queue before the disconnect is still in the priority queue. After reconnect, the priority-0 transcription_session.update does go first, but those stale audio frames will then be sent against the new (and not-yet-confirmed) session — which is exactly the ordering bug this PR is trying to prevent for the new audio path.

Two options:

  • Drain _message_queue on on_close / on_reconnect (audio frames at least; the next session update will be re-pushed by on_open_update_session).
  • Keep audio out of _message_queue entirely and only push it through the _pending_audio_messages → flush path, which already enforces the gate.
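
A rough sketch of the first option, assuming the PR's (priority, sequence, payload) tuple layout; drop_stale_audio is a hypothetical helper, not code from the diff:

```python
import asyncio

PRIORITY_AUDIO = 10  # the PR uses a bare 10


def drop_stale_audio(queue: asyncio.PriorityQueue) -> int:
    """On close/reconnect, discard audio frames queued before the
    disconnect while keeping control messages. Returns the drop count."""
    kept, dropped = [], 0
    while True:
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        if item[0] == PRIORITY_AUDIO:
            dropped += 1
        else:
            kept.append(item)
    for item in kept:
        queue.put_nowait(item)  # original sequence numbers keep the order
    return dropped
```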

3. params_ready_event.clear() is now in two places — worth a comment

You added params_ready_event.clear() in on_close, which is good. The reconnect-path clear in on_reconnect is also still there. They cover different timings: on_close fires before on_reconnect, and the event needs to be clear immediately so any in-flight send_pcm_data between close and reconnect doesn't slip through. A short comment would prevent a future reader from "simplifying" by removing one.

4. _pending_commit_requested coalescing is correct, but worth verifying the contract

The comment "pre-ready commit requests are coalesced" is fine for the OpenAI realtime API — input_audio_buffer.commit is idempotent at session start. Just confirm a caller that issues multiple send_end_of_stream() calls during the buffering window doesn't expect each to commit a separate audio segment.

5. _handle_error semantics

Removing the trailing return and the comment is fine; behavior is unchanged. The explicit return did make the early-exit pattern a bit more readable, but not a blocker.

Code quality

  • Mixed log formatting in client.py — most lines use f-strings, the new ones use %-style ("...pending=%d", len(...)). Pick one for consistency. %-style is lazier on disabled log levels, but the surrounding code uses f-strings throughout.
  • Removing tautological one-line comments (# awaitable function, # unexpected message...) is a nice cleanup — agrees with the project's "no obvious comments" convention.
  • The if/elif to early-return refactor in _handle_event reads well.
  • _message_sequence as a monotonic int is correct. Python ints handle unbounded growth fine; no concern.

Performance

  • Priority queue per-message overhead is O(log n) instead of O(1), which is fine at audio rates (20 msg/s for 50 ms chunks).
  • Holding _pending_lock across await self.send(...) in the flush path is correct (it preserves ordering w.r.t. send_pcm_data) but means audio producers can briefly block during the flush. At realistic buffer sizes this is microseconds; only a concern if (1) is fixed by allowing very large buffers.

Security

Nothing new. Audio content is base64-encoded (existing behavior), credentials are still in headers.

Test coverage

No tests for the new buffering / priority / reconnect-ordering behavior were added. The existing test file only covers ASR result parsing. Given that this PR fixes a non-obvious race, even one async test that:

  1. Constructs the client with a fake WebSocketClient.send,
  2. Calls send_pcm_data before _handle_event({"type": "transcription_session.updated"}),
  3. Asserts the session update is sent before the buffered audio,

would lock in the fix and prevent regression. Worth adding.

Nits

  • The simplified __main__ block is cleaner, but you also removed the KeyboardInterrupt / client.stop() handling. Running this file directly now leaves the websocket open on Ctrl-C (the client_task will be cancelled but stop() is never called). Minor since it's an example, but worth a try/finally.
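
A minimal shape for that; the construction is a placeholder, and client.start() / client.stop() follow the names used in this review rather than the PR itself:

```python
import asyncio

# from ...client import OpenAIAsrClient  # placeholder import path


async def main() -> None:
    client = OpenAIAsrClient(config=...)  # placeholder construction
    try:
        await client.start()              # runs until cancelled
    finally:
        await client.stop()               # always close the websocket


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run already cancelled main() and ran its finally block
        pass
```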

Overall the direction is right and the priority + buffering combo is a sensible way to enforce the "session update before audio" invariant. The biggest follow-up I'd push for is bounding the buffers and draining _message_queue on disconnect; the rest is polish.
