diff --git a/.changeset/configurable-stream-flush-interval.md b/.changeset/configurable-stream-flush-interval.md new file mode 100644 index 0000000000..c0b6e444c5 --- /dev/null +++ b/.changeset/configurable-stream-flush-interval.md @@ -0,0 +1,8 @@ +--- +"@workflow/world": patch +"@workflow/core": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +--- + +Add `streamFlushIntervalMs` option to `Streamer` interface, optional for worlds to allow overwriting the default of 10ms in low-latency environments. diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 4f011f069d..1c44df63f6 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -522,7 +522,7 @@ export class WorkflowServerWritableStream extends WritableStream { for (const w of currentWaiters) w.reject(err); } ); - }, STREAM_FLUSH_INTERVAL_MS); + }, world.streamFlushIntervalMs ?? STREAM_FLUSH_INTERVAL_MS); }; super({ diff --git a/packages/core/src/writable-stream.test.ts b/packages/core/src/writable-stream.test.ts index d07500d589..7514374a56 100644 --- a/packages/core/src/writable-stream.test.ts +++ b/packages/core/src/writable-stream.test.ts @@ -11,6 +11,7 @@ describe('WorkflowServerWritableStream', () => { writeToStream: ReturnType; writeToStreamMulti: ReturnType; closeStream: ReturnType; + streamFlushIntervalMs?: number; }; beforeEach(async () => { @@ -248,4 +249,52 @@ describe('WorkflowServerWritableStream', () => { ); }); }); + + describe('streamFlushIntervalMs', () => { + it('should use world.streamFlushIntervalMs when set to 0 (immediate flush)', async () => { + mockWorld.streamFlushIntervalMs = 0; + + const stream = new WorkflowServerWritableStream('s', 'run-1'); + const writer = stream.getWriter(); + + // With interval=0, the flush fires on the next microtask tick via setTimeout(fn, 0) + await writer.write(new Uint8Array([1])); + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + await writer.close(); + }); + + it('should fall back to default interval when streamFlushIntervalMs is undefined', async () => { + // mockWorld has no streamFlushIntervalMs set — uses default 10ms + delete mockWorld.streamFlushIntervalMs; + + const stream = new WorkflowServerWritableStream('s', 'run-1'); + const writer = stream.getWriter(); + + await writer.write(new Uint8Array([1])); + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + await writer.close(); + }); + + it('should respect a custom non-zero flush interval', async () => { + mockWorld.streamFlushIntervalMs = 50; + + const stream = new WorkflowServerWritableStream('s', 'run-1'); + const writer = stream.getWriter(); + + // Start a write — the flush is scheduled 50ms from now + const writePromise = writer.write(new Uint8Array([1])); + + // After 10ms (the old default), data should NOT have flushed yet + await new Promise((r) => setTimeout(r, 10)); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + + // Wait for the write to complete (will resolve after the 50ms timer fires) + await writePromise; + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + await writer.close(); + }); + }); }); diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index 7f686d879f..99431be3e1 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -22,6 +22,11 @@ export type Config = { * `.workflow-data` directory. */ tag?: string; + /** + * Override the flush interval (in ms) for buffered stream writes. + * Default is 10ms. Set to 0 for immediate flushing. + */ + streamFlushIntervalMs?: number; }; export const config = once(() => { diff --git a/packages/world-local/src/index.ts b/packages/world-local/src/index.ts index 9e0d242222..ccc28f8188 100644 --- a/packages/world-local/src/index.ts +++ b/packages/world-local/src/index.ts @@ -65,11 +65,13 @@ export function createLocalWorld(args?: Partial): LocalWorld { const storage = createStorage(mergedConfig.dataDir, tag); return { ...queue, - ...storage, - ...instrumentObject( - 'world.streams', - createStreamer(mergedConfig.dataDir, tag) - ), + ...createStorage(mergedConfig.dataDir, tag), + ...instrumentObject('world.streams', { + ...createStreamer(mergedConfig.dataDir, tag), + ...(mergedConfig.streamFlushIntervalMs !== undefined && { + streamFlushIntervalMs: mergedConfig.streamFlushIntervalMs, + }), + }), async start() { await initDataDir(mergedConfig.dataDir); await reenqueueActiveRuns(storage.runs, queue.queue, 'world-local'); diff --git a/packages/world-postgres/src/config.ts b/packages/world-postgres/src/config.ts index 02473e89ac..ca778914ea 100644 --- a/packages/world-postgres/src/config.ts +++ b/packages/world-postgres/src/config.ts @@ -7,4 +7,9 @@ type PgConnectionConfig = export type PostgresWorldConfig = PgConnectionConfig & { jobPrefix?: string; queueConcurrency?: number; + /** + * Override the flush interval (in ms) for buffered stream writes. + * Default is 10ms. Set to 0 for immediate flushing. + */ + streamFlushIntervalMs?: number; }; diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index bdc2eceeeb..62e1530138 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -60,6 +60,9 @@ export function createWorld( ...storage, ...streamer, ...queue, + ...(config.streamFlushIntervalMs !== undefined && { + streamFlushIntervalMs: config.streamFlushIntervalMs, + }), async start() { await queue.start(); await reenqueueActiveRuns(storage.runs, queue.queue, 'world-postgres'); diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index 77b8cd04e6..c9628a9685 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -30,6 +30,19 @@ import type { } from './steps.js'; export interface Streamer { + /** + * Override the default flush interval (in milliseconds) for buffered stream writes. + * Chunks are accumulated in a buffer and flushed together on this interval. + * + * The default is 10ms, which is appropriate for HTTP-based backends where + * each flush is a network round-trip. For backends with sub-millisecond writes + * (e.g., Redis, local filesystem), a lower value (or 0 for immediate flushing) reduces + * end-to-end stream latency. + * + * Not supported by all worlds. + */ + streamFlushIntervalMs?: number; + writeToStream( name: string, runId: string,