Commit 20fb74a

fix(core,sdk): resume session.in via seq cursor on turn-complete header
Replaces the wall-clock dedup cutoff on session.in with a seq-based resume cursor carried on each turn-complete control record as a `session-in-event-id` sibling header. On worker boot the cursor is read off .out's latest turn-complete and seeds .in's SSE subscription so already-processed user messages aren't replayed from S2.

The timestamp model had a narrow race: the snapshot's lastOutTimestamp was Date.now() sampled after turn-complete landed on S2, so a user message arriving between turn-complete and snapshot-write could be silently filtered as "old" on the next worker boot. Seq numbers are unique, total-ordered, and stamped by S2, so the same cutoff now falls out without any clock involved.

The sessionStreams manager grows a second cursor (lastDispatchedSeqNum) distinct from the existing received cursor. It advances only when a record is delivered to a once()/wait() consumer or shifted off the buffer into one. The agent persists it on turn-complete and seeds it back at the next worker's boot before any .in SSE subscribe opens. lastOutTimestamp is gone from the snapshot; the rescan is O(1 turn) since .out is bounded.

Also bundles four small PR-review fixes:
- runStream SSEStreamPart doc had the trigger-control vs S2-command-record marker reversed
- agentChat removed a redundant reader.releaseLock() before a recursive return that the finally block already covers
- sessions surfaces onControl on SessionSubscribeOptions so consumers of the raw .out.read() API see header-form control records
- chatSnapshot drops the now-unused lastOutTimestamp field
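
The race described in the message can be illustrated with a small sketch. The `InRecord` shape, the two filter helpers, and the sample timestamps are hypothetical and only model the idea; they are not code from this commit.

```typescript
// Hypothetical record shape: `seq` is assigned by the log, `ts` is arrival time in ms.
type InRecord = { seq: number; ts: number; text: string };

// Old model (assumed): drop anything at or before a wall-clock cutoff.
function filterByTimestamp(records: InRecord[], cutoffTs: number): InRecord[] {
  return records.filter((r) => r.ts > cutoffTs);
}

// New model (assumed): drop anything at or before the last consumed seq.
function filterBySeq(records: InRecord[], lastConsumedSeq: number | undefined): InRecord[] {
  if (lastConsumedSeq === undefined) return records;
  return records.filter((r) => r.seq > lastConsumedSeq);
}

const records: InRecord[] = [
  { seq: 0, ts: 1000, text: "first message, processed in turn 1" },
  // Arrives after turn-complete landed (say ts 1050) but before the snapshot
  // sampled its clock (ts 1100), so no turn has processed it yet.
  { seq: 1, ts: 1080, text: "second message, not yet processed" },
];

const snapshotClock = 1100; // Date.now() sampled at snapshot-write time
const lastConsumedSeq = 0; // highest seq the agent actually consumed

const byTimestamp = filterByTimestamp(records, snapshotClock);
const bySeq = filterBySeq(records, lastConsumedSeq);

console.log(byTimestamp.length); // 0: the unprocessed message is filtered as "old"
console.log(bySeq.length); // 1: the unprocessed message is delivered
```

Under these assumed values the timestamp cutoff drops a message that was never handled, while the seq cutoff keeps it, because the cursor records what was consumed rather than when a clock was read.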
1 parent b3aa85b commit 20fb74a

11 files changed

Lines changed: 229 additions & 67 deletions

packages/cli-v3/src/mcp/tools/agentChat.ts

Lines changed: 2 additions & 3 deletions
@@ -433,9 +433,8 @@ async function collectAgentResponse(
       // record's seq (set above when the part arrived). The recursive
       // subscribe resumes right after that marker, so we don't replay
       // the entire session.out stream — which would hit a historical
-      // turn-complete and break the loop with empty/old text.
-      reader.releaseLock();
-      // Recurse — subscribe to the new run's stream (same session.out URL)
+      // turn-complete and break the loop with empty/old text. The outer
+      // `finally` block releases the reader before the recursion runs.
       return collectAgentResponse(session, depth + 1);
     }
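
Why the explicit release is redundant can be shown with a minimal sketch of `try`/`finally` around a recursive `return` in an async function. The `collect` function and the `events` log are hypothetical stand-ins, not the real `collectAgentResponse`; pushing to `events` stands in for acquiring and releasing a reader.

```typescript
// Log of acquire/release events, in the order they happen.
const events: string[] = [];

// Hypothetical stand-in for a function that acquires a resource, may recurse,
// and relies on `finally` for cleanup on every exit path.
async function collect(depth: number): Promise<string> {
  events.push(`acquire:${depth}`);
  try {
    if (depth < 1) {
      // No explicit release here: `finally` runs on this return path too.
      return collect(depth + 1);
    }
    return "done";
  } finally {
    events.push(`release:${depth}`);
  }
}

const pending = collect(0);
pending.then((text) => console.log(text, events.join(",")));
```

Every `acquire` is paired with a `release` without any manual call before the `return`. In this sketch the recursive call is entered before the outer `finally` runs, because the `return` expression is evaluated first; whether that ordering matters for the real function depends on code outside this hunk.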

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 4 additions & 4 deletions
@@ -179,10 +179,10 @@ export type SSEStreamPart<TChunk = unknown> = {
   /**
    * S2 record headers, when the underlying transport is the v2 batch shape
    * (Session streams). Undefined for v1 streams. Empty array when the record
-   * had no headers. First-header empty-name is a Trigger control protocol
-   * marker (see `trigger-control` records on `session.out`); empty-name
-   * records that S2 itself interprets as command records (trim/fence) are
-   * filtered out before reaching this struct.
+   * had no headers. Trigger control records carry a `trigger-control` named
+   * header (see `trigger-control` records on `session.out`) and may reach
+   * this struct. S2 command records (trim/fence) are identified by an
+   * empty-name first header and are filtered out before enqueue.
    */
   headers?: Array<[string, string]>;
 };
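
The corrected doc comment distinguishes two kinds of marker. A minimal sketch of that distinction follows; the `classifyRecord` helper, the `RecordKind` names, and the sample header values are assumptions for illustration and are not part of `runStream.ts`.

```typescript
type Headers = Array<[string, string]>;
type RecordKind = "s2-command" | "trigger-control" | "data";

// Hypothetical classifier following the doc comment:
// - an empty-name FIRST header marks an S2 command record (trim/fence)
// - a header NAMED `trigger-control` marks a Trigger control record
function classifyRecord(headers: Headers | undefined): RecordKind {
  if (!headers || headers.length === 0) return "data";
  const first = headers[0];
  if (first !== undefined && first[0] === "") return "s2-command";
  if (headers.some(([name]) => name === "trigger-control")) return "trigger-control";
  return "data";
}

// Sample values below are illustrative assumptions.
console.log(classifyRecord([["", "trim"]]));
console.log(classifyRecord([["trigger-control", "turn-complete"]]));
console.log(classifyRecord([]));
```

Checking the empty-name first header before looking for named headers mirrors the order in the comment: command records are filtered out before enqueue, so only data and control records would reach a consumer.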

packages/core/src/v3/sessionStreams/chatSnapshot.ts

Lines changed: 0 additions & 5 deletions
@@ -13,9 +13,6 @@
  * replay from precisely after the snapshot, and as the trim-chain seed
  * for the agent's next turn.
  *
- * `lastOutTimestamp` is the same record's S2 arrival timestamp (ms since
- * epoch). Used as the dedup cutoff for `session.in` on OOM-retry boot.
- *
  * The `version` field is a forward-compat lever: readers that don't
  * recognise a version silently fall back to no-snapshot behaviour.
  */
@@ -29,7 +26,6 @@ export type ChatSnapshotV1<TUIMessage extends UIMessage = UIMessage> = {
   savedAt: number;
   messages: TUIMessage[];
   lastOutEventId?: string;
-  lastOutTimestamp?: number;
 };

 /**
@@ -43,7 +39,6 @@ export const ChatSnapshotV1Schema = z.object({
   savedAt: z.number(),
   messages: z.array(z.unknown()),
   lastOutEventId: z.string().optional(),
-  lastOutTimestamp: z.number().optional(),
 });

 /**

packages/core/src/v3/sessionStreams/index.ts

Lines changed: 12 additions & 0 deletions
@@ -59,6 +59,18 @@ export class SessionStreamsAPI implements SessionStreamManager {
     this.#getManager().setLastSeqNum(sessionId, io, seqNum);
   }

+  public lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined {
+    return this.#getManager().lastDispatchedSeqNum(sessionId, io);
+  }
+
+  public setLastDispatchedSeqNum(
+    sessionId: string,
+    io: SessionChannelIO,
+    seqNum: number
+  ): void {
+    this.#getManager().setLastDispatchedSeqNum(sessionId, io, seqNum);
+  }
+
   public setMinTimestamp(
     sessionId: string,
     io: SessionChannelIO,

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 68 additions & 3 deletions
@@ -44,6 +44,12 @@ export class StandardSessionStreamManager implements SessionStreamManager {
   private handlers = new Map<string, Set<SessionStreamHandler>>();
   private onceWaiters = new Map<string, OnceWaiter[]>();
   private buffer = new Map<string, unknown[]>();
+  // Parallel to `buffer`: the SSE seq_num of each buffered record. Same
+  // length and order as `buffer[key]`. Used so that when `once()` shifts
+  // a buffered record into a waiter, the cursor (`lastDispatchedSeqNums`)
+  // can advance to that record's seq. Kept as a separate map so the
+  // existing `peek()` shape (returns `unknown`) stays unchanged.
+  private bufferSeqNums = new Map<string, number[]>();
   private tails = new Map<string, TailState>();
   // Per-stream lower-bound timestamp filter. When set, records whose
   // SSE timestamp is <= the bound are dropped before dispatch — used by
@@ -59,6 +65,15 @@ export class StandardSessionStreamManager implements SessionStreamManager {
   // that's already being delivered out-of-band via the waitpoint.
   private explicitlyDisconnected = new Set<string>();
   private seqNums = new Map<string, number>();
+  // Highest seq_num that has been *consumed* (delivered to a once()
+  // waiter or shifted off the buffer into a once() caller) on a channel.
+  // Distinct from `seqNums`, which advances whenever any record is
+  // received from SSE — even ones still sitting in the local buffer.
+  // The committed-consume cursor is what gets persisted on the
+  // turn-complete control record's `session-in-event-id` header so the
+  // next worker boot can resume `.in` from this point without
+  // re-delivering already-handled user messages.
+  private lastDispatchedSeqNums = new Map<string, number>();
   // Reconnect attempt counter per key. Drives the exponential backoff
   // applied by `#ensureTailConnected`'s `.finally` so a persistent
   // backend failure (auth rejection, 5xx, DNS, etc.) doesn't reconnect
@@ -123,8 +138,14 @@ export class StandardSessionStreamManager implements SessionStreamManager {
     const buffered = this.buffer.get(key);
     if (buffered && buffered.length > 0) {
       const data = buffered.shift()!;
+      const seqList = this.bufferSeqNums.get(key);
+      const shiftedSeqNum = seqList?.shift();
       if (buffered.length === 0) {
         this.buffer.delete(key);
+        this.bufferSeqNums.delete(key);
+      }
+      if (shiftedSeqNum !== undefined) {
+        this.#advanceLastDispatched(key, shiftedSeqNum);
       }
       return new InputStreamOncePromise((resolve) => {
         resolve({ ok: true, output: data });
@@ -186,6 +207,25 @@ export class StandardSessionStreamManager implements SessionStreamManager {
     }
   }

+  lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined {
+    return this.lastDispatchedSeqNums.get(keyFor(sessionId, io));
+  }
+
+  setLastDispatchedSeqNum(
+    sessionId: string,
+    io: SessionChannelIO,
+    seqNum: number
+  ): void {
+    this.#advanceLastDispatched(keyFor(sessionId, io), seqNum);
+  }
+
+  #advanceLastDispatched(key: string, seqNum: number): void {
+    const current = this.lastDispatchedSeqNums.get(key);
+    if (current === undefined || seqNum > current) {
+      this.lastDispatchedSeqNums.set(key, seqNum);
+    }
+  }
+
   setMinTimestamp(
     sessionId: string,
     io: SessionChannelIO,
@@ -204,7 +244,15 @@ export class StandardSessionStreamManager implements SessionStreamManager {
     const buffered = this.buffer.get(key);
     if (buffered && buffered.length > 0) {
       buffered.shift();
-      if (buffered.length === 0) this.buffer.delete(key);
+      const seqList = this.bufferSeqNums.get(key);
+      const shiftedSeqNum = seqList?.shift();
+      if (buffered.length === 0) {
+        this.buffer.delete(key);
+        this.bufferSeqNums.delete(key);
+      }
+      if (shiftedSeqNum !== undefined) {
+        this.#advanceLastDispatched(key, shiftedSeqNum);
+      }
       return true;
     }
     return false;
@@ -224,6 +272,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
       this.tails.delete(key);
     }
     this.buffer.delete(key);
+    this.bufferSeqNums.delete(key);
     // Reset the backoff counter so a future re-attach starts fresh —
     // an explicit disconnect is a deliberate teardown, not evidence of
     // a broken backend.
@@ -261,6 +310,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
   reset(): void {
     this.disconnect();
     this.seqNums.clear();
+    this.lastDispatchedSeqNums.clear();
     this.minTimestamps.clear();
     this.handlers.clear();
     this.reconnectAttempts.clear();
@@ -276,6 +326,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
     }
     this.onceWaiters.clear();
     this.buffer.clear();
+    this.bufferSeqNums.clear();
   }

   #ensureTailConnected(sessionId: string, io: SessionChannelIO): void {
@@ -385,7 +436,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
             // keep as string
           }
         }
-        this.#dispatch(key, data);
+        this.#dispatch(key, data, Number.isFinite(seqNum) ? seqNum : undefined);
       },
       onComplete: () => {
         if (this.debug) {
@@ -410,7 +461,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
     }
   }

-  #dispatch(key: string, data: unknown): void {
+  #dispatch(key: string, data: unknown, seqNum: number | undefined): void {
     // Any record flowing through = healthy connection; reset the backoff
     // counter so the next disconnect starts fresh.
     this.reconnectAttempts.delete(key);
@@ -423,6 +474,12 @@ export class StandardSessionStreamManager implements SessionStreamManager {
       if (waiter.signal && waiter.abortHandler) {
         waiter.signal.removeEventListener("abort", waiter.abortHandler);
       }
+      // Record was consumed directly by a waiter — advance the
+      // committed-consume cursor immediately. Buffered-then-shifted
+      // records advance the cursor in `once()` / `shiftBuffer()`.
+      if (seqNum !== undefined) {
+        this.#advanceLastDispatched(key, seqNum);
+      }
       waiter.resolve({ ok: true, output: data });
       this.#invokeHandlers(key, data);
       return;
@@ -442,6 +499,14 @@ export class StandardSessionStreamManager implements SessionStreamManager {
       this.buffer.set(key, buffered);
     }
     buffered.push(data);
+    if (seqNum !== undefined) {
+      let bufferedSeqs = this.bufferSeqNums.get(key);
+      if (!bufferedSeqs) {
+        bufferedSeqs = [];
+        this.bufferSeqNums.set(key, bufferedSeqs);
+      }
+      bufferedSeqs.push(seqNum);
+    }
   }

   #invokeHandlers(key: string, data: unknown): void {
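
The difference between the received cursor and the committed-consume cursor can be sketched with a single-channel toy. `MiniChannel` and its members are hypothetical and much simpler than `StandardSessionStreamManager` (no keys, handlers, or abort signals); the sketch only models when each cursor moves.

```typescript
// Hypothetical single-channel model of the two cursors.
class MiniChannel<T> {
  private buffer: T[] = [];
  private bufferSeqNums: number[] = []; // parallel to `buffer`
  private waiters: Array<(value: T) => void> = [];
  private receivedSeq: number | undefined;
  private dispatchedSeq: number | undefined;

  // A record arrives from the stream.
  receive(data: T, seqNum: number): void {
    this.receivedSeq = seqNum; // advances on every received record
    const waiter = this.waiters.shift();
    if (waiter) {
      // Consumed directly by a waiter: advance the consume cursor now.
      this.advanceDispatched(seqNum);
      waiter(data);
      return;
    }
    // Nobody is waiting: buffer the record together with its seq.
    this.buffer.push(data);
    this.bufferSeqNums.push(seqNum);
  }

  // A consumer asks for the next record.
  once(): Promise<T> {
    if (this.buffer.length > 0) {
      const data = this.buffer.shift() as T;
      const seqNum = this.bufferSeqNums.shift();
      if (seqNum !== undefined) this.advanceDispatched(seqNum);
      return Promise.resolve(data);
    }
    return new Promise<T>((resolve) => this.waiters.push(resolve));
  }

  get lastReceived(): number | undefined {
    return this.receivedSeq;
  }

  get lastDispatched(): number | undefined {
    return this.dispatchedSeq;
  }

  // Monotonic: the cursor never moves backwards.
  private advanceDispatched(seqNum: number): void {
    if (this.dispatchedSeq === undefined || seqNum > this.dispatchedSeq) {
      this.dispatchedSeq = seqNum;
    }
  }
}

const channel = new MiniChannel<string>();
channel.receive("a", 7);
channel.receive("b", 8);
const beforeConsume = { received: channel.lastReceived, dispatched: channel.lastDispatched };
void channel.once(); // consumes "a" from the buffer
const afterConsume = { received: channel.lastReceived, dispatched: channel.lastDispatched };
console.log(beforeConsume, afterConsume);
```

With two records buffered and one consumed, the received cursor sits at 8 while the consume cursor sits at 7. Persisting the consume cursor is what would let a restarted worker still see the record with seq 8.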

packages/core/src/v3/sessionStreams/noopManager.ts

Lines changed: 10 additions & 0 deletions
@@ -31,6 +31,16 @@ export class NoopSessionStreamManager implements SessionStreamManager {

   setLastSeqNum(_sessionId: string, _io: SessionChannelIO, _seqNum: number): void {}

+  lastDispatchedSeqNum(_sessionId: string, _io: SessionChannelIO): number | undefined {
+    return undefined;
+  }
+
+  setLastDispatchedSeqNum(
+    _sessionId: string,
+    _io: SessionChannelIO,
+    _seqNum: number
+  ): void {}
+
   setMinTimestamp(
     _sessionId: string,
     _io: SessionChannelIO,

packages/core/src/v3/sessionStreams/types.ts

Lines changed: 22 additions & 0 deletions
@@ -45,6 +45,28 @@ export interface SessionStreamManager {
   /** Advance the last-seen sequence number (prevents SSE replay after `.wait` resume). */
   setLastSeqNum(sessionId: string, io: SessionChannelIO, seqNum: number): void;

+  /**
+   * Highest sequence number that has been *consumed* on the channel —
+   * delivered to a `once()` waiter or shifted off the buffer into one.
+   * Distinct from {@link lastSeqNum}, which advances on every received
+   * record regardless of whether anything consumed it. Used by
+   * `chat.agent` to persist the `.in` resume cursor on each
+   * `turn-complete` control record so the next worker boot can resume
+   * the channel from this point without replaying processed messages.
+   */
+  lastDispatchedSeqNum(sessionId: string, io: SessionChannelIO): number | undefined;
+
+  /**
+   * Seed the committed-consume cursor at worker boot — e.g. from the
+   * `session-in-event-id` header on the latest `turn-complete` on
+   * `.out`. Monotonic: only ever advances forward, never backwards.
+   */
+  setLastDispatchedSeqNum(
+    sessionId: string,
+    io: SessionChannelIO,
+    seqNum: number
+  ): void;
+
   /**
    * Set a per-stream lower-bound SSE timestamp. Records whose timestamp
    * is `<= minTimestamp` are dropped before dispatch. Used by chat.agent

packages/core/src/v3/sessionStreams/wireProtocol.ts

Lines changed: 12 additions & 0 deletions
@@ -28,6 +28,18 @@ export const PUBLIC_ACCESS_TOKEN_HEADER = "public-access-token" as const;
 /** Header name carrying the agent's last S2 event id on a handover bridge. */
 export const SESSION_STATE_LAST_EVENT_ID_HEADER = "last-event-id" as const;

+/**
+ * Header on `turn-complete` records carrying the highest `session.in`
+ * seq_num the agent committed to processing during this turn. Read on
+ * the next worker boot to seed `.in`'s resume cursor — anything past
+ * this seq is new and gets delivered; anything at-or-before was already
+ * processed and is skipped. Decimal-string form of the seq_num.
+ *
+ * Omitted when no `.in` records have been consumed yet (first turn of a
+ * fresh chat triggered via the wire payload).
+ */
+export const SESSION_IN_EVENT_ID_HEADER = "session-in-event-id" as const;
+
 export const TRIGGER_CONTROL_SUBTYPE = {
   TURN_COMPLETE: "turn-complete",
   UPGRADE_REQUIRED: "upgrade-required",
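
A round trip through the header could look like the following minimal sketch. Only the header name and its decimal-string encoding come from the doc comment above; `encodeSessionInCursor`, `parseSessionInCursor`, and the validation rules are assumptions, and the agent code that actually writes and reads the header is not shown in this diff.

```typescript
// Header name as declared in wireProtocol.ts; redeclared here so the sketch is self-contained.
const SESSION_IN_EVENT_ID_HEADER = "session-in-event-id";

type Headers = Array<[string, string]>;

// Hypothetical encoder: omit the header when nothing has been consumed yet.
function encodeSessionInCursor(seqNum: number | undefined): Headers {
  return seqNum === undefined ? [] : [[SESSION_IN_EVENT_ID_HEADER, String(seqNum)]];
}

// Hypothetical parser: accept only a plain non-negative decimal integer.
function parseSessionInCursor(headers: Headers | undefined): number | undefined {
  const entry = headers?.find(([name]) => name === SESSION_IN_EVENT_ID_HEADER);
  if (!entry) return undefined;
  const value = entry[1];
  if (!/^\d+$/.test(value)) return undefined;
  const seqNum = Number(value);
  return Number.isSafeInteger(seqNum) ? seqNum : undefined;
}

// Turn N writes the cursor; the next worker boot reads it back.
const written = encodeSessionInCursor(41);
const restored = parseSessionInCursor(written);
console.log(restored); // 41
console.log(parseSessionInCursor(encodeSessionInCursor(undefined))); // undefined
```

Returning `undefined` for a missing or malformed value keeps the boot path on its no-cursor behaviour instead of seeding the channel with a bad number.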

packages/core/src/v3/test/test-session-stream-manager.ts

Lines changed: 15 additions & 0 deletions
@@ -145,6 +145,21 @@ export class TestSessionStreamManager implements SessionStreamManager {
     this.seqNums.set(keyFor(sessionId, io), seqNum);
   }

+  lastDispatchedSeqNum(_sessionId: string, _io: SessionChannelIO): number | undefined {
+    // The test harness drives records via `__sendFromTest` without seq
+    // numbers, so the committed-consume cursor stays undefined. Tests
+    // that need cursor behaviour exercise it via the real manager.
+    return undefined;
+  }
+
+  setLastDispatchedSeqNum(
+    _sessionId: string,
+    _io: SessionChannelIO,
+    _seqNum: number
+  ): void {
+    // no-op — see comment on `lastDispatchedSeqNum`.
+  }
+
   setMinTimestamp(
     _sessionId: string,
     _io: SessionChannelIO,
