Commit 896d2f9

fix(sdk): cache realtime-stream create response per (runId, key)
`chat.response.write`, `streams.writer`, and other one-shot writers each call `streams.pipe` internally, which constructs a fresh `StreamInstance` and issues `PUT /realtime/v1/streams/:runId/self/:key` on every call. Each PUT does `UPDATE "TaskRun" SET realtimeStreams = realtimeStreams || $1`, so a per-chunk writer loop in a chat-agent turn produces one row-update per chunk and bloats the array with thousands of duplicate entries. Under concurrency this monopolised the writer pool with tuple-lock contention on a single TaskRun row.

Cache the `createStream` response in `StandardRealtimeStreamsManager` keyed by `${runId}:${key}`. First call PUTs as before; subsequent calls reuse the cached promise and construct a `StreamsWriterV2` straight from the cached S2 access token / basin / stream name.

Net effect for hot-loop writers is one PUT per `(run, stream-key)` for the lifetime of the SDK process (S2 tokens default to a 1-day server-side TTL). Cache evicts on `createStream` failure and on `manager.reset()`.
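The effect described above can be sketched in isolation. This is a minimal illustration, not the SDK code: `fakeCreateStream`, `putCount`, the free-standing `getCachedCreateStream`, and the `run_123` / `chat` identifiers are hypothetical stand-ins, and the response is reduced to a bare `version` field.

```typescript
// Hypothetical, reduced response shape; the real response carries more fields.
type CreateStreamResponse = { version: string };

// Counts how many simulated PUTs were issued.
let putCount = 0;

// Hypothetical stand-in for the network call behind `createStream`.
function fakeCreateStream(runId: string, key: string): Promise<CreateStreamResponse> {
  putCount += 1;
  return Promise.resolve({ version: "v2" });
}

// In-flight or resolved responses, keyed by `${runId}:${key}`.
const cache = new Map<string, Promise<CreateStreamResponse>>();

function getCachedCreateStream(runId: string, key: string): Promise<CreateStreamResponse> {
  const cacheKey = `${runId}:${key}`;
  const cached = cache.get(cacheKey);
  if (cached) return cached;

  // Store the promise itself, so callers that arrive before the first
  // response resolves still share the same request.
  const promise = fakeCreateStream(runId, key);
  cache.set(cacheKey, promise);
  return promise;
}

// Simulate a per-chunk writer loop: every chunk asks for stream credentials.
const chunks = ["Hel", "lo ", "wor", "ld"];
const pending = chunks.map(() => getCachedCreateStream("run_123", "chat"));

// Number of simulated PUTs after the loop.
const putsAfterLoop = putCount;
```

Because the map stores the promise rather than the resolved value, the four lookups in the loop all receive the same promise and only the first one reaches `fakeCreateStream`.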
1 parent 8b53b58 commit 896d2f9

3 files changed

Lines changed: 63 additions & 7 deletions

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/core": patch
+---
+
+Cache the `PUT /realtime/v1/streams/:runId/self/:key` response per `(runId, key)` so repeated `streams.pipe()` / `chat.response.write` / `chat.stream.writer` calls reuse the same S2 credentials instead of issuing a fresh PUT (and the `realtimeStreams || $1` array push it triggers on `TaskRun`) for every chunk. Hot-loop writers, most notably `chat.response.write` called per chunk inside a `chat.agent` turn, now do one PUT per `(run, stream-key)` instead of one per write, eliminating the writer-pool lock contention that scaled with the customer's chunk rate. S2 v2 access tokens are scoped to the org basin with a 1-day server-side TTL so reusing them across calls within a single run is safe; the cache evicts on `createStream` failure and on `manager.reset()`.

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 34 additions & 1 deletion
@@ -1,7 +1,8 @@
 import { ApiClient } from "../apiClient/index.js";
 import { ensureAsyncIterable, ensureReadableStream } from "../streams/asyncIterableStream.js";
+import { AnyZodFetchOptions } from "../zodfetch.js";
 import { taskContext } from "../task-context-api.js";
-import { StreamInstance } from "./streamInstance.js";
+import { CreateStreamResponseLike, StreamInstance } from "./streamInstance.js";
 import {
   RealtimeStreamInstance,
   RealtimeStreamOperationOptions,
@@ -21,8 +22,39 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
       abortController: AbortController;
     }>();
 
+  // Cache of in-flight / resolved `createStream` responses, keyed by
+  // `${runId}:${key}`. S2 v2 access tokens are scoped to the org basin
+  // (default 1-day TTL server-side) so reusing them across repeated
+  // `pipe()` calls for the same `(runId, key)` is safe, and avoids the
+  // per-call PUT that pushes `streamId` onto `TaskRun.realtimeStreams`,
+  // which under chat-agent-style hot-loop writers caused row-lock
+  // contention on the writer DB.
+  private createStreamCache = new Map<string, Promise<CreateStreamResponseLike>>();
+
   reset(): void {
     this.activeStreams.clear();
+    this.createStreamCache.clear();
+  }
+
+  private getCachedCreateStream(
+    runId: string,
+    key: string,
+    requestOptions: AnyZodFetchOptions | undefined
+  ): Promise<CreateStreamResponseLike> {
+    const cacheKey = `${runId}:${key}`;
+    const cached = this.createStreamCache.get(cacheKey);
+    if (cached) return cached;
+
+    const promise = this.apiClient.createStream(runId, "self", key, requestOptions);
+    this.createStreamCache.set(cacheKey, promise);
+    // Evict on failure so the next call retries instead of returning a
+    // poisoned cache entry forever.
+    promise.catch(() => {
+      if (this.createStreamCache.get(cacheKey) === promise) {
+        this.createStreamCache.delete(cacheKey);
+      }
+    });
+    return promise;
   }
 
   public pipe<T>(
@@ -58,6 +90,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
       requestOptions: options?.requestOptions,
       target: options?.target,
       debug: this.debug,
+      createStream: () => this.getCachedCreateStream(runId, key, options?.requestOptions),
     });
 
     // Register this stream
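
The evict-on-failure behaviour in `getCachedCreateStream` can be exercised on its own with a small stand-alone sketch. Everything here is an assumption made for illustration: `CreateStreamCache`, `CreateFn`, `flakyCreate`, and `demo` are hypothetical names, the response type is reduced to `version`, and the failing first call is simulated rather than a real PUT.

```typescript
// Hypothetical, reduced response shape; the real type lives in the SDK.
type CreateStreamResponse = { version: string };
type CreateFn = (runId: string, key: string) => Promise<CreateStreamResponse>;

class CreateStreamCache {
  private entries = new Map<string, Promise<CreateStreamResponse>>();
  private create: CreateFn;

  constructor(create: CreateFn) {
    this.create = create;
  }

  get(runId: string, key: string): Promise<CreateStreamResponse> {
    const cacheKey = `${runId}:${key}`;
    const cached = this.entries.get(cacheKey);
    if (cached) return cached;

    const promise = this.create(runId, key);
    this.entries.set(cacheKey, promise);
    // Evict on failure, but only if the entry is still this promise, so a
    // stale rejection cannot delete an entry stored after a reset().
    promise.catch(() => {
      if (this.entries.get(cacheKey) === promise) {
        this.entries.delete(cacheKey);
      }
    });
    return promise;
  }

  reset(): void {
    this.entries.clear();
  }
}

// Simulated backend: the first call rejects, later calls succeed.
let calls = 0;
const flakyCreate: CreateFn = () => {
  calls += 1;
  return calls === 1
    ? Promise.reject(new Error("simulated PUT failure"))
    : Promise.resolve({ version: "v2" });
};

async function demo(): Promise<{ firstFailed: boolean; calls: number }> {
  const cache = new CreateStreamCache(flakyCreate);
  let firstFailed = false;
  try {
    await cache.get("run_123", "chat");
  } catch {
    firstFailed = true;
  }
  await cache.get("run_123", "chat"); // entry was evicted, so this retries
  await cache.get("run_123", "chat"); // served from the cache, no new call
  return { firstFailed, calls };
}
```

In this sketch the rejected promise is removed before the caller's `catch` block runs, because the eviction handler is attached to the promise before it is returned. The second `get` therefore issues a fresh call, and the third reuses the successful entry.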

packages/core/src/v3/realtimeStreams/streamInstance.ts

Lines changed: 24 additions & 6 deletions
@@ -5,6 +5,11 @@ import { StreamsWriterV1 } from "./streamsWriterV1.js";
 import { StreamsWriterV2 } from "./streamsWriterV2.js";
 import { StreamsWriter, StreamWriteResult } from "./types.js";
 
+export type CreateStreamResponseLike = {
+  version: string;
+  headers?: Record<string, string>;
+};
+
 export type StreamInstanceOptions<T> = {
   apiClient: ApiClient;
   baseUrl: string;
@@ -15,6 +20,14 @@ export type StreamInstanceOptions<T> = {
   requestOptions?: AnyZodFetchOptions;
   target?: "self" | "parent" | "root" | string;
   debug?: boolean;
+  /**
+   * Optional override for the create-stream call. Defaults to
+   * `apiClient.createStream(runId, "self", key, requestOptions)`. The
+   * manager passes a cached version so repeated `pipe()` calls for the
+   * same `(runId, key)` share a single PUT instead of hammering the
+   * server on every chunk.
+   */
+  createStream?: () => Promise<CreateStreamResponseLike>;
 };
 
 type StreamsWriterInstance<T> = StreamsWriterV1<T> | StreamsWriterV2<T>;
@@ -27,12 +40,17 @@ export class StreamInstance<T> implements StreamsWriter {
   }
 
   private async initializeWriter(): Promise<StreamsWriterInstance<T>> {
-    const { version, headers } = await this.options.apiClient.createStream(
-      this.options.runId,
-      "self",
-      this.options.key,
-      this.options?.requestOptions
-    );
+    const createStreamFn =
+      this.options.createStream ??
+      (() =>
+        this.options.apiClient.createStream(
+          this.options.runId,
+          "self",
+          this.options.key,
+          this.options?.requestOptions
+        ));
+
+    const { version, headers } = await createStreamFn();
 
     const parsedResponse = parseCreateStreamResponse(version, headers);
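
The optional `createStream` override relies on a plain `??` fallback. The sketch below isolates that choice under stated assumptions: `SketchOptions`, `defaultCreate`, and `resolveCreateStream` are hypothetical names, and `defaultCreate` stands in for the `apiClient.createStream(...)` call in the real code.

```typescript
type CreateStreamResponseLike = { version: string; headers?: Record<string, string> };

// Hypothetical, reduced options object; `defaultCreate` stands in for the
// API client call that the real code falls back to.
type SketchOptions = {
  runId: string;
  key: string;
  defaultCreate: (runId: string, key: string) => Promise<CreateStreamResponseLike>;
  createStream?: () => Promise<CreateStreamResponseLike>;
};

// Prefer the injected override; otherwise build a thunk around the default.
function resolveCreateStream(options: SketchOptions): () => Promise<CreateStreamResponseLike> {
  return options.createStream ?? (() => options.defaultCreate(options.runId, options.key));
}

// Record which path was taken.
const log: string[] = [];

const base: SketchOptions = {
  runId: "run_123",
  key: "chat",
  defaultCreate: () => {
    log.push("default");
    return Promise.resolve({ version: "v2" });
  },
};

// Without an override, the default call is used.
resolveCreateStream(base)();

// With an override (for example a cached lookup), the default is skipped.
resolveCreateStream({
  ...base,
  createStream: () => {
    log.push("override");
    return Promise.resolve({ version: "v2" });
  },
})();
```

Keeping the override optional means a `StreamInstance` constructed without it behaves as before, while the manager can pass its cached lookup.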