Skip to content

Commit 671b137

Browse files
committed
fix(supervisor): wide-event review fixes + noisy-routes flag + socket lifecycle
1 parent 570d648 commit 671b137

7 files changed

Lines changed: 56 additions & 38 deletions

File tree

apps/supervisor/src/env.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ const Env = z
261261
// line per natural unit of work (dequeue iteration, HTTP request, socket
262262
// lifecycle). High-QPS hotpath, so the kill switch must be honoured.
263263
TRIGGER_WIDE_EVENTS_ENABLED: BoolEnv.default(false),
264+
// When true, also emit wide events for high-frequency HTTP routes
265+
// (heartbeat, snapshots-since, logs/debug). Off in prod to keep event
266+
// volume manageable; on in test environments for full-fidelity debugging.
267+
TRIGGER_WIDE_EVENTS_NOISY_ROUTES: BoolEnv.default(false),
264268
})
265269
.superRefine((data, ctx) => {
266270
if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) {

apps/supervisor/src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class ManagedSupervisor {
6363
env: { nodeId: env.TRIGGER_WORKER_INSTANCE_NAME },
6464
enabled: env.TRIGGER_WIDE_EVENTS_ENABLED,
6565
};
66+
private readonly wideEventsNoisyRoutes = env.TRIGGER_WIDE_EVENTS_NOISY_ROUTES;
6667

6768
constructor() {
6869
const {
@@ -260,7 +261,7 @@ class ManagedSupervisor {
260261
...this.wideEventOpts,
261262
traceparent,
262263
setup: (state) => {
263-
setMeta(state, "run_id", message.run.id);
264+
setMeta(state, "run_id", message.run.friendlyId);
264265
setMeta(state, "env_id", message.environment.id);
265266
setMeta(state, "org_id", message.organization.id);
266267
setMeta(state, "project_id", message.project.id);
@@ -472,6 +473,7 @@ class ManagedSupervisor {
472473
computeManager: this.computeManager,
473474
tracing: this.tracing,
474475
wideEventOpts: this.wideEventOpts,
476+
wideEventsNoisyRoutes: this.wideEventsNoisyRoutes,
475477
});
476478

477479
this.workloadServer.on("runConnected", this.onRunConnected.bind(this));

apps/supervisor/src/wideEvents/emit.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ describe("emit", () => {
4343
expect(out).not.toHaveProperty("trace_id");
4444
expect(out).not.toHaveProperty("version");
4545
expect(out).not.toHaveProperty("commit_sha");
46-
expect(out).not.toHaveProperty("instance_id");
4746
expect(out).not.toHaveProperty("error.code");
4847
});
4948

apps/supervisor/src/wideEvents/emit.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ export function emit(state: State): void {
2626
appendIfSet(out, "commit_sha", state.commitSha);
2727
appendIfSet(out, "region", state.region);
2828
appendIfSet(out, "node_id", state.nodeId);
29-
appendIfSet(out, "instance_id", state.instanceId);
3029

3130
out.ok = state.ok;
3231
if (state.statusCode !== 0) out.status = state.statusCode;

apps/supervisor/src/wideEvents/middleware.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ export async function runWideEvent<T>(
5757
});
5858
if (opts.route) state.extras.route = opts.route;
5959
if (opts.method) state.extras.method = opts.method;
60-
if (opts.setup) opts.setup(state);
6160

6261
const start = performance.now();
6362
try {
63+
if (opts.setup) opts.setup(state);
6464
const result = await wideEventStorage.run(state, () => Promise.resolve(fn()));
6565
state.durationMs = Math.round(performance.now() - start);
6666
if (finalize) finalize(state);

apps/supervisor/src/wideEvents/state.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ export type State = {
2121
commitSha?: string;
2222
region?: string;
2323
nodeId?: string;
24-
instanceId?: string;
2524

2625
// Caller-attached opaque metadata, flattened to `meta.<key>` on emit.
2726
meta: Record<string, string>;

apps/supervisor/src/workloadServer/index.ts

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ type WorkloadServerOptions = {
7676
computeManager?: ComputeWorkloadManager;
7777
tracing?: OtlpTraceService;
7878
wideEventOpts: WideEventOptions;
79+
/** When true, high-frequency HTTP routes also emit wide events. */
80+
wideEventsNoisyRoutes: boolean;
7981
};
8082

8183
export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
@@ -84,6 +86,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
8486

8587
private readonly logger = new SimpleStructuredLogger("workload-server");
8688
private readonly wideEventOpts: WideEventOptions;
89+
private readonly wideEventsNoisyRoutes: boolean;
8790

8891
private readonly httpServer: HttpServer;
8992
private readonly websocketServer: Namespace<
@@ -114,6 +117,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
114117
this.workerClient = opts.workerClient;
115118
this.checkpointClient = opts.checkpointClient;
116119
this.wideEventOpts = opts.wideEventOpts;
120+
this.wideEventsNoisyRoutes = opts.wideEventsNoisyRoutes;
117121

118122
if (opts.computeManager?.snapshotsEnabled) {
119123
this.snapshotService = new ComputeSnapshotService({
@@ -171,16 +175,25 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
171175
* `traceparent` and `x-request-id` from `req.headers`, attaches `run_id` /
172176
* `snapshot_id` / `deployment_id` meta from `params` when present, and
173177
* captures the response status from `res.statusCode` after `fn` returns.
178+
*
179+
* Pass `highFrequency: true` for noisy routes (heartbeat, polling). Those
180+
* still go through the wrapper but only emit when
181+
* `TRIGGER_WIDE_EVENTS_NOISY_ROUTES` is on, so prod can keep them dark
182+
* while test envs capture full-fidelity traffic for debugging.
174183
*/
175184
private wideRoute<T>(
176185
ctx: { req: IncomingMessage; res: ServerResponse; params?: unknown },
177186
route: string,
178187
method: string,
179-
fn: () => Promise<T> | T
188+
fn: () => Promise<T> | T,
189+
routeOpts: { highFrequency?: boolean } = {}
180190
): Promise<T> {
191+
const enabled =
192+
this.wideEventOpts.enabled && (!routeOpts.highFrequency || this.wideEventsNoisyRoutes);
181193
return runWideEvent(
182194
{
183195
...this.wideEventOpts,
196+
enabled,
184197
route,
185198
method,
186199
traceparent: this.headerValueFromRequest(ctx.req, "traceparent"),
@@ -312,7 +325,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
312325
reply.json({
313326
ok: true,
314327
} satisfies WorkloadHeartbeatResponseBody);
315-
}
328+
},
329+
{ highFrequency: true }
316330
),
317331
}
318332
)
@@ -482,7 +496,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
482496
reply.json(
483497
sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody
484498
);
485-
}
499+
},
500+
{ highFrequency: true }
486501
),
487502
}
488503
)
@@ -536,7 +551,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
536551
body,
537552
this.runnerIdFromRequest(req)
538553
);
539-
}
554+
},
555+
{ highFrequency: true }
540556
),
541557
});
542558
} else {
@@ -549,7 +565,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
549565
"POST",
550566
async () => {
551567
ctx.reply.empty(204);
552-
}
568+
},
569+
{ highFrequency: true }
553570
),
554571
});
555572
}
@@ -640,6 +657,26 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
640657
};
641658
};
642659

