diff --git a/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts new file mode 100644 index 0000000000..647012589c --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/execute-from-block/route.ts @@ -0,0 +1,216 @@ +import { db, workflow as workflowTable } from '@sim/db' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { v4 as uuidv4 } from 'uuid' +import { z } from 'zod' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' +import { createSSECallbacks } from '@/lib/workflows/executor/execution-events' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types' +import { hasExecutionResult } from '@/executor/utils/errors' + +const logger = createLogger('ExecuteFromBlockAPI') + +const ExecuteFromBlockSchema = z.object({ + startBlockId: z.string().min(1, 'Start block ID is required'), + sourceSnapshot: z.object({ + blockStates: z.record(z.any()), + executedBlocks: z.array(z.string()), + blockLogs: z.array(z.any()), + decisions: z.object({ + router: z.record(z.string()), + condition: z.record(z.string()), + }), + completedLoops: z.array(z.string()), + loopExecutions: z.record(z.any()).optional(), + parallelExecutions: z.record(z.any()).optional(), + parallelBlockMapping: z.record(z.any()).optional(), + activeExecutionPath: z.array(z.string()), + }), + input: z.any().optional(), +}) + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { + const requestId = generateRequestId() + const { id: workflowId } = await params + + try { + const auth = await checkHybridAuth(req, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 }) + } + const userId = auth.userId + + let body: unknown + try { + body = await req.json() + } catch { + return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 }) + } + + const validation = ExecuteFromBlockSchema.safeParse(body) + if (!validation.success) { + logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors) + return NextResponse.json( + { + error: 'Invalid request body', + details: validation.error.errors.map((e) => ({ + path: e.path.join('.'), + message: e.message, + })), + }, + { status: 400 } + ) + } + + const { startBlockId, sourceSnapshot, input } = validation.data + const executionId = uuidv4() + + const [workflowRecord] = await db + .select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId }) + .from(workflowTable) + .where(eq(workflowTable.id, workflowId)) + .limit(1) + + if (!workflowRecord?.workspaceId) { + return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 }) + } + + const workspaceId = workflowRecord.workspaceId + const workflowUserId = workflowRecord.userId + + logger.info(`[${requestId}] Starting run-from-block execution`, { + workflowId, + startBlockId, + executedBlocksCount: sourceSnapshot.executedBlocks.length, + }) + + const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) + const abortController = new AbortController() + let isStreamClosed = false + + const stream = new ReadableStream({ + async start(controller) { + const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({ + executionId, + workflowId, + controller, + isStreamClosed: () => isStreamClosed, + setStreamClosed: () => { + isStreamClosed = true + }, + }) + + const metadata: ExecutionMetadata = { + requestId, + workflowId, + userId, + executionId, + triggerType: 'manual', + workspaceId, + workflowUserId, + useDraftState: true, + isClientSession: true, + startTime: new Date().toISOString(), + } + + const snapshot = new ExecutionSnapshot(metadata, {}, input || {}, {}) + + try { + const startTime = new Date() + + sendEvent({ + type: 'execution:started', + timestamp: startTime.toISOString(), + executionId, + workflowId, + data: { startTime: startTime.toISOString() }, + }) + + const result = await executeWorkflowCore({ + snapshot, + loggingSession, + abortSignal: abortController.signal, + runFromBlock: { + startBlockId, + sourceSnapshot: sourceSnapshot as SerializableExecutionState, + }, + callbacks: { onBlockStart, onBlockComplete, onStream }, + }) + + if (result.status === 'cancelled') { + sendEvent({ + type: 'execution:cancelled', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { duration: result.metadata?.duration || 0 }, + }) + } else { + sendEvent({ + type: 'execution:completed', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + success: result.success, + output: result.output, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || startTime.toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), + }, + }) + } + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`) + + const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + + sendEvent({ + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + error: executionResult?.error || errorMessage, + duration: executionResult?.metadata?.duration || 0, + }, + }) + } finally { + if (!isStreamClosed) { + try { + controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')) + controller.close() + } catch {} + } + } + }, + cancel() { + isStreamClosed = true + abortController.abort() + markExecutionCancelled(executionId).catch(() => {}) + }, + }) + + return new NextResponse(stream, { + headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId }, + }) + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + logger.error(`[${requestId}] Failed to start run-from-block execution:`, error) + return NextResponse.json( + { error: errorMessage || 'Failed to start execution' }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 856a1a3c94..47f81ef122 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -53,6 +53,7 @@ const ExecuteWorkflowSchema = z.object({ parallels: z.record(z.any()).optional(), }) .optional(), + stopAfterBlockId: z.string().optional(), }) export const runtime = 'nodejs' @@ -222,6 +223,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: includeFileBase64, base64MaxBytes, workflowStateOverride, + stopAfterBlockId, } = validation.data // For API key and internal JWT auth, the entire body is the input (except for our control fields) @@ -237,6 +239,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: includeFileBase64, base64MaxBytes, workflowStateOverride, + stopAfterBlockId: _stopAfterBlockId, workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth ...rest } = body @@ -434,6 +437,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: loggingSession, includeFileBase64, base64MaxBytes, + stopAfterBlockId, }) const outputWithBase64 = includeFileBase64 @@ -722,6 +726,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: abortSignal: abortController.signal, includeFileBase64, base64MaxBytes, + stopAfterBlockId, }) if (result.status === 'paused') { diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx index 42d2c3e84e..aa65c7b30e 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/action-bar/action-bar.tsx @@ -1,11 +1,13 @@ import { memo, useCallback } from 'react' -import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react' +import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react' import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn' import { cn } from '@/lib/core/utils/cn' import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers' import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider' +import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks' import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils' import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow' +import { useExecutionStore } from '@/stores/execution' import { useNotificationStore } from '@/stores/notifications' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' @@ -49,6 +51,7 @@ export const ActionBar = memo( collaborativeBatchToggleBlockHandles, } = useCollaborativeWorkflow() const { setPendingSelection } = useWorkflowRegistry() + const { handleRunFromBlock } = useWorkflowExecution() const addNotification = useNotificationStore((s) => s.addNotification) @@ -97,12 +100,39 @@ export const ActionBar = memo( ) ) + const { activeWorkflowId } = useWorkflowRegistry() + const { isExecuting, getLastExecutionSnapshot } = useExecutionStore() const userPermissions = useUserPermissionsContext() + const edges = useWorkflowStore((state) => state.edges) const isStartBlock = isInputDefinitionTrigger(blockType) const isResponseBlock = blockType === 'response' const isNoteBlock = blockType === 'note' const isSubflowBlock = blockType === 'loop' || blockType === 'parallel' + const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel') + + const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null + const incomingEdges = edges.filter((edge) => edge.target === blockId) + const isTriggerBlock = incomingEdges.length === 0 + + // Check if each source block is either executed OR is a trigger block (triggers don't need prior execution) + const isSourceSatisfied = (sourceId: string) => { + if (snapshot?.executedBlocks.includes(sourceId)) return true + // Check if source is a trigger (has no incoming edges itself) + const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId) + return sourceIncomingEdges.length === 0 + } + + // Non-trigger blocks need a snapshot to exist (so upstream outputs are available) + const dependenciesSatisfied = + isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source))) + const canRunFromBlock = + dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting + + const handleRunFromBlockClick = useCallback(() => { + if (!activeWorkflowId || !canRunFromBlock) return + handleRunFromBlock(blockId, activeWorkflowId) + }, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock]) /** * Get appropriate tooltip message based on disabled state @@ -128,30 +158,35 @@ export const ActionBar = memo( 'dark:border-transparent dark:bg-[var(--surface-4)]' )} > - {!isNoteBlock && ( + {!isNoteBlock && !isInsideSubflow && ( - {getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')} + {(() => { + if (disabled) return getTooltipMessage('Run from block') + if (isExecuting) return 'Execution in progress' + if (!dependenciesSatisfied) return 'Run upstream blocks first' + return 'Run from block' + })()} )} - {isSubflowBlock && ( + {!isNoteBlock && (