Skip to content

Commit a434e4e

Browse files
committed
fix(sdk,core): cache realtime-stream credentials per slot with refresh on writer failure
Hot-loop writers (`streams.writer` / `streams.pipe` on the run-scoped side, `chat.response.write` / `chat.stream.*` on the session side) were issuing a fresh PUT to mint S2 credentials for every chunk. On run streams, each PUT also pushed the streamId onto `TaskRun.realtimeStreams`, so a chat-agent turn writing N chunks produced N PUTs and N duplicate array pushes against the same row. The SDK now caches the initialize response per cache slot — `(runId, key)` for run streams, the session id for session streams. First call PUTs as before; subsequent calls reuse the cached promise. Hot-loop writers do one PUT per slot for the lifetime of the cache. S2 access tokens have a 1-day TTL. If a writer's `wait()` rejects (auth error, expired token, network blip), the cache evicts the matching slot so the next call re-PUTs and mints fresh credentials, identity-checked so a concurrent caller's fresh promise isn't accidentally cleared. `streams.pipe / writer / append / read` called inside a `chat.agent` run now logs a one-time warning pointing at `chat.response.write` / `chat.stream.*` — `streams.*` is run-scoped and isn't visible on the chat session. Drops the run-stream guidance from the ai-chat docs.
1 parent 6c9f1f1 commit a434e4e

10 files changed

Lines changed: 576 additions & 21 deletions

File tree

packages/core/src/v3/realtime-streams-api.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ export const realtimeStreams = RealtimeStreamsAPI.getInstance();
66

