Skip to content

Commit b728453

Browse files
committed
feat(sdk): chat.agent oomMachine for automatic OOM-retry on a larger machine
Setting oomMachine opts a chat.agent into one-shot OOM recovery: the failed turn re-runs on the larger machine; the retry derives a session.in cutoff from the latest trigger:turn-complete chunk on session.out and skips the turns that already completed on the prior attempt. Adds the setMinTimestamp filter on the session-stream manager and force-kills the dev worker between attempts so local behavior matches prod.
1 parent 72efbcc commit b728453

9 files changed

Lines changed: 425 additions & 20 deletions

File tree

packages/cli-v3/src/dev/taskRunProcessPool.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,11 @@ export class TaskRunProcessPool {
127127
return { taskRunProcess: newProcess, isReused: false };
128128
}
129129

130-
async returnProcess(process: TaskRunProcess, version: string): Promise<void> {
130+
async returnProcess(
131+
process: TaskRunProcess,
132+
version: string,
133+
options?: { forceKill?: boolean }
134+
): Promise<void> {
131135
// Remove from busy processes for this version
132136
const busyProcesses = this.busyProcessesByVersion.get(version);
133137
if (busyProcesses) {
@@ -141,6 +145,19 @@ export class TaskRunProcessPool {
141145
);
142146
}
143147

148+
// `forceKill` skips the reuse heuristic and tears the process down. Used
149+
// on outcomes that leave the process in a state we can't safely reuse
150+
// (OOM in particular — production would get a fresh container, so local
151+
// dev should match that).
152+
if (options?.forceKill) {
153+
logger.debug("[TaskRunProcessPool] Force-killing process", {
154+
version,
155+
pid: process.pid,
156+
});
157+
await this.killProcess(process);
158+
return;
159+
}
160+
144161
if (this.shouldReuseProcess(process, version)) {
145162
const availableCount = this.availableProcessesByVersion.get(version)?.length || 0;
146163
const busyCount = this.busyProcessesByVersion.get(version)?.size || 0;

packages/cli-v3/src/entryPoints/dev-run-controller.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import {
22
CompleteRunAttemptResult,
33
DequeuedMessage,
44
IntervalService,
5+
isManualOutOfMemoryError,
6+
isOOMRunError,
57
LogLevel,
68
RunExecutionData,
79
SuspendedProcessError,
@@ -52,6 +54,12 @@ export class DevRunController {
5254
private readonly cwd?: string;
5355
private isCompletingRun = false;
5456
private isShuttingDown = false;
57+
// Set when the current attempt's outcome means the worker process can't
58+
// safely be reused (OOM in particular). Production gives every retry a
59+
// fresh container; local dev's process pool needs the same on these
60+
// outcomes or in-process state (e.g. session.in cursors) leaks across
61+
// attempts and the OOM retry skips the message that triggered it.
62+
private discardProcessOnReturn = false;
5563

5664
private state:
5765
| {
@@ -539,6 +547,13 @@ export class DevRunController {
539547
error: TaskRunProcess.parseExecuteError(error),
540548
} satisfies TaskRunFailedExecutionResult;
541549

550+
// Same OOM check as the success path: if the thrown error parses to
551+
// an OOM, force-kill the process when it's eventually returned (via
552+
// runFinished / stop) instead of recycling it.
553+
if (isOOMRunError(completion.error) || isManualOutOfMemoryError(completion.error)) {
554+
this.discardProcessOnReturn = true;
555+
}
556+
542557
const completionResult = await this.httpClient.dev.completeRunAttempt(
543558
run.friendlyId,
544559
this.snapshotFriendlyId ?? snapshot.friendlyId,
@@ -664,10 +679,22 @@ export class DevRunController {
664679

665680
this.isCompletingRun = true;
666681

682+
// Detect OOM in the failure result so we can force-kill the worker
683+
// instead of returning it to the pool. Mirrors the production behavior
684+
// where OOM retry happens on a brand-new container.
685+
if (
686+
!completion.ok &&
687+
(isOOMRunError(completion.error) || isManualOutOfMemoryError(completion.error))
688+
) {
689+
this.discardProcessOnReturn = true;
690+
}
691+
667692
// Return process to pool instead of killing it
668693
try {
669694
const version = this.opts.worker.serverWorker?.version || "unknown";
670-
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version);
695+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version, {
696+
forceKill: this.discardProcessOnReturn,
697+
});
671698
this.taskRunProcess = undefined;
672699
} catch (error) {
673700
logger.debug("Failed to return task run process to pool, submitting completion anyway", {
@@ -820,7 +847,9 @@ export class DevRunController {
820847
if (this.taskRunProcess) {
821848
try {
822849
const version = this.opts.worker.serverWorker?.version || "unknown";
823-
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version);
850+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version, {
851+
forceKill: this.discardProcessOnReturn,
852+
});
824853
this.taskRunProcess = undefined;
825854
} catch (error) {
826855
logger.debug("Failed to return task run process to pool during runFinished", { error });
@@ -854,7 +883,9 @@ export class DevRunController {
854883
if (this.taskRunProcess && !this.taskRunProcess.isBeingKilled) {
855884
try {
856885
const version = this.opts.worker.serverWorker?.version || "unknown";
857-
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version);
886+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version, {
887+
forceKill: this.discardProcessOnReturn,
888+
});
858889
this.taskRunProcess = undefined;
859890
} catch (error) {
860891
logger.debug("Failed to return task run process to pool during stop", { error });

packages/core/src/v3/sessionStreams/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ export class SessionStreamsAPI implements SessionStreamManager {
5959
this.#getManager().setLastSeqNum(sessionId, io, seqNum);
6060
}
6161

62+
public setMinTimestamp(
63+
sessionId: string,
64+
io: SessionChannelIO,
65+
minTimestamp: number | undefined
66+
): void {
67+
this.#getManager().setMinTimestamp(sessionId, io, minTimestamp);
68+
}
69+
6270
public shiftBuffer(sessionId: string, io: SessionChannelIO): boolean {
6371
return this.#getManager().shiftBuffer(sessionId, io);
6472
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import { describe, expect, it } from "vitest";
2+
import { StandardSessionStreamManager } from "./manager.js";
3+
import type { ApiClient } from "../apiClient/index.js";
4+
import type { SSEStreamPart } from "../apiClient/runStream.js";
5+
6+
// Single-shot mock that mimics S2's long-poll: delivers `records` once via
7+
// `onPart` on the first subscribe call, then keeps the returned async
8+
// iterable OPEN until the abort signal fires. Real S2 keeps the SSE
9+
// connection alive on a long-poll; the manager's `runTail` finally /
10+
// reconnect path only fires when the connection actually closes. Returning
11+
// an empty stream synchronously triggers a tight reconnect loop, so the
12+
// mock parks indefinitely instead.
13+
function singleShotApiClient(
14+
records: Array<{ id: string; chunk: unknown; timestamp: number }>
15+
): ApiClient {
16+
let delivered = false;
17+
return {
18+
async subscribeToSessionStream<T>(
19+
_sessionIdOrExternalId: string,
20+
_io: "out" | "in",
21+
options?: { onPart?: (part: SSEStreamPart<T>) => void; signal?: AbortSignal }
22+
) {
23+
if (!delivered) {
24+
delivered = true;
25+
for (const record of records) {
26+
options?.onPart?.(record as SSEStreamPart<T>);
27+
}
28+
}
29+
const signal = options?.signal;
30+
return (async function* () {
31+
if (signal?.aborted) return;
32+
await new Promise<void>((resolve) => {
33+
if (!signal) {
34+
// No signal — block the stream forever; tests must
35+
// explicitly call `disconnectStream` / `disconnect` to
36+
// unblock.
37+
return;
38+
}
39+
signal.addEventListener("abort", () => resolve(), { once: true });
40+
});
41+
})() as unknown as Awaited<ReturnType<ApiClient["subscribeToSessionStream"]>>;
42+
},
43+
} as unknown as ApiClient;
44+
}
45+
46+
describe("StandardSessionStreamManager — minTimestamp filter", () => {
47+
const sessionId = "session-1";
48+
const io = "in" as const;
49+
50+
it("dispatches records when no filter is set", async () => {
51+
const records = [
52+
{ id: "0", chunk: { kind: "message", payload: { id: "u1" } }, timestamp: 1000 },
53+
{ id: "1", chunk: { kind: "message", payload: { id: "u2" } }, timestamp: 2000 },
54+
];
55+
const manager = new StandardSessionStreamManager(singleShotApiClient(records), "http://localhost");
56+
57+
const first = await manager.once(sessionId, io);
58+
expect(first).toEqual({ ok: true, output: { kind: "message", payload: { id: "u1" } } });
59+
60+
const second = await manager.once(sessionId, io);
61+
expect(second).toEqual({ ok: true, output: { kind: "message", payload: { id: "u2" } } });
62+
63+
manager.disconnectStream(sessionId, io); // stop reconnect loop
64+
manager.disconnect();
65+
});
66+
67+
it("drops records whose timestamp is <= minTimestamp", async () => {
68+
const records = [
69+
{ id: "0", chunk: { kind: "message", payload: { id: "u1" } }, timestamp: 1000 },
70+
{ id: "1", chunk: { kind: "message", payload: { id: "u2" } }, timestamp: 2000 },
71+
{ id: "2", chunk: { kind: "message", payload: { id: "u3" } }, timestamp: 3000 },
72+
];
73+
const manager = new StandardSessionStreamManager(singleShotApiClient(records), "http://localhost");
74+
75+
// Cutoff at 2000 (inclusive: `<=` is dropped). Only u3 should pass.
76+
manager.setMinTimestamp(sessionId, io, 2000);
77+
78+
const passed = await manager.once(sessionId, io, { timeoutMs: 200 });
79+
expect(passed).toEqual({ ok: true, output: { kind: "message", payload: { id: "u3" } } });
80+
81+
manager.disconnectStream(sessionId, io);
82+
manager.disconnect();
83+
});
84+
85+
it("clears the filter when set to undefined", async () => {
86+
const records = [
87+
{ id: "0", chunk: { kind: "message", payload: { id: "u1" } }, timestamp: 1000 },
88+
];
89+
const manager = new StandardSessionStreamManager(singleShotApiClient(records), "http://localhost");
90+
91+
manager.setMinTimestamp(sessionId, io, 5000);
92+
manager.setMinTimestamp(sessionId, io, undefined);
93+
94+
const passed = await manager.once(sessionId, io, { timeoutMs: 200 });
95+
expect(passed).toEqual({ ok: true, output: { kind: "message", payload: { id: "u1" } } });
96+
97+
manager.disconnectStream(sessionId, io);
98+
manager.disconnect();
99+
});
100+
101+
it("filter is per-(sessionId, io) and doesn't bleed across streams", async () => {
102+
const inApi = singleShotApiClient([
103+
{ id: "0", chunk: { kind: "in-record" }, timestamp: 1000 },
104+
]);
105+
const manager = new StandardSessionStreamManager(inApi, "http://localhost");
106+
107+
manager.setMinTimestamp(sessionId, "in", 5000);
108+
109+
// The "out" stream uses the same singleShotApiClient instance — its
110+
// single-shot delivers the same fixture, but the filter doesn't apply
111+
// to "out" so the record passes.
112+
const outResult = await manager.once(sessionId, "out", { timeoutMs: 200 });
113+
expect(outResult).toEqual({ ok: true, output: { kind: "in-record" } });
114+
115+
// The "in" stream is filtered (minTimestamp=5000, record ts=1000): the
116+
// once() call should idle-timeout instead of resolving with the record.
117+
// But the singleShot instance has already delivered to the "out" tail,
118+
// so the "in" tail will get nothing on first connect anyway. Use a
119+
// separate manager+api to keep the assertion crisp.
120+
const inApi2 = singleShotApiClient([
121+
{ id: "0", chunk: { kind: "in-record-2" }, timestamp: 1000 },
122+
]);
123+
const manager2 = new StandardSessionStreamManager(inApi2, "http://localhost");
124+
manager2.setMinTimestamp(sessionId, "in", 5000);
125+
126+
const inResult = await manager2.once(sessionId, "in", { timeoutMs: 100 });
127+
expect(inResult.ok).toBe(false); // filter-dropped → idle timeout
128+
129+
manager.disconnectStream(sessionId, "in");
130+
manager.disconnectStream(sessionId, "out");
131+
manager.disconnect();
132+
manager2.disconnectStream(sessionId, "in");
133+
manager2.disconnect();
134+
});
135+
136+
it("reset() clears all per-stream timestamp filters", async () => {
137+
const records = [
138+
{ id: "0", chunk: { kind: "message", payload: { id: "u1" } }, timestamp: 1000 },
139+
];
140+
const manager = new StandardSessionStreamManager(singleShotApiClient(records), "http://localhost");
141+
142+
manager.setMinTimestamp(sessionId, io, 5000);
143+
manager.reset();
144+
145+
const passed = await manager.once(sessionId, io, { timeoutMs: 200 });
146+
expect(passed).toEqual({ ok: true, output: { kind: "message", payload: { id: "u1" } } });
147+
148+
manager.disconnectStream(sessionId, io);
149+
manager.disconnect();
150+
});
151+
});

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ export class StandardSessionStreamManager implements SessionStreamManager {
3636
private onceWaiters = new Map<string, OnceWaiter[]>();
3737
private buffer = new Map<string, unknown[]>();
3838
private tails = new Map<string, TailState>();
39+
// Per-stream lower-bound timestamp filter. When set, records whose
40+
// SSE timestamp is <= the bound are dropped before dispatch — used by
41+
// chat.agent on OOM-retry boot to skip session.in records belonging
42+
// to turns that already completed on the prior attempt. The filter
43+
// is consulted in `runTail`'s `onPart` so the buffer never sees the
44+
// dropped records.
45+
private minTimestamps = new Map<string, number>();
3946
// Keys that were explicitly torn down by `disconnectStream`. The tail's
4047
// `.finally` reconnect path checks this so a long-lived persistent handler
4148
// (e.g. `chat.agent`'s run-level `stopInput.on(...)`) doesn't silently
@@ -164,6 +171,19 @@ export class StandardSessionStreamManager implements SessionStreamManager {
164171
}
165172
}
166173

174+
setMinTimestamp(
175+
sessionId: string,
176+
io: SessionChannelIO,
177+
minTimestamp: number | undefined
178+
): void {
179+
const key = keyFor(sessionId, io);
180+
if (minTimestamp === undefined) {
181+
this.minTimestamps.delete(key);
182+
} else {
183+
this.minTimestamps.set(key, minTimestamp);
184+
}
185+
}
186+
167187
shiftBuffer(sessionId: string, io: SessionChannelIO): boolean {
168188
const key = keyFor(sessionId, io);
169189
const buffered = this.buffer.get(key);
@@ -213,6 +233,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
213233
reset(): void {
214234
this.disconnect();
215235
this.seqNums.clear();
236+
this.minTimestamps.clear();
216237
this.handlers.clear();
217238

218239
for (const [, waiters] of this.onceWaiters) {
@@ -271,16 +292,40 @@ export class StandardSessionStreamManager implements SessionStreamManager {
271292
const key = keyFor(sessionId, io);
272293
try {
273294
const lastSeq = this.seqNums.get(key);
295+
// Dispatch is driven from `onPart` (not the for-await loop) so each
296+
// record reaches dispatch with its full SSE metadata in scope —
297+
// specifically the timestamp, which we need for the per-stream
298+
// min-timestamp filter. The for-await loop below just drains the
299+
// pipeThrough output to keep the source flowing.
274300
const stream = await this.apiClient.subscribeToSessionStream<unknown>(sessionId, io, {
275301
signal,
276302
baseUrl: this.baseUrl,
277303
timeoutInSeconds: 600,
278304
lastEventId: lastSeq !== undefined ? String(lastSeq) : undefined,
279305
onPart: (part) => {
306+
if (signal.aborted) return;
280307
const seqNum = parseInt(part.id, 10);
281308
if (Number.isFinite(seqNum)) {
282309
this.seqNums.set(key, seqNum);
283310
}
311+
312+
// Min-timestamp filter: drop records older than (or at) the
313+
// bound. Used to skip already-processed records on OOM-retry
314+
// boot.
315+
const minTs = this.minTimestamps.get(key);
316+
if (minTs !== undefined && part.timestamp <= minTs) {
317+
return;
318+
}
319+
320+
let data: unknown = part.chunk;
321+
if (typeof data === "string") {
322+
try {
323+
data = JSON.parse(data);
324+
} catch {
325+
// keep as string
326+
}
327+
}
328+
this.#dispatch(key, data);
284329
},
285330
onComplete: () => {
286331
if (this.debug) {
@@ -294,21 +339,10 @@ export class StandardSessionStreamManager implements SessionStreamManager {
294339
},
295340
});
296341

297-
for await (const record of stream) {
342+
// Drain to keep the pipeThrough flowing. Records were already
343+
// dispatched in `onPart`, so the body here is a no-op.
344+
for await (const _record of stream) {
298345
if (signal.aborted) break;
299-
300-
let data: unknown;
301-
if (typeof record === "string") {
302-
try {
303-
data = JSON.parse(record);
304-
} catch {
305-
data = record;
306-
}
307-
} else {
308-
data = record;
309-
}
310-
311-
this.#dispatch(key, data);
312346
}
313347
} catch (error) {
314348
if (error instanceof Error && error.name === "AbortError") return;

0 commit comments

Comments
 (0)