diff --git a/.changeset/icy-worlds-judge.md b/.changeset/icy-worlds-judge.md new file mode 100644 index 000000000..fbcc0b81c --- /dev/null +++ b/.changeset/icy-worlds-judge.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': minor +--- + +on send complete, only send final_state for multi-leaf output state. For single-leaf runs, reutrn final_dataclip_id diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index 84563638d..870696462 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -169,7 +169,8 @@ export type RunStartReply = {}; // no payload export type RunCompletePayload = ExitReason & { timestamp: TimeInMicroSeconds; - final_state?: any; // The aggregated final state from the workflow (handles branching) + final_dataclip_id?: string; // Single-leaf: reuse the step's output dataclip + final_state?: any; // Multi-leaf: aggregated final state keyed by step id }; export type RunCompleteReply = undefined; diff --git a/packages/lightning-mock/src/api-dev.ts b/packages/lightning-mock/src/api-dev.ts index d8deec027..b6de55f0a 100644 --- a/packages/lightning-mock/src/api-dev.ts +++ b/packages/lightning-mock/src/api-dev.ts @@ -201,7 +201,12 @@ const setupDevAPI = ( }) => { if (evt.runId === runId) { state.events.removeListener(RUN_COMPLETE, handler); - resolve(evt.payload.final_state); + const { final_dataclip_id, final_state } = evt.payload; + if (final_dataclip_id) { + resolve(state.dataclips?.[final_dataclip_id] ?? null); + } else { + resolve(final_state); + } } }; state.events.addListener(RUN_COMPLETE, handler); diff --git a/packages/lightning-mock/src/api-sockets.ts b/packages/lightning-mock/src/api-sockets.ts index 6f0297d0c..20cbcbeb0 100644 --- a/packages/lightning-mock/src/api-sockets.ts +++ b/packages/lightning-mock/src/api-sockets.ts @@ -393,6 +393,7 @@ const createSocketAPI = ( ) { const { ref, join_ref, topic } = evt; const { + final_dataclip_id, final_state, reason, error_type, @@ -410,7 +411,9 @@ const createSocketAPI = ( if (!state.results[runId]) { state.results[runId] = { state: null, workerId: 'mock' }; } - if (final_state) { + if (final_dataclip_id) { + state.results[runId].state = state.dataclips?.[final_dataclip_id] ?? null; + } else if (final_state) { state.results[runId].state = final_state; } diff --git a/packages/ws-worker/src/events/run-complete.ts b/packages/ws-worker/src/events/run-complete.ts index e8335ade3..0bc4d9073 100644 --- a/packages/ws-worker/src/events/run-complete.ts +++ b/packages/ws-worker/src/events/run-complete.ts @@ -14,19 +14,28 @@ export default async function onWorkflowComplete( ) { const { state, onFinish, logger } = context; - // Use the aggregated final state from the runtime - // This handles branching workflows correctly by returning all leaf states const result = event.state; const reason = calculateRunExitReason(state); await logFinalReason(context, reason); + const isSingleLeaf = + state.leafDataclipIds.length === 1 && + !state.withheldDataclips[state.leafDataclipIds[0]]; + + const payload: RunCompletePayload = { + timestamp: timeInMicroseconds(event.time), + ...reason, + }; + + if (isSingleLeaf) { + payload.final_dataclip_id = state.leafDataclipIds[0]; + } else { + payload.final_state = result; + } + try { - await sendEvent(context, RUN_COMPLETE, { - final_state: result, - timestamp: timeInMicroseconds(event.time), - ...reason, - }); + await sendEvent(context, RUN_COMPLETE, payload); } catch (e) { logger.error( `${state.plan.id} failed to send ${RUN_COMPLETE} event. This run will be lost!` diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index 2ac4e8eac..9db844ccf 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -32,13 +32,10 @@ export default async function onStepComplete( delete state.activeStep; delete state.activeJob; - // TODO right now, the last job to run will be the result for the run - // this may not stand up in the future - // I'd feel happer if the runtime could judge what the final result is - // (taking into account branches and stuff) - // The problem is that the runtime will return the object, not an id, - // so we have a bit of a mapping problem - state.lastDataclipId = dataclipId; + // Track leaf dataclips (steps with no downstream jobs) + if (!event.next?.length) { + state.leafDataclipIds.push(dataclipId); + } // Set the input dataclip id for downstream jobs event.next?.forEach((nextJobId) => { diff --git a/packages/ws-worker/src/types.d.ts b/packages/ws-worker/src/types.d.ts index 974a79439..0072af8c2 100644 --- a/packages/ws-worker/src/types.d.ts +++ b/packages/ws-worker/src/types.d.ts @@ -18,8 +18,8 @@ export type RunState = { withheldDataclips: Record; reasons: Record; - // final dataclip id - lastDataclipId?: string; + // dataclip ids for leaf nodes (steps with no downstream) + leafDataclipIds: string[]; }; export type CancelablePromise = Promise & { diff --git a/packages/ws-worker/src/util/create-run-state.ts b/packages/ws-worker/src/util/create-run-state.ts index deaa76aad..5fca6a74a 100644 --- a/packages/ws-worker/src/util/create-run-state.ts +++ b/packages/ws-worker/src/util/create-run-state.ts @@ -3,7 +3,7 @@ import type { RunState } from '../types'; export default (plan: ExecutionPlan, input?: Lazy): RunState => { const state = { - lastDataclipId: '', + leafDataclipIds: [], dataclips: {}, inputDataclips: {}, withheldDataclips: {}, diff --git a/packages/ws-worker/test/events/run-complete.test.ts b/packages/ws-worker/test/events/run-complete.test.ts index 2eed41978..840ca4ecc 100644 --- a/packages/ws-worker/test/events/run-complete.test.ts +++ b/packages/ws-worker/test/events/run-complete.test.ts @@ -8,17 +8,43 @@ import { RUN_COMPLETE, RUN_LOG } from '../../src/events'; import { createRunState } from '../../src/util'; import { createPlan } from '../util'; -test('should send a run:complete event', async (t) => { +test('should send a run:complete event with final_dataclip_id for single leaf', async (t) => { const result = { answer: 42 }; const plan = createPlan(); const state = createRunState(plan); + state.leafDataclipIds = ['clip-1']; + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [RUN_COMPLETE]: (evt) => { + t.is(evt.final_dataclip_id, 'clip-1'); + t.falsy(evt.final_state); + t.falsy(evt.time); + }, + }); + + const event: any = { state: result }; + + const context: any = { channel, state, onFinish: () => {} }; + await handleRunComplete(context, event); +}); + +test('should send final_state when there are multiple leaves', async (t) => { + const result = { + 'job-a': { data: { a: true } }, + 'job-b': { data: { b: true } }, + }; + const plan = createPlan(); + + const state = createRunState(plan); + state.leafDataclipIds = ['clip-1', 'clip-2']; const channel = mockChannel({ [RUN_LOG]: () => true, [RUN_COMPLETE]: (evt) => { t.deepEqual(evt.final_state, result); - t.falsy(evt.time); // if no timestamp in the engine event, no timestamp in the worker one + t.falsy(evt.final_dataclip_id); }, }); @@ -240,9 +266,11 @@ test('should call onFinish even if the lightning event timesout', async (t) => { await handleRunComplete(context, event); }); -test('should send final_state for a linear workflow', async (t) => { +test('should send final_dataclip_id for a single-leaf workflow', async (t) => { const plan = createPlan(); const state = createRunState(plan); + state.leafDataclipIds = ['abc-123']; + const finalResult = { data: { count: 100 }, references: [] }; let completeEvent: any; @@ -264,15 +292,16 @@ test('should send final_state for a linear workflow', async (t) => { await handleRunComplete(context, event); - t.deepEqual(completeEvent.final_state, finalResult); + t.is(completeEvent.final_dataclip_id, 'abc-123'); + t.falsy(completeEvent.final_state); t.is(completeEvent.reason, 'success'); }); test('should send final_state for a branching workflow with multiple leaf nodes', async (t) => { const plan = createPlan(); const state = createRunState(plan); + state.leafDataclipIds = ['clip-1', 'clip-2', 'clip-3']; - // Simulate a branching workflow with multiple final states const branchedResult = { 'job-1': { data: { path: 'A', value: 42 } }, 'job-2': { data: { path: 'B', value: 84 } }, @@ -292,7 +321,6 @@ test('should send final_state for a branching workflow with multiple leaf nodes' channel, state, onFinish: ({ state: finalState }: any) => { - // Verify that onFinish receives the branched result t.deepEqual(finalState, branchedResult); }, }; @@ -301,19 +329,80 @@ test('should send final_state for a branching workflow with multiple leaf nodes' await handleRunComplete(context, event); - // Verify the event contains the full branched state structure t.deepEqual(completeEvent.final_state, branchedResult); + t.falsy(completeEvent.final_dataclip_id); t.is(completeEvent.reason, 'success'); - t.truthy(completeEvent.final_state['job-1']); - t.truthy(completeEvent.final_state['job-2']); - t.truthy(completeEvent.final_state['job-3']); +}); + +test('should send final_state when single leaf dataclip was withheld', async (t) => { + const plan = createPlan(); + const state = createRunState(plan); + state.leafDataclipIds = ['clip-1']; + state.withheldDataclips = { 'clip-1': true }; + + const result = { data: { big: 'data' } }; + + let completeEvent: any; + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [RUN_COMPLETE]: (evt) => { + completeEvent = evt; + }, + }); + + const context: any = { + channel, + state, + onFinish: () => {}, + }; + + const event: any = { state: result }; + + await handleRunComplete(context, event); + + t.deepEqual(completeEvent.final_state, result); + t.falsy(completeEvent.final_dataclip_id); +}); + +test('should send final_state when a single leaf node is reached by two paths', async (t) => { + const plan = createPlan(); + const state = createRunState(plan); + // Same node executed twice via different paths produces two leaf dataclips + state.leafDataclipIds = ['clip-1', 'clip-2']; + + const result = { + x: { data: { from: 'a' } }, + 'x-1': { data: { from: 'b' } }, + }; + + let completeEvent: any; + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [RUN_COMPLETE]: (evt) => { + completeEvent = evt; + }, + }); + + const context: any = { + channel, + state, + onFinish: () => {}, + }; + + const event: any = { state: result }; + + await handleRunComplete(context, event); + + t.deepEqual(completeEvent.final_state, result); + t.falsy(completeEvent.final_dataclip_id); }); test('should properly serialize final_state as JSON', async (t) => { const plan = createPlan(); const state = createRunState(plan); - // Test with complex state including nested objects, arrays, and special values const complexState = { data: { users: [ @@ -348,12 +437,10 @@ test('should properly serialize final_state as JSON', async (t) => { await handleRunComplete(context, event); - // Verify the state is properly preserved t.deepEqual(completeEvent.final_state, complexState); t.deepEqual(completeEvent.final_state.data.users[0], { id: 1, name: 'Alice' }); t.is(completeEvent.final_state.data.metadata.nested.deeply.value, 42); - // Verify it can be stringified (simulating what happens when sent over the wire) const jsonString = JSON.stringify(completeEvent.final_state); const parsed = JSON.parse(jsonString); t.deepEqual(parsed, complexState); @@ -363,7 +450,6 @@ test('should handle Uint8Array in final_state', async (t) => { const plan = createPlan(); const state = createRunState(plan); - // Test with Uint8Array which needs special handling const stateWithBinary = { data: { buffer: new Uint8Array([1, 2, 3, 4, 5]) }, }; @@ -387,6 +473,5 @@ test('should handle Uint8Array in final_state', async (t) => { await handleRunComplete(context, event); - // Verify the Uint8Array is preserved in the event t.deepEqual(completeEvent.final_state.data.buffer, new Uint8Array([1, 2, 3, 4, 5])); }); diff --git a/packages/ws-worker/test/events/run-error.test.ts b/packages/ws-worker/test/events/run-error.test.ts index 41034cf5d..6dd9b51f9 100644 --- a/packages/ws-worker/test/events/run-error.test.ts +++ b/packages/ws-worker/test/events/run-error.test.ts @@ -11,7 +11,7 @@ test('runError should trigger runComplete with a reason', async (t) => { const jobId = 'job-1'; const state = createRunState(plan); - state.lastDataclipId = 'x'; + state.activeStep = 'b'; state.activeJob = jobId; @@ -40,7 +40,7 @@ test('workflow error should send reason to onFinish', async (t) => { const jobId = 'job-1'; const state = createRunState(plan); - state.lastDataclipId = 'x'; + state.activeStep = 'b'; state.activeJob = jobId; @@ -72,7 +72,7 @@ test('workflow error should send reason to onFinish', async (t) => { test('runError should not call job complete if the job is not active', async (t) => { const state = createRunState(plan); - state.lastDataclipId = 'x'; + const channel = mockChannel({ [RUN_LOG]: () => true, @@ -113,7 +113,7 @@ test('runError should log the reason', async (t) => { }, options: {}, }); - state.lastDataclipId = 'x'; + state.activeStep = 'b'; state.activeJob = jobId; diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index af476d75b..7a74f872d 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -283,3 +283,114 @@ test('should include a timestamp', async (t) => { const context: any = { channel, state, onFinish: () => {} }; await handleStepComplete(context, event); }); + +test('track leaf dataclip when step has no downstream jobs', async (t) => { + const plan = createPlan(); + + const state = createRunState(plan); + state.activeJob = 'job-1'; + state.activeStep = 'b'; + + const channel = mockChannel({ + [STEP_COMPLETE]: () => true, + }); + + const event = { state: { x: 10 }, next: [] } as any; + await handleStepComplete({ channel, state } as any, event); + + t.is(state.leafDataclipIds.length, 1); +}); + +test('track leaf dataclip when step has undefined next', async (t) => { + const plan = createPlan(); + + const state = createRunState(plan); + state.activeJob = 'job-1'; + state.activeStep = 'b'; + + const channel = mockChannel({ + [STEP_COMPLETE]: () => true, + }); + + const event = { state: { x: 10 } } as any; + await handleStepComplete({ channel, state } as any, event); + + t.is(state.leafDataclipIds.length, 1); +}); + +test('do not track leaf dataclip when step has downstream jobs', async (t) => { + const plan = createPlan(); + + const state = createRunState(plan); + state.activeJob = 'job-1'; + state.activeStep = 'b'; + + const channel = mockChannel({ + [STEP_COMPLETE]: () => true, + }); + + const event = { state: { x: 10 }, next: ['job-2'] } as any; + await handleStepComplete({ channel, state } as any, event); + + t.is(state.leafDataclipIds.length, 0); +}); + +// Multiple leaf nodes: start → job-a (leaf), start → job-b (leaf) +test('accumulate multiple leaf dataclips for branching workflow', async (t) => { + const plan = createPlan(); + const state = createRunState(plan); + + const channel = mockChannel({ + [STEP_COMPLETE]: () => true, + }); + + // First leaf completes + state.activeJob = 'job-a'; + state.activeStep = 'step-a'; + await handleStepComplete( + { channel, state } as any, + { state: { a: true }, next: [] } as any + ); + + // Second leaf completes + state.activeJob = 'job-b'; + state.activeStep = 'step-b'; + await handleStepComplete( + { channel, state } as any, + { state: { b: true }, next: [] } as any + ); + + t.is(state.leafDataclipIds.length, 2); + // Each leaf gets a distinct dataclip id + t.not(state.leafDataclipIds[0], state.leafDataclipIds[1]); +}); + +// Single leaf reached by two paths: start → a → x, start → b → x +// x executes twice, both times with no downstream +test('accumulate two leaf dataclips when same node reached by two paths', async (t) => { + const plan = createPlan(); + const state = createRunState(plan); + + const channel = mockChannel({ + [STEP_COMPLETE]: () => true, + }); + + // x completes first time (via path a) + state.activeJob = 'job-x'; + state.activeStep = 'step-x'; + await handleStepComplete( + { channel, state } as any, + { state: { from: 'a' }, next: [] } as any + ); + + // x completes second time (via path b) + state.activeJob = 'job-x'; + state.activeStep = 'step-x-1'; + await handleStepComplete( + { channel, state } as any, + { state: { from: 'b' }, next: [] } as any + ); + + t.is(state.leafDataclipIds.length, 2); + t.not(state.leafDataclipIds[0], state.leafDataclipIds[1]); +}); diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 94b595f97..4da54ffa3 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -291,8 +291,11 @@ test.serial( return new Promise((done) => { const run = getRun(); lng.onSocketEvent(e.RUN_COMPLETE, run.id, (evt: any) => { - const { final_state } = evt.payload; - t.assert(typeof final_state === 'object'); + const { final_dataclip_id, final_state } = evt.payload; + t.assert( + typeof final_dataclip_id === 'string' || + typeof final_state === 'object' + ); t.pass('run complete event received'); done(); }); diff --git a/packages/ws-worker/test/util/create-run-state.test.ts b/packages/ws-worker/test/util/create-run-state.test.ts index 8df4f5cf8..9b36f811a 100644 --- a/packages/ws-worker/test/util/create-run-state.test.ts +++ b/packages/ws-worker/test/util/create-run-state.test.ts @@ -18,7 +18,7 @@ test('create run', (t) => { const run = createRunState(plan, input); t.deepEqual(run.plan, plan); - t.deepEqual(run.lastDataclipId, ''); + t.deepEqual(run.leafDataclipIds, []); t.deepEqual(run.dataclips, {}); t.deepEqual(run.inputDataclips, {}); t.deepEqual(run.reasons, {});