Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 93 additions & 6 deletions apps/server/src/routes/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fs from 'node:fs';
import path from 'node:path';
import type { Request, Response, Router } from 'express';
import { Router as createRouter } from 'express';
import type { WorkflowGraph, WorkflowRunRecord, WorkflowRunResult } from '@agentic/types';
import type { WorkflowGraph, WorkflowLogEntry, WorkflowRunRecord, WorkflowRunResult } from '@agentic/types';
import type { WorkflowLLM } from '@agentic/workflow-engine';
import WorkflowEngine from '@agentic/workflow-engine';
import { addWorkflow, getWorkflow, removeWorkflow } from '../store/active-workflows';
Expand Down Expand Up @@ -47,8 +47,33 @@ function getEngineWorkflow(engine: WorkflowEngine): WorkflowGraph | undefined {
return Reflect.get(engine, 'graph') as WorkflowGraph | undefined;
}

/**
 * Forwards a log handler to the engine's optional `setOnLog` method.
 *
 * When `onLog` is omitted, `undefined` is passed through unchanged. If the
 * engine has no callable `setOnLog` property, nothing happens.
 *
 * @param engine - Engine instance that may expose a `setOnLog` method.
 * @param onLog - Handler to pass along; optional.
 */
function setEngineOnLog(
  engine: WorkflowEngine,
  onLog?: (entry: WorkflowLogEntry) => void
): void {
  type LogHandler = (entry: WorkflowLogEntry) => void;
  // `setOnLog` is not assumed to be part of the declared engine type, so it is probed at runtime.
  const candidate = engine as WorkflowEngine & { setOnLog?: (handler?: LogHandler) => void };
  if (typeof candidate.setOnLog !== 'function') {
    return;
  }
  candidate.setOnLog(onLog);
}

export function createWorkflowRouter(llm?: WorkflowLLM): Router {
const router = createRouter();
// Records, per run ID, which resume mode ('stream' or 'json') currently holds the lock.
const activeResumeModes = new Map<string, 'stream' | 'json'>();

/**
 * Attempts to claim the resume lock for a run.
 *
 * @param runId - Run whose lock is requested.
 * @param mode - Mode recorded as the holder when the claim succeeds.
 * @returns `true` if no entry existed and one was created; `false` if the run
 *          already had an entry (regardless of which mode holds it).
 */
const tryAcquireResumeLock = (runId: string, mode: 'stream' | 'json'): boolean => {
  const isFree = !activeResumeModes.has(runId);
  if (isFree) {
    activeResumeModes.set(runId, mode);
  }
  return isFree;
};

/**
 * Releases the resume lock for a run, but only when it is held by `mode`.
 * A release from a non-holding mode, or for an unknown run, is ignored.
 *
 * @param runId - Run whose lock should be released.
 * @param mode - Mode that is expected to be the current holder.
 */
const releaseResumeLock = (runId: string, mode: 'stream' | 'json'): void => {
  const holder = activeResumeModes.get(runId);
  if (holder !== mode) {
    return;
  }
  activeResumeModes.delete(runId);
};

router.post('/run', async (req: Request, res: Response) => {
const { graph } = req.body as { graph?: WorkflowGraph };
Expand Down Expand Up @@ -116,13 +141,11 @@ export function createWorkflowRouter(llm?: WorkflowLLM): Router {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};

let engine: WorkflowEngine | undefined;
try {
const runId = Date.now().toString();
const engine = new WorkflowEngine(graph, {
runId,
llm,
onLog: (entry) => sendEvent({ type: 'log', entry })
});
engine = new WorkflowEngine(graph, { runId, llm });
setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry }));
addWorkflow(engine);
sendEvent({ type: 'start', runId });

Expand All @@ -140,6 +163,63 @@ export function createWorkflowRouter(llm?: WorkflowLLM): Router {
logger.error('Failed to execute workflow stream', message);
sendEvent({ type: 'error', message });
} finally {
if (engine) {
setEngineOnLog(engine);
}
res.end();
}
});

