Skip to content

Commit 3e85218

Browse files
improvement(hitl): streaming, async support + update docs (#4058)
* improvement(hitl): support streaming, async, update docs * update docs * fix tests * fix abort signal passthrough * module level const * fix form route * address comments * fix build
1 parent c5cc336 commit 3e85218

File tree

21 files changed

+513
-94
lines changed

21 files changed

+513
-94
lines changed

apps/docs/content/docs/en/blocks/human-in-the-loop.mdx

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ Defines the fields approvers fill in when responding. This data becomes availabl
7878
}
7979
```
8080

81-
Access resume data in downstream blocks using `<blockId.resumeInput.fieldName>`.
81+
Access resume data in downstream blocks using `<blockId.fieldName>`.
8282

8383
## Approval Methods
8484

@@ -93,11 +93,12 @@ Access resume data in downstream blocks using `<blockId.resumeInput.fieldName>`.
9393
<Tab>
9494
### REST API
9595

96-
Programmatically resume workflows using the resume endpoint. The `contextId` is available from the block's `resumeEndpoint` output or from the paused execution detail.
96+
Programmatically resume workflows using the resume endpoint. The `contextId` is available from the block's `resumeEndpoint` output or from the `_resume` object in the paused execution response.
9797

9898
```bash
9999
POST /api/resume/{workflowId}/{executionId}/{contextId}
100100
Content-Type: application/json
101+
X-API-Key: your-api-key
101102

