Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/icy-worlds-judge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': minor
---

on send complete, only send final_state for multi-leaf output state. For single-leaf runs, reutrn final_dataclip_id
3 changes: 2 additions & 1 deletion packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ export type RunStartReply = {}; // no payload

export type RunCompletePayload = ExitReason & {
timestamp: TimeInMicroSeconds;
final_state?: any; // The aggregated final state from the workflow (handles branching)
final_dataclip_id?: string; // Single-leaf: reuse the step's output dataclip
final_state?: any; // Multi-leaf: aggregated final state keyed by step id
};
export type RunCompleteReply = undefined;

Expand Down
7 changes: 6 additions & 1 deletion packages/lightning-mock/src/api-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ const setupDevAPI = (
}) => {
if (evt.runId === runId) {
state.events.removeListener(RUN_COMPLETE, handler);
resolve(evt.payload.final_state);
const { final_dataclip_id, final_state } = evt.payload;
if (final_dataclip_id) {
resolve(state.dataclips?.[final_dataclip_id] ?? null);
} else {
resolve(final_state);
}
}
};
state.events.addListener(RUN_COMPLETE, handler);
Expand Down
5 changes: 4 additions & 1 deletion packages/lightning-mock/src/api-sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ const createSocketAPI = (
) {
const { ref, join_ref, topic } = evt;
const {
final_dataclip_id,
final_state,
reason,
error_type,
Expand All @@ -410,7 +411,9 @@ const createSocketAPI = (
if (!state.results[runId]) {
state.results[runId] = { state: null, workerId: 'mock' };
}
if (final_state) {
if (final_dataclip_id) {
state.results[runId].state = state.dataclips?.[final_dataclip_id] ?? null;
} else if (final_state) {
state.results[runId].state = final_state;
}

Expand Down
23 changes: 16 additions & 7 deletions packages/ws-worker/src/events/run-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,28 @@ export default async function onWorkflowComplete(
) {
const { state, onFinish, logger } = context;

// Use the aggregated final state from the runtime
// This handles branching workflows correctly by returning all leaf states
const result = event.state;

const reason = calculateRunExitReason(state);
await logFinalReason(context, reason);

const isSingleLeaf =
state.leafDataclipIds.length === 1 &&
!state.withheldDataclips[state.leafDataclipIds[0]];

const payload: RunCompletePayload = {
timestamp: timeInMicroseconds(event.time),
...reason,
};

if (isSingleLeaf) {
payload.final_dataclip_id = state.leafDataclipIds[0];
} else {
payload.final_state = result;
}

try {
await sendEvent<RunCompletePayload>(context, RUN_COMPLETE, {
final_state: result,
timestamp: timeInMicroseconds(event.time),
...reason,
});
await sendEvent<RunCompletePayload>(context, RUN_COMPLETE, payload);
} catch (e) {
logger.error(
`${state.plan.id} failed to send ${RUN_COMPLETE} event. This run will be lost!`
Expand Down
11 changes: 4 additions & 7 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@ export default async function onStepComplete(
delete state.activeStep;
delete state.activeJob;

// TODO right now, the last job to run will be the result for the run
// this may not stand up in the future
// I'd feel happer if the runtime could judge what the final result is
// (taking into account branches and stuff)
// The problem is that the runtime will return the object, not an id,
// so we have a bit of a mapping problem
state.lastDataclipId = dataclipId;
// Track leaf dataclips (steps with no downstream jobs)
if (!event.next?.length) {
state.leafDataclipIds.push(dataclipId);
}

// Set the input dataclip id for downstream jobs
event.next?.forEach((nextJobId) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/ws-worker/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export type RunState = {
withheldDataclips: Record<string, true>;
reasons: Record<string, ExitReason>;

// final dataclip id
lastDataclipId?: string;
// dataclip ids for leaf nodes (steps with no downstream)
leafDataclipIds: string[];
};

export type CancelablePromise = Promise<void> & {
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/src/util/create-run-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { RunState } from '../types';

export default (plan: ExecutionPlan, input?: Lazy<State>): RunState => {
const state = {
lastDataclipId: '',
leafDataclipIds: [],
dataclips: {},
inputDataclips: {},
withheldDataclips: {},
Expand Down
115 changes: 100 additions & 15 deletions packages/ws-worker/test/events/run-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,43 @@ import { RUN_COMPLETE, RUN_LOG } from '../../src/events';
import { createRunState } from '../../src/util';
import { createPlan } from '../util';

test('should send a run:complete event', async (t) => {
test('should send a run:complete event with final_dataclip_id for single leaf', async (t) => {
const result = { answer: 42 };
const plan = createPlan();

const state = createRunState(plan);
state.leafDataclipIds = ['clip-1'];

const channel = mockChannel({
[RUN_LOG]: () => true,
[RUN_COMPLETE]: (evt) => {
t.is(evt.final_dataclip_id, 'clip-1');
t.falsy(evt.final_state);
t.falsy(evt.time);
},
});

const event: any = { state: result };

const context: any = { channel, state, onFinish: () => {} };
await handleRunComplete(context, event);
});

test('should send final_state when there are multiple leaves', async (t) => {
const result = {
'job-a': { data: { a: true } },
'job-b': { data: { b: true } },
};
const plan = createPlan();

const state = createRunState(plan);
state.leafDataclipIds = ['clip-1', 'clip-2'];

const channel = mockChannel({
[RUN_LOG]: () => true,
[RUN_COMPLETE]: (evt) => {
t.deepEqual(evt.final_state, result);
t.falsy(evt.time); // if no timestamp in the engine event, no timestamp in the worker one
t.falsy(evt.final_dataclip_id);
},
});

Expand Down Expand Up @@ -240,9 +266,11 @@ test('should call onFinish even if the lightning event timesout', async (t) => {
await handleRunComplete(context, event);
});

test('should send final_state for a linear workflow', async (t) => {
test('should send final_dataclip_id for a single-leaf workflow', async (t) => {
const plan = createPlan();
const state = createRunState(plan);
state.leafDataclipIds = ['abc-123'];

const finalResult = { data: { count: 100 }, references: [] };

let completeEvent: any;
Expand All @@ -264,15 +292,16 @@ test('should send final_state for a linear workflow', async (t) => {

await handleRunComplete(context, event);

t.deepEqual(completeEvent.final_state, finalResult);
t.is(completeEvent.final_dataclip_id, 'abc-123');
t.falsy(completeEvent.final_state);
t.is(completeEvent.reason, 'success');
});

test('should send final_state for a branching workflow with multiple leaf nodes', async (t) => {
const plan = createPlan();
const state = createRunState(plan);
state.leafDataclipIds = ['clip-1', 'clip-2', 'clip-3'];

// Simulate a branching workflow with multiple final states
const branchedResult = {
'job-1': { data: { path: 'A', value: 42 } },
'job-2': { data: { path: 'B', value: 84 } },
Expand All @@ -292,7 +321,6 @@ test('should send final_state for a branching workflow with multiple leaf nodes'
channel,
state,
onFinish: ({ state: finalState }: any) => {
// Verify that onFinish receives the branched result
t.deepEqual(finalState, branchedResult);
},
};
Expand All @@ -301,19 +329,80 @@ test('should send final_state for a branching workflow with multiple leaf nodes'

await handleRunComplete(context, event);

// Verify the event contains the full branched state structure
t.deepEqual(completeEvent.final_state, branchedResult);
t.falsy(completeEvent.final_dataclip_id);
t.is(completeEvent.reason, 'success');
t.truthy(completeEvent.final_state['job-1']);
t.truthy(completeEvent.final_state['job-2']);
t.truthy(completeEvent.final_state['job-3']);
});

test('should send final_state when single leaf dataclip was withheld', async (t) => {
const plan = createPlan();
const state = createRunState(plan);
state.leafDataclipIds = ['clip-1'];
state.withheldDataclips = { 'clip-1': true };

const result = { data: { big: 'data' } };

let completeEvent: any;

const channel = mockChannel({
[RUN_LOG]: () => true,
[RUN_COMPLETE]: (evt) => {
completeEvent = evt;
},
});

const context: any = {
channel,
state,
onFinish: () => {},
};

const event: any = { state: result };

await handleRunComplete(context, event);

t.deepEqual(completeEvent.final_state, result);
t.falsy(completeEvent.final_dataclip_id);
});

test('should send final_state when a single leaf node is reached by two paths', async (t) => {
const plan = createPlan();
const state = createRunState(plan);
// Same node executed twice via different paths produces two leaf dataclips
state.leafDataclipIds = ['clip-1', 'clip-2'];

const result = {
x: { data: { from: 'a' } },
'x-1': { data: { from: 'b' } },
};

let completeEvent: any;

const channel = mockChannel({
[RUN_LOG]: () => true,
[RUN_COMPLETE]: (evt) => {
completeEvent = evt;
},
});

const context: any = {
channel,
state,
onFinish: () => {},
};

const event: any = { state: result };

await handleRunComplete(context, event);

t.deepEqual(completeEvent.final_state, result);
t.falsy(completeEvent.final_dataclip_id);
});

test('should properly serialize final_state as JSON', async (t) => {
const plan = createPlan();
const state = createRunState(plan);

// Test with complex state including nested objects, arrays, and special values
const complexState = {
data: {
users: [
Expand Down Expand Up @@ -348,12 +437,10 @@ test('should properly serialize final_state as JSON', async (t) => {

await handleRunComplete(context, event);

// Verify the state is properly preserved
t.deepEqual(completeEvent.final_state, complexState);
t.deepEqual(completeEvent.final_state.data.users[0], { id: 1, name: 'Alice' });
t.is(completeEvent.final_state.data.metadata.nested.deeply.value, 42);

// Verify it can be stringified (simulating what happens when sent over the wire)
const jsonString = JSON.stringify(completeEvent.final_state);
const parsed = JSON.parse(jsonString);
t.deepEqual(parsed, complexState);
Expand All @@ -363,7 +450,6 @@ test('should handle Uint8Array in final_state', async (t) => {
const plan = createPlan();
const state = createRunState(plan);

// Test with Uint8Array which needs special handling
const stateWithBinary = {
data: { buffer: new Uint8Array([1, 2, 3, 4, 5]) },
};
Expand All @@ -387,6 +473,5 @@ test('should handle Uint8Array in final_state', async (t) => {

await handleRunComplete(context, event);

// Verify the Uint8Array is preserved in the event
t.deepEqual(completeEvent.final_state.data.buffer, new Uint8Array([1, 2, 3, 4, 5]));
});
8 changes: 4 additions & 4 deletions packages/ws-worker/test/events/run-error.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ test('runError should trigger runComplete with a reason', async (t) => {
const jobId = 'job-1';

const state = createRunState(plan);
state.lastDataclipId = 'x';

state.activeStep = 'b';
state.activeJob = jobId;

Expand Down Expand Up @@ -40,7 +40,7 @@ test('workflow error should send reason to onFinish', async (t) => {
const jobId = 'job-1';

const state = createRunState(plan);
state.lastDataclipId = 'x';

state.activeStep = 'b';
state.activeJob = jobId;

Expand Down Expand Up @@ -72,7 +72,7 @@ test('workflow error should send reason to onFinish', async (t) => {

test('runError should not call job complete if the job is not active', async (t) => {
const state = createRunState(plan);
state.lastDataclipId = 'x';


const channel = mockChannel({
[RUN_LOG]: () => true,
Expand Down Expand Up @@ -113,7 +113,7 @@ test('runError should log the reason', async (t) => {
},
options: {},
});
state.lastDataclipId = 'x';

state.activeStep = 'b';
state.activeJob = jobId;

Expand Down
Loading