77
export * from "./realtimeStreams/types.js";
88
export { SessionStreamInstance } from "./realtimeStreams/sessionStreamInstance.js";
9-
export type { SessionStreamInstanceOptions } from "./realtimeStreams/sessionStreamInstance.js";
9+
export type {
10+
SessionStreamInstanceOptions,
11+
InitializeSessionStreamResponseLike,
12+
} from "./realtimeStreams/sessionStreamInstance.js";
1013
export {
1114
trimSessionStream,
1215
writeSessionControlRecord,

packages/core/src/v3/realtimeStreams/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import {
1010
// `SessionOutputChannel.pipe` / `.writer` can construct it without reaching
1111
// into the core package's internals.
1212
export { SessionStreamInstance } from "./sessionStreamInstance.js";
13-
export type { SessionStreamInstanceOptions } from "./sessionStreamInstance.js";
13+
export type {
14+
SessionStreamInstanceOptions,
15+
InitializeSessionStreamResponseLike,
16+
} from "./sessionStreamInstance.js";
1417
export {
1518
trimSessionStream,
1619
writeSessionControlRecord,
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
import type { ApiClient } from "../apiClient/index.js";
3+
import { StandardRealtimeStreamsManager } from "./manager.js";
4+
5+
// The cache lives on a private method to keep `pipe()` callers from having
6+
// to thread cache concerns. Tests exercise it via bracket-notation to keep
7+
// the assertions tight on cache contracts and avoid spinning up real
8+
// `StreamsWriterV1`/`StreamsWriterV2` infrastructure (HTTP requests, S2
9+
// connections) for what is purely an in-memory dedup check.
10+
type GetCached = (
11+
runId: string,
12+
key: string,
13+
requestOptions?: undefined
14+
) => Promise<{ version: string; headers?: Record<string, string> }>;
15+
16+
function getCached(manager: StandardRealtimeStreamsManager, runId: string, key: string) {
17+
return (manager as unknown as { getCachedCreateStream: GetCached }).getCachedCreateStream(
18+
runId,
19+
key
20+
);
21+
}
22+
23+
function makeApiClient(impl: () => Promise<{ version: string; headers?: Record<string, string> }>) {
24+
const spy = vi.fn(impl);
25+
const client = { createStream: spy } as unknown as ApiClient;
26+
return { client, spy };
27+
}
28+
29+
describe("StandardRealtimeStreamsManager createStream cache", () => {
30+
it("dedupes repeated calls for the same (runId, key)", async () => {
31+
const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} }));
32+
const manager = new StandardRealtimeStreamsManager(client, "http://localhost");
33+
34+
const p1 = getCached(manager, "run-1", "chat");
35+
const p2 = getCached(manager, "run-1", "chat");
36+
37+
expect(p1).toBe(p2);
38+
expect(spy).toHaveBeenCalledTimes(1);
39+
await Promise.all([p1, p2]);
40+
expect(spy).toHaveBeenCalledTimes(1);
41+
});
42+
43+
it("issues a separate PUT for each distinct stream key on the same run", async () => {
44+
const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} }));
45+
const manager = new StandardRealtimeStreamsManager(client, "http://localhost");
46+
47+
await Promise.all([
48+
getCached(manager, "run-1", "chat"),
49+
getCached(manager, "run-1", "tool-output"),
50+
]);
51+
52+
expect(spy).toHaveBeenCalledTimes(2);
53+
expect(spy).toHaveBeenNthCalledWith(1, "run-1", "self", "chat", undefined);
54+
expect(spy).toHaveBeenNthCalledWith(2, "run-1", "self", "tool-output", undefined);
55+
});
56+
57+
it("issues a separate PUT for each distinct run, even with the same key", async () => {
58+
const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} }));
59+
const manager = new StandardRealtimeStreamsManager(client, "http://localhost");
60+
61+
await Promise.all([
62+
getCached(manager, "run-1", "chat"),
63+
getCached(manager, "run-2", "chat"),
64+
]);
65+
66+
expect(spy).toHaveBeenCalledTimes(2);
67+
});
68+
69+
it("evicts on failure so the next call retries instead of returning a poisoned entry", async () => {
70+
const spy = vi
71+
.fn()
72+
.mockRejectedValueOnce(new Error("boom"))
73+
.mockResolvedValueOnce({ version: "v1", headers: {} });
74+
const client = { createStream: spy } as unknown as ApiClient;
75+
const manager = new StandardRealtimeStreamsManager(client, "http://localhost");
76+
77+
await expect(getCached(manager, "run-1", "chat")).rejects.toThrow("boom");
78+
79+
const retried = await getCached(manager, "run-1", "chat");
80+
81+
expect(retried).toEqual({ version: "v1", headers: {} });
82+
expect(spy).toHaveBeenCalledTimes(2);
83+
});
84+
85+
it("reset() clears cached entries so the next call re-PUTs", async () => {
86+
const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} }));
87+
const manager = new StandardRealtimeStreamsManager(client, "http://localhost");
88+
89+
await getCached(manager, "run-1", "chat");
90+
expect(spy).toHaveBeenCalledTimes(1);
91+
92+
manager.reset();
93+
94+
await getCached(manager, "run-1", "chat");
95+
expect(spy).toHaveBeenCalledTimes(2);
96+
});
97+
98+
it("evictCreateStreamIfStale clears the matching entry so the next call re-PUTs", async () => {
99+
const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} }));
100+
const manager = new StandardRealtimeStreamsManager(client, "http://localhost");
101+
102+
// Prime the cache and capture which promise was stored.
103+
const cachedPromise = getCached(manager, "run-1", "chat");
104+
await cachedPromise;
105+
expect(spy).toHaveBeenCalledTimes(1);
106+
107+
// Simulate the reactive invalidation path that `pipe()` runs when a
108+
// writer's `wait()` rejects.
109+
(
110+
manager as unknown as {
111+
evictCreateStreamIfStale: (
112+
runId: string,
113+
key: string,
114+
expected: Promise<unknown>
115+
) => void;
116+
}
117+
).evictCreateStreamIfStale("run-1", "chat", cachedPromise);
118+
119+
await getCached(manager, "run-1", "chat");
120+
expect(spy).toHaveBeenCalledTimes(2);
121+
});
122+
123+
it("evictCreateStreamIfStale is a no-op when the cache holds a different promise", async () => {
124+
const { client, spy } = makeApiClient(async () => ({ version: "v1", headers: {} }));
125+
const manager = new StandardRealtimeStreamsManager(client, "http://localhost");
126+
127+
const original = getCached(manager, "run-1", "chat");
128+
await original;
129+
130+
// A different promise (e.g. from a concurrent caller that already
131+
// refreshed) shouldn't trigger eviction.
132+
const stalePromise = Promise.resolve({ version: "v1", headers: {} });
133+
(
134+
manager as unknown as {
135+
evictCreateStreamIfStale: (
136+
runId: string,
137+
key: string,
138+
expected: Promise<unknown>
139+
) => void;
140+
}
141+
).evictCreateStreamIfStale("run-1", "chat", stalePromise);
142+
143+
// Cache should still hold the original entry; next call is a hit.
144+
await getCached(manager, "run-1", "chat");
145+
expect(spy).toHaveBeenCalledTimes(1);
146+
});
147+
});

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { ApiClient } from "../apiClient/index.js";
22
import { ensureAsyncIterable, ensureReadableStream } from "../streams/asyncIterableStream.js";
3+
import { AnyZodFetchOptions } from "../zodfetch.js";
34
import { taskContext } from "../task-context-api.js";
4-
import { StreamInstance } from "./streamInstance.js";
5+
import { CreateStreamResponseLike, StreamInstance } from "./streamInstance.js";
56
import {
67
RealtimeStreamInstance,
78
RealtimeStreamOperationOptions,
@@ -21,8 +22,60 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
2122
abortController: AbortController;
2223
}>();
2324