router.post('/resume-stream', async (req: Request, res: Response) => {
const { runId, input } = req.body as { runId?: string; input?: unknown };
if (!runId) {
res.status(400).json({ error: 'runId is required' });
return;
}

const engine = getWorkflow(runId);
if (!engine) {
res.status(404).json({ error: 'Run ID not found' });
return;
}
if (!tryAcquireResumeLock(runId, 'stream')) {
res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' });
return;
}

res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();

let clientDisconnected = false;
res.on('close', () => { clientDisconnected = true; });

const sendEvent = (data: object) => {
if (clientDisconnected) return;
res.write(`data: ${JSON.stringify(data)}\n\n`);
};

try {
setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry }));
sendEvent({ type: 'start', runId });

const result = await engine.resume(input as Record<string, unknown>);
await persistResult(engine, result);

if (result.status !== 'paused') {
removeWorkflow(runId);
}

const workflow = getEngineWorkflow(engine);
sendEvent({ type: 'done', result: workflow ? { ...result, workflow } : result });
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
logger.error('Failed to resume workflow stream', message);
sendEvent({ type: 'error', message });
} finally {
releaseResumeLock(runId, 'stream');
setEngineOnLog(engine);
res.end();
}
Comment on lines +185 to 224
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential resume lock leak if res.flushHeaders() throws.

The lock is acquired at Line 185, but the try/finally that guarantees releaseResumeLock starts at Line 203. If res.flushHeaders() (Line 193) throws before the try block is entered, the lock is never released, permanently blocking any future resume of this runId.

🔒 Proposed fix — wrap the full post-lock body in the try/finally
     if (!tryAcquireResumeLock(runId, 'stream')) {
       res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' });
       return;
     }

-    res.setHeader('Content-Type', 'text/event-stream');
-    res.setHeader('Cache-Control', 'no-cache');
-    res.setHeader('Connection', 'keep-alive');
-    res.flushHeaders();
-
-    let clientDisconnected = false;
-    res.on('close', () => { clientDisconnected = true; });
-
-    const sendEvent = (data: object) => {
-      if (clientDisconnected) return;
-      res.write(`data: ${JSON.stringify(data)}\n\n`);
-    };
-
     try {
+      res.setHeader('Content-Type', 'text/event-stream');
+      res.setHeader('Cache-Control', 'no-cache');
+      res.setHeader('Connection', 'keep-alive');
+      res.flushHeaders();
+
+      let clientDisconnected = false;
+      res.on('close', () => { clientDisconnected = true; });
+
+      const sendEvent = (data: object) => {
+        if (clientDisconnected) return;
+        res.write(`data: ${JSON.stringify(data)}\n\n`);
+      };
+
       setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry }));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/server/src/routes/workflows.ts` around lines 185 - 224, After
successfully acquiring the resume lock with tryAcquireResumeLock(runId,
'stream'), ensure that releaseResumeLock always runs by moving the try/finally
to start immediately after lock acquisition (i.e., wrap everything from
res.setHeader/.../res.flushHeaders() through the
engine.resume/persistResult/sendEvent flow) so that any throw from
res.flushHeaders() still executes releaseResumeLock(runId, 'stream'); also keep
setEngineOnLog(engine) reset in the finally block and preserve the existing
error handling that sends events and logs via logger.error.

});
Expand All @@ -156,8 +236,13 @@ export function createWorkflowRouter(llm?: WorkflowLLM): Router {
res.status(404).json({ error: 'Run ID not found' });
return;
}
if (!tryAcquireResumeLock(runId, 'json')) {
res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' });
return;
}

try {
setEngineOnLog(engine);
const result = await engine.resume(input as Record<string, unknown>);
await persistResult(engine, result);

Expand All @@ -171,6 +256,8 @@ export function createWorkflowRouter(llm?: WorkflowLLM): Router {
const message = error instanceof Error ? error.message : String(error);
logger.error('Failed to resume workflow', message);
res.status(500).json({ error: 'Failed to resume workflow', details: message });
} finally {
releaseResumeLock(runId, 'json');
}
});

Expand Down
Loading