diff --git a/apps/server/src/routes/workflows.ts b/apps/server/src/routes/workflows.ts index bd5c93f..a865eaf 100644 --- a/apps/server/src/routes/workflows.ts +++ b/apps/server/src/routes/workflows.ts @@ -2,7 +2,7 @@ import fs from 'node:fs'; import path from 'node:path'; import type { Request, Response, Router } from 'express'; import { Router as createRouter } from 'express'; -import type { WorkflowGraph, WorkflowRunRecord, WorkflowRunResult } from '@agentic/types'; +import type { WorkflowGraph, WorkflowLogEntry, WorkflowRunRecord, WorkflowRunResult } from '@agentic/types'; import type { WorkflowLLM } from '@agentic/workflow-engine'; import WorkflowEngine from '@agentic/workflow-engine'; import { addWorkflow, getWorkflow, removeWorkflow } from '../store/active-workflows'; @@ -47,8 +47,33 @@ function getEngineWorkflow(engine: WorkflowEngine): WorkflowGraph | undefined { return Reflect.get(engine, 'graph') as WorkflowGraph | undefined; } +function setEngineOnLog( + engine: WorkflowEngine, + onLog?: (entry: WorkflowLogEntry) => void +): void { + const engineAny = engine as WorkflowEngine & { setOnLog?: (handler?: (entry: WorkflowLogEntry) => void) => void }; + if (typeof engineAny.setOnLog === 'function') { + engineAny.setOnLog(onLog); + } +} + export function createWorkflowRouter(llm?: WorkflowLLM): Router { const router = createRouter(); + const activeResumeModes = new Map(); + + const tryAcquireResumeLock = (runId: string, mode: 'stream' | 'json'): boolean => { + if (activeResumeModes.has(runId)) { + return false; + } + activeResumeModes.set(runId, mode); + return true; + }; + + const releaseResumeLock = (runId: string, mode: 'stream' | 'json'): void => { + if (activeResumeModes.get(runId) === mode) { + activeResumeModes.delete(runId); + } + }; router.post('/run', async (req: Request, res: Response) => { const { graph } = req.body as { graph?: WorkflowGraph }; @@ -116,13 +141,11 @@ export function createWorkflowRouter(llm?: WorkflowLLM): Router { res.write(`data: ${JSON.stringify(data)}\n\n`); }; + let engine: WorkflowEngine | undefined; try { const runId = Date.now().toString(); - const engine = new WorkflowEngine(graph, { - runId, - llm, - onLog: (entry) => sendEvent({ type: 'log', entry }) - }); + engine = new WorkflowEngine(graph, { runId, llm }); + setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry })); addWorkflow(engine); sendEvent({ type: 'start', runId }); @@ -140,6 +163,63 @@ export function createWorkflowRouter(llm?: WorkflowLLM): Router { logger.error('Failed to execute workflow stream', message); sendEvent({ type: 'error', message }); } finally { + if (engine) { + setEngineOnLog(engine); + } + res.end(); + } + }); + + router.post('/resume-stream', async (req: Request, res: Response) => { + const { runId, input } = req.body as { runId?: string; input?: unknown }; + if (!runId) { + res.status(400).json({ error: 'runId is required' }); + return; + } + + const engine = getWorkflow(runId); + if (!engine) { + res.status(404).json({ error: 'Run ID not found' }); + return; + } + if (!tryAcquireResumeLock(runId, 'stream')) { + res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' }); + return; + } + + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.flushHeaders(); + + let clientDisconnected = false; + res.on('close', () => { clientDisconnected = true; }); + + const sendEvent = (data: object) => { + if (clientDisconnected) return; + res.write(`data: ${JSON.stringify(data)}\n\n`); + }; + + try { + setEngineOnLog(engine, (entry) => sendEvent({ type: 'log', entry })); + sendEvent({ type: 'start', runId }); + + const result = await engine.resume(input as Record); + await persistResult(engine, result); + + if (result.status !== 'paused') { + removeWorkflow(runId); + } + + const workflow = getEngineWorkflow(engine); + sendEvent({ type: 'done', result: workflow ? { ...result, workflow } : result }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + logger.error('Failed to resume workflow stream', message); + sendEvent({ type: 'error', message }); + } finally { + releaseResumeLock(runId, 'stream'); + setEngineOnLog(engine); res.end(); } }); @@ -156,8 +236,13 @@ export function createWorkflowRouter(llm?: WorkflowLLM): Router { res.status(404).json({ error: 'Run ID not found' }); return; } + if (!tryAcquireResumeLock(runId, 'json')) { + res.status(409).json({ error: 'Run is already being resumed. Use a single resume endpoint per run.' }); + return; + } try { + setEngineOnLog(engine); const result = await engine.resume(input as Record); await persistResult(engine, result); @@ -171,6 +256,8 @@ export function createWorkflowRouter(llm?: WorkflowLLM): Router { const message = error instanceof Error ? error.message : String(error); logger.error('Failed to resume workflow', message); res.status(500).json({ error: 'Failed to resume workflow', details: message }); + } finally { + releaseResumeLock(runId, 'json'); } }); diff --git a/apps/web/src/app/workflow-editor.ts b/apps/web/src/app/workflow-editor.ts index 9caf312..1163e4b 100644 --- a/apps/web/src/app/workflow-editor.ts +++ b/apps/web/src/app/workflow-editor.ts @@ -1,7 +1,7 @@ // Bespoke Agent Builder - Client Logic import type { WorkflowConnection, WorkflowGraph, WorkflowNode, WorkflowRunResult } from '@agentic/types'; -import { runWorkflowStream, resumeWorkflow, fetchConfig, fetchRun } from '../services/api'; +import { runWorkflowStream, resumeWorkflowStream, fetchConfig, fetchRun } from '../services/api'; import { renderMarkdown, escapeHtml } from './markdown'; const EXPANDED_NODE_WIDTH = 420; @@ -23,6 +23,7 @@ const IF_PORT_STEP = 30; const IF_COLLAPSED_MULTI_CONDITION_PORT_TOP = 18; const IF_COLLAPSED_MULTI_FALLBACK_PORT_TOP = 45; const PREVIOUS_OUTPUT_TEMPLATE = '{{PREVIOUS_OUTPUT}}'; +const GENERIC_AGENT_SPINNER_KEY = '__generic_agent_spinner__'; const IF_CONDITION_OPERATORS = [ { value: 'equal', label: 'Equal' }, { value: 'contains', label: 'Contains' } @@ -197,7 +198,9 @@ export class WorkflowEditor { private rightPanel: HTMLElement | null; - private pendingAgentMessage: HTMLElement | null; + private pendingAgentMessages: Map; + + private pendingAgentMessageCounts: Map; private currentPrompt: string; @@ -305,7 +308,8 @@ export class WorkflowEditor { this.zoomValue = document.getElementById('zoom-value'); this.workflowState = 'idle'; this.rightPanel = document.getElementById('right-panel'); - this.pendingAgentMessage = null; + this.pendingAgentMessages = new Map(); + this.pendingAgentMessageCounts = new Map(); this.currentPrompt = ''; this.pendingApprovalRequest = null; this.activeRunController = null; @@ -818,9 +822,12 @@ export class WorkflowEditor { this.runHistory.push({ role: 'user', content: text }); } - showAgentSpinner(name?: string) { + showAgentSpinner(name?: string, nodeId?: string) { if (!this.chatMessages) return; - this.hideAgentSpinner(); + const spinnerKey = nodeId || GENERIC_AGENT_SPINNER_KEY; + const currentCount = this.pendingAgentMessageCounts.get(spinnerKey) ?? 0; + this.pendingAgentMessageCounts.set(spinnerKey, currentCount + 1); + if (currentCount > 0 && this.pendingAgentMessages.has(spinnerKey)) return; const resolvedName = name || this.getPrimaryAgentName(); const spinner = document.createElement('div'); spinner.className = 'chat-message agent spinner'; @@ -841,14 +848,27 @@ export class WorkflowEditor { spinner.appendChild(body); this.chatMessages.appendChild(spinner); this.chatMessages.scrollTop = this.chatMessages.scrollHeight; - this.pendingAgentMessage = spinner; + this.pendingAgentMessages.set(spinnerKey, spinner); } - hideAgentSpinner() { - if (this.pendingAgentMessage) { - this.pendingAgentMessage.remove(); - this.pendingAgentMessage = null; + hideAgentSpinner(nodeId?: string) { + if (nodeId) { + const currentCount = this.pendingAgentMessageCounts.get(nodeId) ?? 0; + if (currentCount > 1) { + this.pendingAgentMessageCounts.set(nodeId, currentCount - 1); + return; + } + this.pendingAgentMessageCounts.delete(nodeId); + const spinner = this.pendingAgentMessages.get(nodeId); + if (!spinner) return; + spinner.remove(); + this.pendingAgentMessages.delete(nodeId); + return; } + + this.pendingAgentMessages.forEach((spinner) => spinner.remove()); + this.pendingAgentMessages.clear(); + this.pendingAgentMessageCounts.clear(); } zoomCanvas(stepPercent: any) { @@ -1812,7 +1832,8 @@ export class WorkflowEditor { this.getIfConditionHandle(0), 'port-out port-condition port-condition-aggregate', title, - IF_COLLAPSED_MULTI_CONDITION_PORT_TOP + IF_COLLAPSED_MULTI_CONDITION_PORT_TOP, + false ); aggregateConditionPort.textContent = String(conditions.length); aggregateConditionPort.setAttribute('aria-label', `${conditions.length} conditions`); @@ -1860,19 +1881,29 @@ export class WorkflowEditor { } } - createPort(nodeId: string, handle: string, className: string, title = '', top: number | null = null): HTMLDivElement { + createPort( + nodeId: string, + handle: string, + className: string, + title = '', + top: number | null = null, + connectable = true + ): HTMLDivElement { const port = document.createElement('div'); - port.className = `port ${className}`; + port.className = `port ${className}${connectable ? '' : ' port-disabled'}`; if (title) port.title = title; if (typeof top === 'number') { port.style.top = `${top}px`; } port.dataset.nodeId = nodeId; port.dataset.handle = handle; + if (!connectable) { + port.setAttribute('aria-disabled', 'true'); + } if (handle === 'input') { port.addEventListener('mouseup', (e: any) => this.onPortMouseUp(e, nodeId, handle)); - } else { + } else if (connectable) { port.addEventListener('mousedown', (e: any) => this.onPortMouseDown(e, nodeId, handle)); } return port; @@ -1884,6 +1915,10 @@ export class WorkflowEditor { e.stopPropagation(); e.preventDefault(); if (!this.connectionsLayer) return; + const sourceNode = this.nodes.find((candidate: any) => candidate.id === nodeId); + if (sourceNode && this.shouldAggregateCollapsedIfPorts(sourceNode) && this.getIfConditionIndexFromHandle(handle) !== null) { + return; + } const world = this.screenToWorld(e.clientX, e.clientY); this.connectionStart = { nodeId, handle, x: world.x, y: world.y }; @@ -1903,24 +1938,31 @@ export class WorkflowEditor { onPortMouseUp(e: any, nodeId: any, handle: any) { e.stopPropagation(); if (this.connectionStart && this.connectionStart.nodeId !== nodeId) { + const nextConnection: WorkflowConnection = { + source: this.connectionStart.nodeId, + target: nodeId, + sourceHandle: this.connectionStart.handle, + targetHandle: handle + }; + const duplicateExists = this.connections.some( + (conn: any) => + conn.source === nextConnection.source && + conn.target === nextConnection.target && + conn.sourceHandle === nextConnection.sourceHandle && + conn.targetHandle === nextConnection.targetHandle + ); // If we're reconnecting an existing connection, create new connection with updated target if (this.reconnectingConnection !== null) { // Connection was already removed from array, just create new one - this.connections.push({ - source: this.connectionStart.nodeId, - target: nodeId, - sourceHandle: this.connectionStart.handle, - targetHandle: handle - }); + if (!duplicateExists) { + this.connections.push(nextConnection); + } this.reconnectingConnection = null; } else { // Creating a new connection - this.connections.push({ - source: this.connectionStart.nodeId, - target: nodeId, - sourceHandle: this.connectionStart.handle, - targetHandle: handle - }); + if (!duplicateExists) { + this.connections.push(nextConnection); + } } this.renderConnections(); if(this.tempConnection) this.tempConnection.remove(); @@ -2135,14 +2177,6 @@ export class WorkflowEditor { this.pendingApprovalRequest.rejectBtn.disabled = disabled; } - getApprovalNextNode(nodeId: string, decision: 'approve' | 'reject'): EditorNode | undefined { - const connection = this.getRunConnections().find( - (conn: any) => conn.source === nodeId && conn.sourceHandle === decision - ); - if (!connection) return undefined; - return this.getRunNodes().find((node: any) => node.id === connection.target); - } - extractWaitingNodeId(logs: any = []) { if (!Array.isArray(logs)) return null; for (let i = logs.length - 1; i >= 0; i -= 1) { @@ -2191,6 +2225,8 @@ export class WorkflowEditor { startChatSession(_promptText: any) { if (!this.chatMessages) return; this.chatMessages.innerHTML = ''; + this.pendingAgentMessages.clear(); + this.pendingAgentMessageCounts.clear(); if (typeof _promptText === 'string' && _promptText.trim()) { this.appendChatMessage(_promptText, 'user'); } @@ -2250,14 +2286,20 @@ export class WorkflowEditor { if (type === 'step_start') { const node = this.getRunNodeById(entry.nodeId); if (node?.type === 'agent') { - this.showAgentSpinner(this.getAgentNameForNode(entry.nodeId)); + this.hideAgentSpinner(GENERIC_AGENT_SPINNER_KEY); + this.showAgentSpinner(this.getAgentNameForNode(entry.nodeId), entry.nodeId); } } else if (type === 'llm_response') { - this.hideAgentSpinner(); + this.hideAgentSpinner(GENERIC_AGENT_SPINNER_KEY); + this.hideAgentSpinner(entry.nodeId); this.lastLlmResponseContent = entry.content ?? null; this.appendChatMessage(entry.content || '', 'agent', this.getAgentNameForNode(entry.nodeId)); } else if (type === 'llm_error' || type === 'error') { - this.hideAgentSpinner(); + this.hideAgentSpinner(GENERIC_AGENT_SPINNER_KEY); + const node = this.getRunNodeById(entry.nodeId); + if (node?.type === 'agent') { + this.hideAgentSpinner(entry.nodeId); + } this.appendChatMessage(entry.content || '', 'error'); } } @@ -2265,13 +2307,34 @@ export class WorkflowEditor { renderChatFromLogs(logs: any = []) { if (!this.chatMessages) return; this.chatMessages.innerHTML = ''; + this.pendingAgentMessages.clear(); + this.pendingAgentMessageCounts.clear(); this.lastLlmResponseContent = null; const initialPromptFromLogs = this.getInitialPromptFromLogs(logs); if (initialPromptFromLogs) { this.appendChatMessage(initialPromptFromLogs, 'user'); } - let messageShown = false; + const activeAgentNodeCounts = new Map(); logs.forEach((entry: any) => { + const entryNodeId = typeof entry?.nodeId === 'string' ? entry.nodeId : null; + const entryNode = entryNodeId ? this.getRunNodeById(entryNodeId) : undefined; + if (entry.type === 'step_start' && entryNode?.type === 'agent' && entryNodeId) { + const nextCount = (activeAgentNodeCounts.get(entryNodeId) ?? 0) + 1; + activeAgentNodeCounts.set(entryNodeId, nextCount); + } + if ( + (entry.type === 'llm_response' || entry.type === 'llm_error' || entry.type === 'error') && + entryNode?.type === 'agent' && + entryNodeId + ) { + const nextCount = (activeAgentNodeCounts.get(entryNodeId) ?? 0) - 1; + if (nextCount > 0) { + activeAgentNodeCounts.set(entryNodeId, nextCount); + } else { + activeAgentNodeCounts.delete(entryNodeId); + } + } + if (this.isApprovalInputLog(entry)) { const approvalText = this.formatLogContent(entry); if (approvalText) { @@ -2282,18 +2345,17 @@ export class WorkflowEditor { const role = this.mapLogEntryToRole(entry); if (!role) return; if (entry.type === 'llm_response') this.lastLlmResponseContent = entry.content ?? null; - if ((role === 'agent' || role === 'error') && !messageShown) { - this.hideAgentSpinner(); - messageShown = true; - } const text = this.formatLogContent(entry); if (!text) return; const agentName = role === 'agent' ? this.getAgentNameForNode(entry.nodeId) : undefined; this.appendChatMessage(text, role, agentName); }); - if (!messageShown) { - this.showAgentSpinner(); - } + + activeAgentNodeCounts.forEach((activeCount, nodeId) => { + for (let i = 0; i < activeCount; i += 1) { + this.showAgentSpinner(this.getAgentNameForNode(nodeId), nodeId); + } + }); } async runWorkflow() { @@ -2414,7 +2476,9 @@ export class WorkflowEditor { // Engine still executing on server — show partial chat and poll for updates this.currentRunId = runId; this.renderChatFromLogs(result.logs); - this.showAgentSpinner(); + if (this.pendingAgentMessages.size === 0) { + this.showAgentSpinner(); + } this.setWorkflowState('running'); this.pollForRun(runId, result.logs.length); } else if (result.status === 'paused' && !result.waitingForInput) { @@ -2472,23 +2536,22 @@ export class WorkflowEditor { async submitApprovalDecision(decision: any) { if (!this.currentRunId) return; - const pendingApprovalNodeId = this.pendingApprovalRequest?.nodeId ?? null; this.setApprovalButtonsDisabled(true); const note = ''; this.replaceApprovalWithResult(decision, note); this.setWorkflowState('running'); - if (pendingApprovalNodeId) { - const nextNode = this.getApprovalNextNode(pendingApprovalNodeId, decision); - if (nextNode?.type === 'agent') { - this.showAgentSpinner(this.getAgentNameForNode(nextNode.id)); - } - } + this.showAgentSpinner(); const controller = new AbortController(); this.activeRunController = controller; try { - const result = await resumeWorkflow(this.currentRunId, { decision, note }, { signal: controller.signal }); - this.handleRunResult(result); + const result = await resumeWorkflowStream( + this.currentRunId, + { decision, note }, + (entry: any) => this.onLogEntry(entry), + { signal: controller.signal } + ); + this.handleRunResult(result, true); } catch (e) { if (this.isAbortError(e)) return; this.appendChatMessage(this.getErrorMessage(e), 'error'); diff --git a/apps/web/src/services/api.ts b/apps/web/src/services/api.ts index 49457de..e756e6b 100644 --- a/apps/web/src/services/api.ts +++ b/apps/web/src/services/api.ts @@ -46,15 +46,16 @@ export function runWorkflow(graph: WorkflowGraph, options: RequestOptions = {}): return request('/api/run', { graph }, options); } -export async function runWorkflowStream( - graph: WorkflowGraph, +async function requestWorkflowStream( + url: string, + body: unknown, onLog: (entry: WorkflowLogEntry) => void, options: RequestOptions = {} ): Promise { - const res = await fetch('/api/run-stream', { + const res = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ graph }), + body: JSON.stringify(body), signal: options.signal }); @@ -107,6 +108,14 @@ export async function runWorkflowStream( return result; } +export function runWorkflowStream( + graph: WorkflowGraph, + onLog: (entry: WorkflowLogEntry) => void, + options: RequestOptions = {} +): Promise { + return requestWorkflowStream('/api/run-stream', { graph }, onLog, options); +} + export type ProviderModel = { id: string; name: string; @@ -138,6 +147,15 @@ export function resumeWorkflow( return request('/api/resume', { runId, input }, options); } +export function resumeWorkflowStream( + runId: string, + input: ApprovalInput, + onLog: (entry: WorkflowLogEntry) => void, + options: RequestOptions = {} +): Promise { + return requestWorkflowStream('/api/resume-stream', { runId, input }, onLog, options); +} + export async function fetchRun(runId: string): Promise { const res = await fetch(`/api/run/${encodeURIComponent(runId)}`); if (res.status === 404) return null; diff --git a/apps/web/src/workflow-editor.css b/apps/web/src/workflow-editor.css index 14a1e65..9cc8c20 100644 --- a/apps/web/src/workflow-editor.css +++ b/apps/web/src/workflow-editor.css @@ -472,6 +472,17 @@ path.connection-line.active { transform: scale(1.3); } +.port-disabled { + cursor: not-allowed; + opacity: 0.7; +} + +.port-disabled:hover { + border-color: var(--Colors-Stroke-Default); + background: var(--Colors-Backgrounds-Main-Top); + transform: none; +} + .port-in { left: -8px; } .port-out { right: -8px; } .port-true { top: 45px; border-color: var(--Colors-Alert-Success-Default); } diff --git a/packages/types/src/index.d.ts b/packages/types/src/index.d.ts deleted file mode 100644 index ec6a3dd..0000000 --- a/packages/types/src/index.d.ts +++ /dev/null @@ -1,44 +0,0 @@ -export type NodeType = 'start' | 'agent' | 'if' | 'approval' | 'end' | string; -export interface BaseNodeData { - collapsed?: boolean; - [key: string]: unknown; -} -export interface WorkflowNode { - id: string; - type: NodeType; - x: number; - y: number; - data?: TData; -} -export interface WorkflowConnection { - source: string; - target: string; - sourceHandle?: string; - targetHandle?: string; -} -export interface WorkflowGraph { - nodes: WorkflowNode[]; - connections: WorkflowConnection[]; -} -export type WorkflowStatus = 'pending' | 'running' | 'paused' | 'completed' | 'failed'; -export interface WorkflowLogEntry { - timestamp: string; - nodeId: string | 'system'; - type: string; - content: string; -} -export interface WorkflowRunResult { - runId: string; - status: WorkflowStatus; - logs: WorkflowLogEntry[]; - state: Record; - waitingForInput: boolean; - currentNodeId: string | null; - workflow?: WorkflowGraph; -} -export interface ApprovalInput { - decision: 'approve' | 'reject'; - note?: string; -} -export interface WorkflowEngineResult extends WorkflowRunResult { -} diff --git a/packages/types/src/index.js b/packages/types/src/index.js deleted file mode 100644 index cb0ff5c..0000000 --- a/packages/types/src/index.js +++ /dev/null @@ -1 +0,0 @@ -export {}; diff --git a/packages/workflow-engine/src/index.js b/packages/workflow-engine/src/index.js deleted file mode 100644 index df1b848..0000000 --- a/packages/workflow-engine/src/index.js +++ /dev/null @@ -1,353 +0,0 @@ -"use strict"; -Object.defineProperty(exports, "__esModule", { value: true }); -exports.WorkflowEngine = void 0; -const DEFAULT_REASONING = 'low'; -const IF_CONDITION_HANDLE_PREFIX = 'condition-'; -class MissingLLM { - async respond() { - throw new Error('No LLM service configured. Set OPENAI_API_KEY in the environment.'); - } -} -class WorkflowEngine { - constructor(graph, options = {}) { - this.logs = []; - this.state = {}; - this.status = 'pending'; - this.currentNodeId = null; - this.waitingForInput = false; - this.graph = this.normalizeGraph(graph); - this.runId = options.runId ?? Date.now().toString(); - this.llm = options.llm ?? new MissingLLM(); - this.timestampFn = options.timestampFn ?? (() => new Date().toISOString()); - this.onLog = options.onLog; - } - getRunId() { - return this.runId; - } - getLogs() { - return this.logs; - } - getStatus() { - return this.status; - } - getResult() { - return { - runId: this.runId, - status: this.status, - logs: this.logs, - state: this.state, - waitingForInput: this.waitingForInput, - currentNodeId: this.currentNodeId - }; - } - async run() { - this.status = 'running'; - const startNode = this.graph.nodes.find((n) => n.type === 'start'); - if (!startNode) { - this.log('system', 'error', 'No start node found in workflow graph'); - this.status = 'failed'; - return this.getResult(); - } - this.currentNodeId = startNode.id; - await this.processNode(startNode); - return this.getResult(); - } - async resume(input) { - if (this.status !== 'paused' || !this.currentNodeId) { - return this.getResult(); - } - const currentNode = this.graph.nodes.find((n) => n.id === this.currentNodeId); - if (!currentNode) { - this.status = 'failed'; - this.log(this.currentNodeId, 'error', 'Unable to resume, current node missing'); - return this.getResult(); - } - this.waitingForInput = false; - this.status = 'running'; - let connection; - if (currentNode.type === 'approval') { - const normalized = this.normalizeApprovalInput(input); - const logMessage = this.describeApprovalResult(normalized); - this.log(currentNode.id, 'input_received', logMessage); - this.state[`${currentNode.id}_approval`] = normalized; - const restored = this.state.pre_approval_output; - if (restored !== undefined) { - if (typeof restored === 'string') { - this.state.previous_output = restored; - } - else { - this.state.previous_output = JSON.stringify(restored); - } - } - delete this.state.pre_approval_output; - connection = this.graph.connections.find((c) => c.source === currentNode.id && c.sourceHandle === normalized.decision); - } - else { - this.log(currentNode.id, 'input_received', JSON.stringify(input)); - this.state.previous_output = input ?? ''; - connection = this.graph.connections.find((c) => c.source === currentNode.id); - } - if (connection) { - const nextNode = this.graph.nodes.find((n) => n.id === connection.target); - if (nextNode) { - await this.processNode(nextNode); - } - else { - this.status = 'completed'; - } - } - else { - this.status = 'completed'; - } - return this.getResult(); - } - normalizeGraph(graph) { - const nodes = Array.isArray(graph.nodes) - ? graph.nodes.map((node) => { - if (node.type === 'input') { - return { ...node, type: 'approval' }; - } - return node; - }) - : []; - return { - nodes, - connections: Array.isArray(graph.connections) ? graph.connections : [] - }; - } - log(nodeId, type, content) { - const entry = { - timestamp: this.timestampFn(), - nodeId: nodeId ?? 'system', - type, - content - }; - this.logs.push(entry); - if (this.onLog) { - this.onLog(entry); - } - } - async processNode(node) { - this.currentNodeId = node.id; - this.log(node.id, 'step_start', this.describeNode(node)); - try { - let output = null; - switch (node.type) { - case 'start': - output = node.data?.initialInput || ''; - break; - case 'agent': - output = await this.executeAgentNode(node); - break; - case 'if': { - const nextNodeId = this.evaluateIfNode(node); - if (nextNodeId) { - const nextNode = this.graph.nodes.find((n) => n.id === nextNodeId); - if (nextNode) { - await this.processNode(nextNode); - } - else { - this.status = 'completed'; - } - } - else { - this.status = 'completed'; - } - return; - } - case 'approval': - this.state.pre_approval_output = this.state.previous_output; - this.status = 'paused'; - this.waitingForInput = true; - this.log(node.id, 'wait_input', 'Waiting for user approval'); - return; - case 'end': - this.status = 'completed'; - return; - default: - this.log(node.id, 'warn', `Unknown node type "${node.type}" skipped`); - } - this.state.previous_output = output; - this.state[node.id] = output; - const nextConnection = this.graph.connections.find((c) => c.source === node.id); - if (nextConnection) { - const nextNode = this.graph.nodes.find((n) => n.id === nextConnection.target); - if (nextNode) { - await this.processNode(nextNode); - } - else { - this.status = 'completed'; - } - } - else if (node.type !== 'end') { - this.status = 'completed'; - } - } - catch (error) { - const message = error instanceof Error ? error.message : String(error); - const lastLog = this.logs[this.logs.length - 1]; - const isDuplicateLlmError = lastLog && - lastLog.nodeId === node.id && - lastLog.type === 'llm_error' && - lastLog.content === message; - if (!isDuplicateLlmError) { - this.log(node.id, 'error', message); - } - this.status = 'failed'; - } - } - describeNode(node) { - if (node.type === 'agent') { - const name = node.data?.agentName || 'Agent'; - return `${name} agent node`; - } - switch (node.type) { - case 'start': - return 'start node'; - case 'if': - return 'condition node'; - case 'approval': - return 'approval node'; - case 'end': - return 'end node'; - default: - return `${node.type} node`; - } - } - evaluateIfNode(node) { - const input = this.getIfInputString(); - const normalizedInput = input.toLowerCase(); - const conditions = this.getIfConditions(node); - for (let index = 0; index < conditions.length; index += 1) { - const condition = conditions[index]; - const match = this.evaluateIfCondition(normalizedInput, condition); - this.log(node.id, 'logic_check', `Condition ${index + 1} (${condition.operator} "${condition.value}") evaluated as ${match ? 'true' : 'false'}`); - if (!match) - continue; - const conn = this.graph.connections.find((c) => { - if (c.source !== node.id) - return false; - if (c.sourceHandle === `${IF_CONDITION_HANDLE_PREFIX}${index}`) - return true; - return index === 0 && c.sourceHandle === 'true'; - }); - if (conn) - return conn.target; - } - const falseConn = this.graph.connections.find((c) => c.source === node.id && c.sourceHandle === 'false'); - if (falseConn) - return falseConn.target; - return null; - } - getIfConditions(node) { - const legacyCondition = typeof node.data?.condition === 'string' ? node.data.condition : ''; - const conditionsData = node.data?.conditions; - const rawConditions = Array.isArray(conditionsData) && conditionsData.length > 0 - ? conditionsData - : [{ operator: 'contains', value: legacyCondition }]; - return rawConditions.map((condition) => ({ - operator: condition.operator === 'contains' ? 'contains' : 'equal', - value: typeof condition.value === 'string' ? condition.value : '' - })); - } - getIfInputString() { - const previousOutput = this.state.previous_output; - if (typeof previousOutput === 'string') - return previousOutput; - if (previousOutput === undefined || previousOutput === null) - return ''; - return JSON.stringify(previousOutput); - } - evaluateIfCondition(input, condition) { - const expectedValue = condition.value.trim().toLowerCase(); - if (!expectedValue) - return false; - if (condition.operator === 'contains') { - return input.includes(expectedValue); - } - return input === expectedValue; - } - async executeAgentNode(node) { - const previousOutput = this.state.previous_output; - let lastOutputStr = ''; - if (typeof previousOutput === 'string') { - lastOutputStr = previousOutput; - } - else if (previousOutput !== undefined && previousOutput !== null) { - lastOutputStr = JSON.stringify(previousOutput); - } - if (previousOutput && - typeof previousOutput === 'object' && - ('decision' in previousOutput || - 'note' in previousOutput)) { - lastOutputStr = this.findLastNonApprovalOutput() || ''; - } - const userPrompt = node.data?.userPrompt; - let userContent; - if (userPrompt && typeof userPrompt === 'string' && userPrompt.trim()) { - userContent = userPrompt.replace(/\{\{PREVIOUS_OUTPUT\}\}/g, lastOutputStr); - } - else { - userContent = lastOutputStr; - } - const invocation = { - systemPrompt: node.data?.systemPrompt || 'You are a helpful assistant.', - userContent, - model: node.data?.model || 'gpt-5', - reasoningEffort: node.data?.reasoningEffort || DEFAULT_REASONING, - tools: node.data?.tools - }; - this.log(node.id, 'start_prompt', invocation.userContent || ''); - try { - const responseText = await this.llm.respond(invocation); - this.log(node.id, 'llm_response', responseText); - return responseText; - } - catch (error) { - const message = error instanceof Error ? error.message : String(error); - this.log(node.id, 'llm_error', message); - throw error instanceof Error ? error : new Error(message); - } - } - findLastNonApprovalOutput() { - const entries = Object.entries(this.state); - for (let i = entries.length - 1; i >= 0; i -= 1) { - const [key, value] = entries[i]; - if (key.includes('_approval') || key === 'previous_output' || key === 'pre_approval_output') { - continue; - } - if (typeof value === 'string') { - return value; - } - } - return null; - } - normalizeApprovalInput(input) { - if (typeof input === 'string') { - return { - decision: input.toLowerCase().includes('reject') ? 'reject' : 'approve', - note: '' - }; - } - if (input && typeof input === 'object') { - const decision = input.decision === 'reject' || - (typeof input.decision === 'string' && input.decision.toLowerCase() === 'reject') - ? 'reject' - : 'approve'; - return { - decision, - note: typeof input.note === 'string' ? input.note : '' - }; - } - return { decision: 'approve', note: '' }; - } - describeApprovalResult(result) { - const base = result.decision === 'approve' ? 'User approved this step.' : 'User rejected this step.'; - if (result.note && result.note.trim()) { - return `${base} Feedback: ${result.note.trim()}`; - } - return base; - } -} -exports.WorkflowEngine = WorkflowEngine; -exports.default = WorkflowEngine; diff --git a/packages/workflow-engine/src/index.ts b/packages/workflow-engine/src/index.ts index 25872fa..63c6c51 100644 --- a/packages/workflow-engine/src/index.ts +++ b/packages/workflow-engine/src/index.ts @@ -52,6 +52,9 @@ export interface WorkflowEngineInitOptions { const DEFAULT_REASONING = 'low'; const IF_CONDITION_HANDLE_PREFIX = 'condition-'; +const APPROVAL_CONTEXTS_STATE_KEY = '__approval_contexts__'; +const PENDING_APPROVAL_QUEUE_STATE_KEY = '__pending_approval_queue__'; +const DEFERRED_NODE_QUEUE_STATE_KEY = '__deferred_node_queue__'; type IfConditionOperator = 'equal' | 'contains'; @@ -60,6 +63,11 @@ interface IfCondition { value: string; } +interface DeferredNodeExecution { + nodeId: string; + previousOutput: unknown; +} + class MissingLLM implements WorkflowLLM { async respond(): Promise { throw new Error('No LLM service configured. Set OPENAI_API_KEY in the environment.'); @@ -71,7 +79,7 @@ export class WorkflowEngine { private readonly timestampFn: () => string; - private readonly onLog?: (entry: WorkflowLogEntry) => void; + private onLog?: (entry: WorkflowLogEntry) => void; private graph: WorkflowGraph; @@ -130,8 +138,13 @@ export class WorkflowEngine { }; } + setOnLog(onLog?: (entry: WorkflowLogEntry) => void): void { + this.onLog = onLog; + } + async run(): Promise { this.status = 'running'; + this.waitingForInput = false; const startNode = this.graph.nodes.find((n) => n.type === 'start'); if (!startNode) { this.log('system', 'error', 'No start node found in workflow graph'); @@ -141,6 +154,13 @@ export class WorkflowEngine { this.currentNodeId = startNode.id; await this.processNode(startNode); + if (this.status === 'running') { + await this.drainDeferredNodes(); + } + if (this.status === 'running') { + this.status = 'completed'; + this.currentNodeId = null; + } return this.getResult(); } @@ -159,41 +179,46 @@ export class WorkflowEngine { this.waitingForInput = false; this.status = 'running'; - let connection: WorkflowConnection | undefined; + let previousOutput: unknown = input ?? ''; + let connections: WorkflowConnection[] = []; if (currentNode.type === 'approval') { + this.removePendingApproval(currentNode.id); const normalized = this.normalizeApprovalInput(input); const logMessage = this.describeApprovalResult(normalized); this.log(currentNode.id, 'input_received', logMessage); this.state[`${currentNode.id}_approval`] = normalized; - const restored = this.state.pre_approval_output; + const restored = this.consumeApprovalContext(currentNode.id) ?? this.state.pre_approval_output; if (restored !== undefined) { - if (typeof restored === 'string') { - this.state.previous_output = restored; - } else { - this.state.previous_output = JSON.stringify(restored); - } + previousOutput = restored; } delete this.state.pre_approval_output; - connection = this.graph.connections.find( + connections = this.graph.connections.filter( (c) => c.source === currentNode.id && c.sourceHandle === normalized.decision ); } else { this.log(currentNode.id, 'input_received', JSON.stringify(input)); - this.state.previous_output = input ?? ''; - connection = this.graph.connections.find((c) => c.source === currentNode.id); + connections = this.graph.connections.filter((c) => c.source === currentNode.id); } - if (connection) { - const nextNode = this.graph.nodes.find((n) => n.id === connection.target); - if (nextNode) { - await this.processNode(nextNode); + this.state.previous_output = previousOutput; + + await this.processConnections(currentNode.id, connections, previousOutput); + if (this.status === 'running') { + await this.drainDeferredNodes(); + } + if (this.status === 'running') { + const nextPendingApprovalNodeId = this.dequeuePendingApproval(); + if (nextPendingApprovalNodeId) { + this.currentNodeId = nextPendingApprovalNodeId; + this.waitingForInput = true; + this.status = 'paused'; + this.log(nextPendingApprovalNodeId, 'wait_input', 'Waiting for user approval'); } else { this.status = 'completed'; + this.currentNodeId = null; } - } else { - this.status = 'completed'; } return this.getResult(); @@ -227,8 +252,20 @@ export class WorkflowEngine { } } - private async processNode(node: WorkflowNode): Promise { - this.currentNodeId = node.id; + private async processNode( + node: WorkflowNode, + previousOutput: unknown = this.state.previous_output, + writeSharedPreviousOutput = true + ): Promise { + if (this.status !== 'running') { + if (this.status === 'paused' && this.waitingForInput && node.type === 'approval') { + this.setApprovalContext(node.id, previousOutput); + this.enqueuePendingApproval(node.id); + this.log(node.id, 'wait_input', 'Waiting for user approval'); + } + return undefined; + } + this.log(node.id, 'step_start', this.describeNode(node)); try { @@ -239,49 +276,44 @@ export class WorkflowEngine { output = node.data?.initialInput || ''; break; case 'agent': - output = await this.executeAgentNode(node); + output = await this.executeAgentNode(node, previousOutput); break; case 'if': { - const nextNodeId = this.evaluateIfNode(node); - if (nextNodeId) { - const nextNode = this.graph.nodes.find((n) => n.id === nextNodeId); - if (nextNode) { - await this.processNode(nextNode); - } else { - this.status = 'completed'; - } - } else { - this.status = 'completed'; - } - return; + const nextConnections = this.evaluateIfNodeConnections(node, previousOutput); + await this.processConnections(node.id, nextConnections, previousOutput, writeSharedPreviousOutput); + return undefined; } case 'approval': - this.state.pre_approval_output = this.state.previous_output; + this.setApprovalContext(node.id, previousOutput); + if (this.waitingForInput) { + this.enqueuePendingApproval(node.id); + this.log(node.id, 'wait_input', 'Waiting for user approval'); + return undefined; + } + this.state.pre_approval_output = previousOutput; + this.currentNodeId = node.id; this.status = 'paused'; this.waitingForInput = true; this.log(node.id, 'wait_input', 'Waiting for user approval'); - return; + return undefined; case 'end': - this.status = 'completed'; - return; + return undefined; default: this.log(node.id, 'warn', `Unknown node type "${node.type}" skipped`); } - this.state.previous_output = output; - this.state[node.id] = output; + if (this.shouldSkipPostNodePropagation()) { + return undefined; + } - const nextConnection = this.graph.connections.find((c) => c.source === node.id); - if (nextConnection) { - const nextNode = this.graph.nodes.find((n) => n.id === nextConnection.target); - if (nextNode) { - await this.processNode(nextNode); - } else { - this.status = 'completed'; - } - } else if (node.type !== 'end') { - this.status = 'completed'; + if (writeSharedPreviousOutput) { + this.state.previous_output = output; } + this.state[node.id] = output; + + const nextConnections = this.graph.connections.filter((c) => c.source === node.id); + await this.processConnections(node.id, nextConnections, output, writeSharedPreviousOutput); + return output; } catch (error) { const message = error instanceof Error ? error.message : String(error); const lastLog = this.logs[this.logs.length - 1]; @@ -294,6 +326,78 @@ export class WorkflowEngine { this.log(node.id, 'error', message); } this.status = 'failed'; + return undefined; + } + } + + private async processConnections( + sourceNodeId: string, + connections: WorkflowConnection[], + previousOutput: unknown, + writeSharedPreviousOutput = true + ): Promise { + if (connections.length === 0) { + return; + } + + if (this.status !== 'running') { + if (this.status === 'paused' && this.waitingForInput) { + this.deferConnections(sourceNodeId, connections, previousOutput); + } + return; + } + + const nextNodes: WorkflowNode[] = []; + for (const connection of connections) { + const nextNode = this.graph.nodes.find((n) => n.id === connection.target); + if (!nextNode) { + this.log(sourceNodeId, 'warn', `Connection target "${connection.target}" not found`); + continue; + } + nextNodes.push(nextNode); + } + + if (nextNodes.length === 0) { + return; + } + + if (nextNodes.length === 1) { + await this.processNode(nextNodes[0], previousOutput, writeSharedPreviousOutput); + return; + } + + await Promise.all(nextNodes.map((nextNode) => this.processNode(nextNode, previousOutput, false))); + } + + private deferConnections( + sourceNodeId: string, + connections: WorkflowConnection[], + previousOutput: unknown + ): void { + for (const connection of connections) { + const nextNode = this.graph.nodes.find((n) => n.id === connection.target); + if (!nextNode) { + this.log(sourceNodeId, 'warn', `Connection target "${connection.target}" not found`); + continue; + } + this.enqueueDeferredNode(nextNode.id, previousOutput); + } + } + + private async drainDeferredNodes(): Promise { + while (this.status === 'running') { + const deferred = this.dequeueDeferredNode(); + if (!deferred) { + return; + } + + const node = this.graph.nodes.find((candidate) => candidate.id === deferred.nodeId); + if (!node) { + this.log('system', 'warn', `Deferred node "${deferred.nodeId}" not found`); + continue; + } + + await this.processNode(node, deferred.previousOutput); } } @@ -316,8 +420,8 @@ export class WorkflowEngine { } } - private evaluateIfNode(node: WorkflowNode): string | null { - const input = this.getIfInputString(); + private evaluateIfNodeConnections(node: WorkflowNode, previousOutput: unknown): WorkflowConnection[] { + const input = this.getIfInputString(previousOutput); const normalizedInput = input.toLowerCase(); const conditions = this.getIfConditions(node); @@ -331,19 +435,18 @@ export class WorkflowEngine { ); if (!match) continue; - const conn = this.graph.connections.find((c) => { - if (c.source !== node.id) return false; - if (c.sourceHandle === `${IF_CONDITION_HANDLE_PREFIX}${index}`) return true; - return index === 0 && c.sourceHandle === 'true'; - }); - if (conn) return conn.target; + const selectedHandles = new Set([`${IF_CONDITION_HANDLE_PREFIX}${index}`]); + if (index === 0) { + selectedHandles.add('true'); + } + return this.graph.connections.filter( + (c) => c.source === node.id && typeof c.sourceHandle === 'string' && selectedHandles.has(c.sourceHandle) + ); } - const falseConn = this.graph.connections.find( + return this.graph.connections.filter( (c) => c.source === node.id && c.sourceHandle === 'false' ); - if (falseConn) return falseConn.target; - return null; } private getIfConditions(node: WorkflowNode): IfCondition[] { @@ -360,8 +463,7 @@ export class WorkflowEngine { })); } - private getIfInputString(): string { - const previousOutput = this.state.previous_output; + private getIfInputString(previousOutput: unknown): string { if (typeof previousOutput === 'string') return previousOutput; if (previousOutput === undefined || previousOutput === null) return ''; return JSON.stringify(previousOutput); @@ -376,9 +478,7 @@ export class WorkflowEngine { return input === expectedValue; } - private async executeAgentNode(node: WorkflowNode): Promise { - const previousOutput = this.state.previous_output; - + private async executeAgentNode(node: WorkflowNode, previousOutput: unknown): Promise { // Resolve previousOutput to a string for template substitution let lastOutputStr = ''; if (typeof previousOutput === 'string') { @@ -442,6 +542,108 @@ export class WorkflowEngine { return null; } + private getApprovalContexts(): Record { + const raw = this.state[APPROVAL_CONTEXTS_STATE_KEY]; + if (raw && typeof raw === 'object' && !Array.isArray(raw)) { + return raw as Record; + } + const contexts: Record = {}; + this.state[APPROVAL_CONTEXTS_STATE_KEY] = contexts; + return contexts; + } + + private setApprovalContext(nodeId: string, previousOutput: unknown): void { + const contexts = this.getApprovalContexts(); + contexts[nodeId] = previousOutput; + } + + private consumeApprovalContext(nodeId: string): unknown { + const contexts = this.getApprovalContexts(); + const restored = contexts[nodeId]; + delete contexts[nodeId]; + if (Object.keys(contexts).length === 0) { + delete this.state[APPROVAL_CONTEXTS_STATE_KEY]; + } + return restored; + } + + private getPendingApprovalQueue(): string[] { + const raw = this.state[PENDING_APPROVAL_QUEUE_STATE_KEY]; + if (Array.isArray(raw)) { + const queue = raw.filter((value): value is string => typeof value === 'string'); + if (queue.length !== raw.length) { + this.state[PENDING_APPROVAL_QUEUE_STATE_KEY] = queue; + } + return queue; + } + const queue: string[] = []; + this.state[PENDING_APPROVAL_QUEUE_STATE_KEY] = queue; + return queue; + } + + private enqueuePendingApproval(nodeId: string): void { + if (this.currentNodeId === nodeId) return; + const queue = this.getPendingApprovalQueue(); + if (!queue.includes(nodeId)) { + queue.push(nodeId); + } + } + + private dequeuePendingApproval(): string | null { + const queue = this.getPendingApprovalQueue(); + const nextNodeId = queue.shift() ?? null; + if (queue.length === 0) { + delete this.state[PENDING_APPROVAL_QUEUE_STATE_KEY]; + } + return nextNodeId; + } + + private removePendingApproval(nodeId: string): void { + const raw = this.state[PENDING_APPROVAL_QUEUE_STATE_KEY]; + if (!Array.isArray(raw)) return; + const queue = raw.filter((value): value is string => typeof value === 'string'); + const nextQueue = queue.filter((value) => value !== nodeId); + if (nextQueue.length === queue.length) return; + if (nextQueue.length === 0) { + delete this.state[PENDING_APPROVAL_QUEUE_STATE_KEY]; + return; + } + this.state[PENDING_APPROVAL_QUEUE_STATE_KEY] = nextQueue; + } + + private getDeferredNodeQueue(): DeferredNodeExecution[] { + const raw = this.state[DEFERRED_NODE_QUEUE_STATE_KEY]; + if (Array.isArray(raw)) { + const queue = raw.flatMap((entry) => { + if (!entry || typeof entry !== 'object') return []; + const nodeId = (entry as { nodeId?: unknown }).nodeId; + if (typeof nodeId !== 'string') return []; + return [{ nodeId, previousOutput: (entry as { previousOutput?: unknown }).previousOutput }]; + }); + if (queue.length !== raw.length) { + this.state[DEFERRED_NODE_QUEUE_STATE_KEY] = queue; + } + return queue; + } + const queue: DeferredNodeExecution[] = []; + this.state[DEFERRED_NODE_QUEUE_STATE_KEY] = queue; + return queue; + } + + private enqueueDeferredNode(nodeId: string, previousOutput: unknown): void { + const queue = this.getDeferredNodeQueue(); + queue.push({ nodeId, previousOutput }); + } + + private dequeueDeferredNode(): DeferredNodeExecution | null { + const queue = this.getDeferredNodeQueue(); + const next = queue.shift() ?? null; + if (queue.length === 0) { + delete this.state[DEFERRED_NODE_QUEUE_STATE_KEY]; + } + return next; + } + private normalizeApprovalInput(input?: ApprovalInput | string | Record): ApprovalInput { if (typeof input === 'string') { return { @@ -470,6 +672,21 @@ export class WorkflowEngine { } return base; } + + private shouldSkipPostNodePropagation(): boolean { + if (this.status === 'running') { + return false; + } + if (this.status === 'failed' || this.status === 'completed') { + return true; + } + if (this.status === 'paused' && !this.waitingForInput) { + return true; + } + // Keep propagation active for paused + waitingForInput so processConnections + // can defer downstream branches that have not executed yet. + return false; + } } export default WorkflowEngine; diff --git a/tsconfig.base.json b/tsconfig.base.json index 7973924..93c5ddf 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -16,13 +16,12 @@ "baseUrl": ".", "paths": { "@agentic/types": [ - "./packages/types/src" + "./packages/types/src/index.ts" ], "@agentic/workflow-engine": [ - "./packages/workflow-engine/src" + "./packages/workflow-engine/src/index.ts" ] }, "declarationMap": true } } -