102103
{
103104
"input": {
@@ -107,23 +108,44 @@ Access resume data in downstream blocks using `<blockId.resumeInput.fieldName>`.
107108
}
108109
```
109110

110-
The response includes a new `executionId` for the resumed execution:
111+
The resume endpoint automatically respects the execution mode used in the original execute call:
112+
113+
- **Sync mode** (default) — The response waits for the remaining workflow to complete and returns the full result:
114+
115+
```json
116+
{
117+
"success": true,
118+
"status": "completed",
119+
"executionId": "<resumeExecutionId>",
120+
"output": { ... },
121+
"metadata": { "duration": 1234, "startTime": "...", "endTime": "..." }
122+
}
123+
```
124+
125+
If the resumed workflow hits another HITL block, the response returns `"status": "paused"` with new `_resume` URLs in the output.
126+
127+
- **Stream mode** (`stream: true` on the original execute call) — The resume response streams SSE events with `selectedOutputs` chunks, just like the initial execution.
128+
129+
- **Async mode** (`X-Execution-Mode: async` on the original execute call) — The resume dispatches execution to a background worker and returns immediately with `202`:
111130

112131
```json
113132
{
114133
"status": "started",
115134
"executionId": "<resumeExecutionId>",
116-
"message": "Resume execution started."
135+
"message": "Resume execution started asynchronously."
117136
}
118137
```
119138

120-
To poll execution progress after resuming, connect to the SSE stream:
139+
#### Polling execution status
140+
141+
To check on a paused execution or poll for completion after an async resume:
121142

122143
```bash
123-
GET /api/workflows/{workflowId}/executions/{resumeExecutionId}/stream
144+
GET /api/resume/{workflowId}/{executionId}
145+
X-API-Key: your-api-key
124146
```
125147

126-
Build custom approval UIs or integrate with existing systems.
148+
Returns the full paused execution detail with all pause points, their statuses, and resume links. Returns `404` when the execution has completed and is no longer paused.
127149
</Tab>
128150
<Tab>
129151
### Webhook
@@ -132,6 +154,53 @@ Access resume data in downstream blocks using `<blockId.resumeInput.fieldName>`.
132154
</Tab>
133155
</Tabs>
134156

157+
## API Execute Behavior
158+
159+
When triggering a workflow via the execute API (`POST /api/workflows/{id}/execute`), HITL blocks cause the execution to pause and return the `_resume` data in the response:
160+
161+
<Tabs items={['Sync (JSON)', 'Stream (SSE)', 'Async']}>
162+
<Tab>
163+
The response includes the full pause data with resume URLs:
164+
165+
```json
166+
{
167+
"success": true,
168+
"executionId": "<executionId>",
169+
"output": {
170+
"data": {
171+
"operation": "human",
172+
"_resume": {
173+
"apiUrl": "/api/resume/{workflowId}/{executionId}/{contextId}",
174+
"uiUrl": "/resume/{workflowId}/{executionId}",
175+
"contextId": "<contextId>",
176+
"executionId": "<executionId>",
177+
"workflowId": "<workflowId>"
178+
}
179+
}
180+
}
181+
}
182+
```
183+
</Tab>
184+
<Tab>
185+
Blocks before the HITL stream their `selectedOutputs` normally. When execution pauses, the final SSE event includes `status: "paused"` and the `_resume` data:
186+
187+
```
188+
data: {"blockId":"agent1","chunk":"streamed content..."}
189+
data: {"event":"final","data":{"success":true,"output":{...,"_resume":{...}},"status":"paused"}}
190+
data: "[DONE]"
191+
```
192+
193+
On resume, blocks after the HITL stream their `selectedOutputs` the same way.
194+
195+
<Callout type="info">
196+
HITL blocks are automatically excluded from the `selectedOutputs` dropdown since their data is always included in the pause response.
197+
</Callout>
198+
</Tab>
199+
<Tab>
200+
Returns `202` immediately. Use the polling endpoint to check when the execution pauses.
201+
</Tab>
202+
</Tabs>
203+
135204
## Common Use Cases
136205

137206
**Content Approval** - Review AI-generated content before publishing
@@ -161,9 +230,9 @@ Agent (Generate) → Human in the Loop (QA) → Gmail (Send)
161230
**`response`** - Display data shown to the approver (json)
162231
**`submission`** - Form submission data from the approver (json)
163232
**`submittedAt`** - ISO timestamp when the workflow was resumed
164-
**`resumeInput.*`** - All fields defined in Resume Form become available after the workflow resumes
233+
**`<fieldName>`** - All fields defined in Resume Form become available at the top level after the workflow resumes
165234

166-
Access using `<blockId.resumeInput.fieldName>`.
235+
Access using `<blockId.fieldName>`.
167236

168237
## Example
169238

@@ -187,7 +256,7 @@ Access using `<blockId.resumeInput.fieldName>`.
187256
**Downstream Usage:**
188257
```javascript
189258
// Condition block
190-
<approval1.resumeInput.approved> === true
259+
<approval1.approved> === true
191260
```
192261
The example below shows an approval portal as seen by an approver after the workflow is paused. Approvers can review the data and provide inputs as a part of the workflow resumption. The approval portal can be accessed directly via the unique URL, `<blockId.url>`.
193262

@@ -204,7 +273,7 @@ The example below shows an approval portal as seen by an approver after the work
204273
<FAQ items={[
205274
{ question: "How long does the workflow stay paused?", answer: "The workflow pauses indefinitely until a human provides input through the approval portal, the REST API, or a webhook. There is no automatic timeout — it will wait until someone responds." },
206275
{ question: "What notification channels can I use to alert approvers?", answer: "You can configure notifications through Slack, Gmail, Microsoft Teams, SMS (via Twilio), or custom webhooks. Include the approval URL in your notification message so approvers can access the portal directly." },
207-
{ question: "How do I access the approver's input in downstream blocks?", answer: "Use the syntax <blockId.resumeInput.fieldName> to reference specific fields from the resume form. For example, if your block ID is 'approval1' and the form has an 'approved' field, use <approval1.resumeInput.approved>." },
276+
{ question: "How do I access the approver's input in downstream blocks?", answer: "Use the syntax <blockId.fieldName> to reference specific fields from the resume form. For example, if your block name is 'approval1' and the form has an 'approved' field, use <approval1.approved>." },
208277
{ question: "Can I chain multiple Human in the Loop blocks for multi-stage approvals?", answer: "Yes. You can place multiple Human in the Loop blocks in sequence to create multi-stage approval workflows. Each block pauses independently and can have its own notification configuration and resume form fields." },
209278
{ question: "Can I resume the workflow programmatically without the portal?", answer: "Yes. Each block exposes a resume API endpoint that you can call with a POST request containing the form data as JSON. This lets you build custom approval UIs or integrate with existing systems like Jira or ServiceNow." },
210279
{ question: "What outputs are available after the workflow resumes?", answer: "The block outputs include the approval portal URL, the resume API endpoint URL, the display data shown to the approver, the form submission data, the raw resume input, and an ISO timestamp of when the workflow was resumed." },

apps/sim/app/api/chat/[identifier]/route.test.ts

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -410,14 +410,7 @@ describe('Chat Identifier API Route', () => {
410410

411411
expect(createStreamingResponse).toHaveBeenCalledWith(
412412
expect.objectContaining({
413-
workflow: expect.objectContaining({
414-
id: 'workflow-id',
415-
userId: 'user-id',
416-
}),
417-
input: expect.objectContaining({
418-
input: 'Hello world',
419-
conversationId: 'conv-123',
420-
}),
413+
executeFn: expect.any(Function),
421414
streamConfig: expect.objectContaining({
422415
isSecureMode: true,
423416
workflowTriggerType: 'chat',
@@ -494,9 +487,9 @@ describe('Chat Identifier API Route', () => {
494487

495488
expect(createStreamingResponse).toHaveBeenCalledWith(
496489
expect.objectContaining({
497-
input: expect.objectContaining({
498-
input: 'Hello world',
499-
conversationId: 'test-conversation-123',
490+
executeFn: expect.any(Function),
491+
streamConfig: expect.objectContaining({
492+
workflowTriggerType: 'chat',
500493
}),
501494
})
502495
)
@@ -510,9 +503,7 @@ describe('Chat Identifier API Route', () => {
510503

511504
expect(createStreamingResponse).toHaveBeenCalledWith(
512505
expect.objectContaining({
513-
input: expect.objectContaining({
514-
input: 'Hello world',
515-
}),
506+
executeFn: expect.any(Function),
516507
})
517508
)
518509
})

apps/sim/app/api/chat/[identifier]/route.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ export async function POST(
199199
}
200200

201201
const { createStreamingResponse } = await import('@/lib/workflows/streaming/streaming')
202+
const { executeWorkflow } = await import('@/lib/workflows/executor/execute-workflow')
202203
const { SSE_HEADERS } = await import('@/lib/core/utils/sse')
203204

204205
const workflowInput: any = { input, conversationId }
@@ -252,15 +253,31 @@ export async function POST(
252253

253254
const stream = await createStreamingResponse({
254255
requestId,
255-
workflow: workflowForExecution,
256-
input: workflowInput,
257-
executingUserId: workspaceOwnerId,
258256
streamConfig: {
259257
selectedOutputs,
260258
isSecureMode: true,
261259
workflowTriggerType: 'chat',
262260
},
263261
executionId,
262+
executeFn: async ({ onStream, onBlockComplete, abortSignal }) =>
263+
executeWorkflow(
264+
workflowForExecution,
265+
requestId,
266+
workflowInput,
267+
workspaceOwnerId,
268+
{
269+
enabled: true,
270+
selectedOutputs,
271+
isSecureMode: true,
272+
workflowTriggerType: 'chat',
273+
onStream,
274+
onBlockComplete,
275+
skipLoggingComplete: true,
276+
abortSignal,
277+
executionMode: 'stream',
278+
},
279+
executionId
280+
),
264281
})
265282

266283
const streamResponse = new NextResponse(stream, {

apps/sim/app/api/form/[identifier]/route.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { generateRequestId } from '@/lib/core/utils/request'
99
import { generateId } from '@/lib/core/utils/uuid'
1010
import { preprocessExecution } from '@/lib/execution/preprocessing'
1111
import { LoggingSession } from '@/lib/logs/execution/logging-session'
12+
import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow'
1213
import { normalizeInputFormatValue } from '@/lib/workflows/input-format'
1314
import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
1415
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
@@ -216,18 +217,33 @@ export async function POST(
216217
...formData, // Spread form fields at top level for convenience
217218
}
218219

219-
// Execute workflow using streaming (for consistency with chat)
220220
const stream = await createStreamingResponse({
221221
requestId,
222-
workflow: workflowForExecution,
223-
input: workflowInput,
224-
executingUserId: workspaceOwnerId,
225222
streamConfig: {
226223
selectedOutputs: [],
227224
isSecureMode: true,
228-
workflowTriggerType: 'api', // Use 'api' type since form is similar
225+
workflowTriggerType: 'api',
229226
},
230227
executionId,
228+
executeFn: async ({ onStream, onBlockComplete, abortSignal }) =>
229+
executeWorkflow(
230+
workflowForExecution,
231+
requestId,
232+
workflowInput,
233+
workspaceOwnerId,
234+
{
235+
enabled: true,
236+
selectedOutputs: [],
237+
isSecureMode: true,
238+
workflowTriggerType: 'api',
239+
onStream,
240+
onBlockComplete,
241+
skipLoggingComplete: true,
242+
abortSignal,
243+
executionMode: 'sync',
244+
},
245+
executionId
246+
),
231247
})
232248

233249
// For forms, we don't stream back - we wait for completion and return success

0 commit comments

Comments
 (0)