660+
const emitSocketLifecycle = (
661+
event: "run_connected" | "run_disconnected",
662+
friendlyId: string,
663+
disconnectReason?: string
664+
) => {
665+
emitOneShot({
666+
...this.wideEventOpts,
667+
populate: (state) => {
668+
state.extras.event = event;
669+
setMeta(state, "run_id", friendlyId);
670+
if (socket.data.deploymentId) {
671+
setMeta(state, "deployment_id", socket.data.deploymentId);
672+
}
673+
if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId);
674+
state.extras.socket_id = socket.id;
675+
if (disconnectReason) state.extras.disconnect_reason = disconnectReason;
676+
},
677+
});
678+
};
679+
643680
const runConnected = (friendlyId: string) => {
644681
socketLogger.debug("runConnected", { ...getSocketMetadata() });
645682

@@ -650,20 +687,22 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
650687
newRunId: friendlyId,
651688
oldRunId: socket.data.runFriendlyId,
652689
});
653-
runDisconnected(socket.data.runFriendlyId);
690+
runDisconnected(socket.data.runFriendlyId, "socket_run_replaced");
654691
}
655692

656693
this.runSockets.set(friendlyId, socket);
657694
this.emit("runConnected", { run: { friendlyId } });
658695
socket.data.runFriendlyId = friendlyId;
696+
emitSocketLifecycle("run_connected", friendlyId);
659697
};
660698

661-
const runDisconnected = (friendlyId: string) => {
699+
const runDisconnected = (friendlyId: string, reason: string) => {
662700
socketLogger.debug("runDisconnected", { ...getSocketMetadata() });
663701

664702
this.runSockets.delete(friendlyId);
665703
this.emit("runDisconnected", { run: { friendlyId } });
666704
socket.data.runFriendlyId = undefined;
705+
emitSocketLifecycle("run_disconnected", friendlyId, reason);
667706
};
668707

669708
socketLogger.debug("wsServer socket connected", { ...getSocketMetadata() });
@@ -681,7 +720,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
681720
});
682721

683722
if (socket.data.runFriendlyId) {
684-
runDisconnected(socket.data.runFriendlyId);
723+
runDisconnected(socket.data.runFriendlyId, `socket_disconnecting:${reason}`);
685724
}
686725
});
687726

@@ -711,18 +750,6 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
711750

712751
try {
713752
runConnected(message.run.friendlyId);
714-
emitOneShot({
715-
...this.wideEventOpts,
716-
populate: (state) => {
717-
state.extras.event = "run:start";
718-
setMeta(state, "run_id", message.run.friendlyId);
719-
if (socket.data.deploymentId) {
720-
setMeta(state, "deployment_id", socket.data.deploymentId);
721-
}
722-
if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId);
723-
state.extras.socket_id = socket.id;
724-
},
725-
});
726753
} catch (error) {
727754
log.error("run:start error", { error });
728755
}
@@ -738,22 +765,10 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
738765
log.debug("Handling run:stop");
739766

740767
try {
741-
runDisconnected(message.run.friendlyId);
768+
runDisconnected(message.run.friendlyId, "run_stop_message");
742769
// Don't delete trace context here - run:stop fires after each snapshot/shutdown
743770
// but the run may be restored on a new VM and snapshot again. Trace context is
744771
// re-populated on dequeue, and entries are small (4 strings per run).
745-
emitOneShot({
746-
...this.wideEventOpts,
747-
populate: (state) => {
748-
state.extras.event = "run:stop";
749-
setMeta(state, "run_id", message.run.friendlyId);
750-
if (socket.data.deploymentId) {
751-
setMeta(state, "deployment_id", socket.data.deploymentId);
752-
}
753-
if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId);
754-
state.extras.socket_id = socket.id;
755-
},
756-
});
757772
} catch (error) {
758773
log.error("run:stop error", { error });
759774
}

0 commit comments

Comments
 (0)