25+
// Cache of in-flight / resolved `createStream` responses, keyed by
26+
// `${runId}:${key}`. S2 v2 access tokens are scoped to the org basin
27+
// (default 1-day TTL server-side) so reusing them across repeated
28+
// `pipe()` calls for the same `(runId, key)` is safe, and avoids the
29+
// per-call PUT that pushes `streamId` onto `TaskRun.realtimeStreams`,
30+
// which under chat-agent-style hot-loop writers caused row-lock
31+
// contention on the writer DB.
32+
private createStreamCache = new Map<string, Promise<CreateStreamResponseLike>>();
33+
2434
reset(): void {
2535
this.activeStreams.clear();
36+
this.createStreamCache.clear();
37+
}
38+
39+
private getCachedCreateStream(
40+
runId: string,
41+
key: string,
42+
requestOptions: AnyZodFetchOptions | undefined
43+
): Promise<CreateStreamResponseLike> {
44+
const cacheKey = `${runId}:${key}`;
45+
const cached = this.createStreamCache.get(cacheKey);
46+
if (cached) {
47+
return cached;
48+
}
49+
50+
const promise = this.apiClient.createStream(runId, "self", key, requestOptions);
51+
this.createStreamCache.set(cacheKey, promise);
52+
// Evict on failure so the next call retries instead of returning a
53+
// poisoned cache entry forever.
54+
promise.catch((err) => {
55+
if (this.createStreamCache.get(cacheKey) === promise) {
56+
this.createStreamCache.delete(cacheKey);
57+
}
58+
});
59+
return promise;
60+
}
61+
62+
/**
63+
* Reactive invalidation: a writer's `wait()` rejecting can mean the
64+
* cached S2 credentials have gone stale (expired token, revoked
65+
* access, basin retired), so evict the cached `createStream` response
66+
* for `(runId, key)` and let the next `pipe()` re-PUT to mint fresh
67+
* credentials. Compare by identity so a fresh promise installed by a
68+
* concurrent caller isn't accidentally cleared.
69+
*/
70+
private evictCreateStreamIfStale(
71+
runId: string,
72+
key: string,
73+
expected: Promise<CreateStreamResponseLike>
74+
): void {
75+
const cacheKey = `${runId}:${key}`;
76+
if (this.createStreamCache.get(cacheKey) === expected) {
77+
this.createStreamCache.delete(cacheKey);
78+
}
2679
}
2780

