Skip to content

Commit 32b0e42

Browse files
committed
fix(core): tighten stream-manager lifecycles
1 parent 517c640 commit 32b0e42

4 files changed

Lines changed: 102 additions & 32 deletions

File tree

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ type OnceWaiter = {
1414
resolve: (result: InputStreamOnceResult<unknown>) => void;
1515
reject: (error: Error) => void;
1616
timeoutHandle?: ReturnType<typeof setTimeout>;
17+
// The abort signal and its handler are tracked on the waiter so any
18+
// resolution path (dispatch / timeout / explicit removal) can detach
19+
// the listener. Without this, a long-lived `AbortSignal` reused across
20+
// many `once()` calls accumulates listeners — `{ once: true }` only
21+
// self-clears if the signal actually aborts.
22+
signal?: AbortSignal;
23+
abortHandler?: () => void;
1724
};
1825

1926

@@ -36,6 +43,13 @@ export class StandardInputStreamManager implements InputStreamManager {
3643
// reconnect in a tight loop. Reset to 0 by `#dispatch` whenever a
3744
// record flows through.
3845
private reconnectAttempts = new Map<string, number>();
46+
// Stream IDs that were explicitly torn down by `disconnectStream`. The
47+
// tail's `.finally` reconnect path consults this set so a deliberate
48+
// teardown isn't immediately undone by the auto-reconnect when
49+
// handlers or once-waiters are still registered. Cleared on the next
50+
// explicit `on()` / `once()` (those are the only legitimate reasons to
51+
// bring the tail back up).
52+
private explicitlyDisconnected = new Set<string>();
3953

4054
constructor(
4155
private apiClient: ApiClient,
@@ -75,6 +89,10 @@ export class StandardInputStreamManager implements InputStreamManager {
7589
on(streamId: string, handler: InputStreamHandler): { off: () => void } {
7690
this.#requireV2Streams();
7791

92+
// A fresh attach is a legitimate reason to bring the tail back up;
93+
// clear any prior explicit-disconnect flag.
94+
this.explicitlyDisconnected.delete(streamId);
95+
7896
let handlerSet = this.handlers.get(streamId);
7997
if (!handlerSet) {
8098
handlerSet = new Set();
@@ -107,6 +125,10 @@ export class StandardInputStreamManager implements InputStreamManager {
107125
once(streamId: string, options?: InputStreamOnceOptions): InputStreamOncePromise<unknown> {
108126
this.#requireV2Streams();
109127

128+
// A fresh waiter is a legitimate reason to bring the tail back up;
129+
// clear any prior explicit-disconnect flag.
130+
this.explicitlyDisconnected.delete(streamId);
131+
110132
// Lazily connect a tail for this stream
111133
this.#ensureStreamTailConnected(streamId);
112134

@@ -131,17 +153,16 @@ export class StandardInputStreamManager implements InputStreamManager {
131153
reject(new Error("Aborted"));
132154
return;
133155
}
134-
options.signal.addEventListener(
135-
"abort",
136-
() => {
137-
if (waiter.timeoutHandle) {
138-
clearTimeout(waiter.timeoutHandle);
139-
}
140-
this.#removeOnceWaiter(streamId, waiter);
141-
reject(new Error("Aborted"));
142-
},
143-
{ once: true }
144-
);
156+
const abortHandler = () => {
157+
if (waiter.timeoutHandle) {
158+
clearTimeout(waiter.timeoutHandle);
159+
}
160+
this.#removeOnceWaiter(streamId, waiter);
161+
reject(new Error("Aborted"));
162+
};
163+
waiter.signal = options.signal;
164+
waiter.abortHandler = abortHandler;
165+
options.signal.addEventListener("abort", abortHandler, { once: true });
145166
}
146167

147168
// Handle timeout — resolve with error result instead of rejecting
@@ -186,6 +207,14 @@ export class StandardInputStreamManager implements InputStreamManager {
186207
}
187208

188209
disconnectStream(streamId: string): void {
210+
// Mark as explicitly disconnected BEFORE we abort, so the tail's
211+
// `.finally` reconnect path sees the flag when it runs (which can be
212+
// synchronous in the AbortError catch). Without this, an in-flight
213+
// `.on(...)` or pending `.once()` would immediately resurrect the
214+
// tail and negate the disconnect — defeating the
215+
// "drop-the-duplicate before .wait() suspends" contract. Cleared on
216+
// the next explicit `on()` / `once()`.
217+
this.explicitlyDisconnected.add(streamId);
189218
const tail = this.tails.get(streamId);
190219
if (tail) {
191220
tail.abortController.abort();
@@ -225,13 +254,17 @@ export class StandardInputStreamManager implements InputStreamManager {
225254
this.seqNums.clear();
226255
this.handlers.clear();
227256
this.reconnectAttempts.clear();
257+
this.explicitlyDisconnected.clear();
228258

229259
// Reject all pending once waiters
230260
for (const [, waiters] of this.onceWaiters) {
231261
for (const waiter of waiters) {
232262
if (waiter.timeoutHandle) {
233263
clearTimeout(waiter.timeoutHandle);
234264
}
265+
if (waiter.signal && waiter.abortHandler) {
266+
waiter.signal.removeEventListener("abort", waiter.abortHandler);
267+
}
235268
waiter.reject(new Error("Input stream manager reset"));
236269
}
237270
}
@@ -259,6 +292,13 @@ export class StandardInputStreamManager implements InputStreamManager {
259292
.finally(() => {
260293
this.tails.delete(streamId);
261294

295+
// If the tail was torn down explicitly via `disconnectStream`,
296+
// don't auto-reconnect — that's the whole point of the
297+
// disconnect call. The next `on()` / `once()` clears the flag.
298+
if (this.explicitlyDisconnected.has(streamId)) {
299+
return;
300+
}
301+
262302
// Auto-reconnect with exponential backoff if there are still
263303
// active handlers or waiters. Without backoff a persistent
264304
// failure (auth rejected, 5xx, DNS) would reconnect in a tight
@@ -273,6 +313,7 @@ export class StandardInputStreamManager implements InputStreamManager {
273313
this.reconnectAttempts.set(streamId, attempt + 1);
274314
const delayMs = computeReconnectDelayMs(attempt);
275315
setTimeout(() => {
316+
if (this.explicitlyDisconnected.has(streamId)) return;
276317
if (this.tails.has(streamId)) return;
277318
const stillHasHandlers =
278319
this.handlers.has(streamId) && this.handlers.get(streamId)!.size > 0;
@@ -361,6 +402,9 @@ export class StandardInputStreamManager implements InputStreamManager {
361402
if (waiter.timeoutHandle) {
362403
clearTimeout(waiter.timeoutHandle);
363404
}
405+
if (waiter.signal && waiter.abortHandler) {
406+
waiter.signal.removeEventListener("abort", waiter.abortHandler);
407+
}
364408
waiter.resolve({ ok: true, output: data });
365409
// Also invoke persistent handlers
366410
this.#invokeHandlers(streamId, data);
@@ -410,6 +454,13 @@ export class StandardInputStreamManager implements InputStreamManager {
410454
}
411455

412456
#removeOnceWaiter(streamId: string, waiter: OnceWaiter): void {
457+
// Centralized cleanup — both timeout and explicit abort paths funnel
458+
// through here, so detach the abort listener once instead of at every
459+
// callsite. The dispatch path doesn't go through this method (the
460+
// waiter is shifted off inline), so it detaches the listener there.
461+
if (waiter.signal && waiter.abortHandler) {
462+
waiter.signal.removeEventListener("abort", waiter.abortHandler);
463+
}
413464
const waiters = this.onceWaiters.get(streamId);
414465
if (!waiters) return;
415466
const index = waiters.indexOf(waiter);

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ type OnceWaiter = {
1414
resolve: (result: InputStreamOnceResult<unknown>) => void;
1515
reject: (error: Error) => void;
1616
timeoutHandle?: ReturnType<typeof setTimeout>;
17+
// The abort signal and its handler are tracked on the waiter so any
18+
// resolution path (dispatch / timeout / explicit removal) can detach
19+
// the listener. Without this, a long-lived `AbortSignal` reused across
20+
// many `once()` calls accumulates listeners — `{ once: true }` only
21+
// self-clears if the signal actually aborts.
22+
signal?: AbortSignal;
23+
abortHandler?: () => void;
1724
};
1825

1926
type TailState = {
@@ -131,15 +138,14 @@ export class StandardSessionStreamManager implements SessionStreamManager {
131138
reject(new Error("Aborted"));
132139
return;
133140
}
134-
options.signal.addEventListener(
135-
"abort",
136-
() => {
137-
if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle);
138-
this.#removeOnceWaiter(key, waiter);
139-
reject(new Error("Aborted"));
140-
},
141-
{ once: true }
142-
);
141+
const abortHandler = () => {
142+
if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle);
143+
this.#removeOnceWaiter(key, waiter);
144+
reject(new Error("Aborted"));
145+
};
146+
waiter.signal = options.signal;
147+
waiter.abortHandler = abortHandler;
148+
options.signal.addEventListener("abort", abortHandler, { once: true });
143149
}
144150

145151
if (options?.timeoutMs) {
@@ -261,6 +267,9 @@ export class StandardSessionStreamManager implements SessionStreamManager {
261267
for (const [, waiters] of this.onceWaiters) {
262268
for (const waiter of waiters) {
263269
if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle);
270+
if (waiter.signal && waiter.abortHandler) {
271+
waiter.signal.removeEventListener("abort", waiter.abortHandler);
272+
}
264273
waiter.reject(new Error("Session stream manager reset"));
265274
}
266275
}
@@ -403,6 +412,9 @@ export class StandardSessionStreamManager implements SessionStreamManager {
403412
const waiter = waiters.shift()!;
404413
if (waiters.length === 0) this.onceWaiters.delete(key);
405414
if (waiter.timeoutHandle) clearTimeout(waiter.timeoutHandle);
415+
if (waiter.signal && waiter.abortHandler) {
416+
waiter.signal.removeEventListener("abort", waiter.abortHandler);
417+
}
406418
waiter.resolve({ ok: true, output: data });
407419
this.#invokeHandlers(key, data);
408420
return;
@@ -450,6 +462,13 @@ export class StandardSessionStreamManager implements SessionStreamManager {
450462
}
451463

452464
#removeOnceWaiter(key: string, waiter: OnceWaiter): void {
465+
// Centralized cleanup — both timeout and explicit abort paths funnel
466+
// through here, so detach the abort listener once instead of at every
467+
// callsite. The dispatch path doesn't go through this method (the
468+
// waiter is shifted off inline), so it detaches the listener there.
469+
if (waiter.signal && waiter.abortHandler) {
470+
waiter.signal.removeEventListener("abort", waiter.abortHandler);
471+
}
453472
const waiters = this.onceWaiters.get(key);
454473
if (!waiters) return;
455474
const index = waiters.indexOf(waiter);

packages/core/src/v3/test/test-realtime-streams-manager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ export class TestRealtimeStreamsManager implements RealtimeStreamsManager {
158158
reset(): void {
159159
this.buffers.clear();
160160
this.pipeWaits.clear();
161+
this.writeListeners.clear();
161162
}
162163

163164
private getBuffer(key: string): unknown[] {

packages/core/src/v3/test/test-session-stream-manager.ts

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,18 @@ export class TestSessionStreamManager implements SessionStreamManager {
221221

222222
const handlers = this.handlers.get(key);
223223
if (handlers && handlers.size > 0) {
224+
// Awaited so test code can rely on handlers having completed by the
225+
// time `__sendFromTest` resolves. Wrapped per-handler so a
226+
// throwing/rejecting handler doesn't poison Promise.all and break
227+
// unrelated test state.
224228
await Promise.all(
225-
Array.from(handlers).map((h) => Promise.resolve().then(() => h(data)))
229+
Array.from(handlers).map(async (h) => {
230+
try {
231+
await h(data);
232+
} catch {
233+
// Never let a handler error break test state
234+
}
235+
})
226236
);
227237
}
228238

@@ -267,17 +277,6 @@ export class TestSessionStreamManager implements SessionStreamManager {
267277
}
268278
}
269279

270-
private invoke(handler: Handler, data: unknown): void {
271-
try {
272-
const result = handler(data);
273-
if (result && typeof result === "object" && "catch" in result) {
274-
(result as Promise<void>).catch(() => {});
275-
}
276-
} catch {
277-
// Never let a handler error break test state
278-
}
279-
}
280-
281280
private removeWaiter(key: string, waiter: OnceWaiter): void {
282281
const waiters = this.onceWaiters.get(key);
283282
if (!waiters) return;

0 commit comments

Comments
 (0)