2881
public pipe<T>(
@@ -48,6 +101,15 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
48101
? AbortSignal.any?.([options.signal, abortController.signal]) ?? abortController.signal
49102
: abortController.signal;
50103

104+
// Capture which cached promise this writer uses so reactive
105+
// invalidation below evicts only if the cache still holds it (a
106+
// concurrent caller may have already refreshed it).
107+
const activeCreatePromise = this.getCachedCreateStream(
108+
runId,
109+
key,
110+
options?.requestOptions
111+
);
112+
51113
const streamInstance = new StreamInstance({
52114
apiClient: this.apiClient,
53115
baseUrl: this.baseUrl,
@@ -58,14 +120,29 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
58120
requestOptions: options?.requestOptions,
59121
target: options?.target,
60122
debug: this.debug,
123+
createStream: () => activeCreatePromise,
61124
});
62125

63126
// Register this stream
64127
const streamInfo = { wait: () => streamInstance.wait(), abortController };
65128
this.activeStreams.add(streamInfo);
66129

67-
// Clean up when stream completes
68-
streamInstance.wait().finally(() => this.activeStreams.delete(streamInfo));
130+
// Single internal chain that handles activeStreams cleanup AND
131+
// reactive invalidation. On rejection we evict the cached
132+
// `createStream` entry so the next pipe() for the same `(runId, key)`
133+
// re-PUTs and recovers (e.g. when a cached S2 access token expired
134+
// mid-process). Customer awaiters still observe the rejection via
135+
// the returned `wait()`; this chain just keeps the cleanup path
136+
// from surfacing as unhandled.
137+
streamInstance.wait().then(
138+
() => {
139+
this.activeStreams.delete(streamInfo);
140+
},
141+
(err) => {
142+
this.evictCreateStreamIfStale(runId, key, activeCreatePromise);
143+
this.activeStreams.delete(streamInfo);
144+
}
145+
);
69146

70147
return {
71148
wait: () => streamInstance.wait(),

packages/core/src/v3/realtimeStreams/sessionStreamInstance.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ import { AnyZodFetchOptions } from "../zodfetch.js";
44
import { StreamsWriterV2 } from "./streamsWriterV2.js";
55
import { StreamsWriter, StreamWriteResult } from "./types.js";
66

7+
export type InitializeSessionStreamResponseLike = {
8+
headers?: Record<string, string>;
9+
};
10+
711
export type SessionStreamInstanceOptions<T> = {
812
apiClient: ApiClient;
913
baseUrl: string;
@@ -13,6 +17,14 @@ export type SessionStreamInstanceOptions<T> = {
1317
signal?: AbortSignal;
1418
requestOptions?: AnyZodFetchOptions;
1519
debug?: boolean;
20+
/**
21+
* Optional override for the initialize-session-stream call. Defaults to
22+
* `apiClient.initializeSessionStream(sessionId, io, requestOptions)`. The
23+
* channel passes a cached version so repeated `pipe()` / `writer()`
24+
* calls for the same `(sessionId, io)` share a single PUT instead of
25+
* hammering the server on every chunk.
26+
*/
27+
initializeSession?: () => Promise<InitializeSessionStreamResponseLike>;
1628
};
1729

1830
/**
@@ -31,11 +43,16 @@ export class SessionStreamInstance<T> implements StreamsWriter {
3143
}
3244

3345
private async initializeWriter(): Promise<StreamsWriterV2<T>> {
34-
const response = await this.options.apiClient.initializeSessionStream(
35-
this.options.sessionId,
36-
this.options.io,
37-
this.options?.requestOptions
38-
);
46+
const initializeFn =
47+
this.options.initializeSession ??
48+
(() =>
49+
this.options.apiClient.initializeSessionStream(
50+
this.options.sessionId,
51+
this.options.io,
52+
this.options?.requestOptions
53+
));
54+
55+
const response = await initializeFn();
3956

4057
const headers = response.headers ?? {};
4158
const accessToken = headers["x-s2-access-token"];

packages/core/src/v3/realtimeStreams/streamInstance.ts

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ import { StreamsWriterV1 } from "./streamsWriterV1.js";
55
import { StreamsWriterV2 } from "./streamsWriterV2.js";
66
import { StreamsWriter, StreamWriteResult } from "./types.js";
77

8+
export type CreateStreamResponseLike = {
9+
version: string;
10+
headers?: Record<string, string>;
11+
};
12+
813
export type StreamInstanceOptions<T> = {
914
apiClient: ApiClient;
1015
baseUrl: string;
@@ -15,6 +20,14 @@ export type StreamInstanceOptions<T> = {
1520
requestOptions?: AnyZodFetchOptions;
1621
target?: "self" | "parent" | "root" | string;
1722
debug?: boolean;
23+
/**
24+
* Optional override for the create-stream call. Defaults to
25+
* `apiClient.createStream(runId, "self", key, requestOptions)`. The
26+
* manager passes a cached version so repeated `pipe()` calls for the
27+
* same `(runId, key)` share a single PUT instead of hammering the
28+
* server on every chunk.
29+
*/
30+
createStream?: () => Promise<CreateStreamResponseLike>;
1831
};
1932

2033
type StreamsWriterInstance<T> = StreamsWriterV1<T> | StreamsWriterV2<T>;
@@ -27,12 +40,17 @@ export class StreamInstance<T> implements StreamsWriter {
2740
}
2841

2942
private async initializeWriter(): Promise<StreamsWriterInstance<T>> {
30-
const { version, headers } = await this.options.apiClient.createStream(
31-
this.options.runId,
32-
"self",
33-
this.options.key,
34-
this.options?.requestOptions
35-
);
43+
const createStreamFn =
44+
this.options.createStream ??
45+
(() =>
46+
this.options.apiClient.createStream(
47+
this.options.runId,
48+
"self",
49+
this.options.key,
50+
this.options?.requestOptions
51+
));
52+
53+
const { version, headers } = await createStreamFn();
3654

3755
const parsedResponse = parseCreateStreamResponse(version, headers);
3856

0 commit comments

Comments
 (0)