Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-async-promise-antipattern.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Fix an issue where unhandled errors could trigger a worker crash after a compilation error.
7 changes: 7 additions & 0 deletions .changeset/fix-compile-error-event-order.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@openfn/engine-multi': patch
---

Emit compilation failure log before workflow-error event. Previously the error
event arrived first, causing the worker to tear down the channel before the
log could be delivered.
28 changes: 13 additions & 15 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,19 @@ const execute = async (context: ExecutionContext) => {
},
[workerEvents.ERROR]: (evt: workerEvents.ErrorEvent) => {
didError = true;
if (compileStatus === 'started') {
log(context, {
type: workerEvents.LOG,
workflowId: state.plan.id!,
threadId: evt.threadId || '-',
log: {
level: 'info',
message: ['Error occurred during compilation'],
name: 'RTE',
time: timestamp().toString(),
},
});
}
error(context, {
workflowId: state.plan.id,
error: evt.error,
Expand All @@ -155,21 +168,6 @@ const execute = async (context: ExecutionContext) => {
events,
workerOptions
).catch(async (e: any) => {
if (compileStatus === 'started') {
// Try and alert users that the error occurred at compile-time
// Not super keen on adding this down in the engine but it may help app users
await log(context, {
type: workerEvents.LOG,
workflowId: state.plan.id!,
threadId: '-',
log: {
level: 'info',
message: [`Error occurred during compilation`],
name: 'RTE',
time: timestamp().toString(),
},
});
}
// An error should:
// a) emit an error event (and so be handled by the error() function
// b) reject the task in the pool
Expand Down
2 changes: 2 additions & 0 deletions packages/engine-multi/src/worker/thread/mock-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ function mockRun(plan: MockExecutionPlan, input: State, _options = {}) {
const workflowId = plan.id;

// simulate compilation
publish(workerEvents.COMPILE_START, { workflowId });
try {
eval(job.expression!);
} catch (e: any) {
throw new CompileError(e, job.id!);
}
publish(workerEvents.COMPILE_COMPLETE, { workflowId, duration: 0 });

return new Promise((resolve) => {
const jobId = job.id || '<job>';
Expand Down
32 changes: 32 additions & 0 deletions packages/engine-multi/test/api/execute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,38 @@ test.serial('should emit CompileError if compilation fails', async (t) => {
await execute(context);
});

test.serial.only(
'on compile error, the error log should arrive before the workflow-error event',
async (t) => {
const state = {
id: 'compile-order',
plan: {
workflow: {
steps: [{ id: 'j', expression: 'la la la' }],
},
},
} as WorkflowState;

const context = createContext({ state, options: {} });

const orderedEvents: string[] = [];

context.on(WORKFLOW_LOG, (evt) => {
if (/error occurred during compilation/i.test(evt.message)) {
orderedEvents.push('log');
}
});

context.on(WORKFLOW_ERROR, () => {
orderedEvents.push('error');
});

await execute(context);

t.deepEqual(orderedEvents, ['log', 'error']);
}
);

test.serial('should stringify the whitelist array', async (t) => {
let passedOptions: any;

Expand Down
4 changes: 2 additions & 2 deletions packages/runtime/test/execute/step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ test.serial('log memory usage', async (t) => {

const memory = logger._find('debug', /step memory usage/i);

// All we're looking for here is two strings of numbers in mb
t.regex(memory?.message, /\d+mb(.+)\d+mb/i);
// All we're looking for here is a number in mb
t.regex(memory?.message, /\d+mb/i);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just pushed this unrelated fix, which will not include a changeset

The test is looking for a number like 1.2mb in the output

But by chance in CI it just output 19mb, which failed the test.

I really don't care if it's a decimal or node. Just a number followed by mb is fine.

});

test.serial('log memory usage with profiler and peak', async (t) => {
Expand Down
6 changes: 2 additions & 4 deletions packages/ws-worker/src/api/destroy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ const destroy = async (app: ServerApp, logger: Logger) => {
resolve();
});
}),
new Promise<void>(async (resolve) => {
(async () => {
// Let any active runs complete
await waitForRunsAndClaims(app, logger);

app.queueChannel?.leave();
// Kill the engine and socket
await app.engine.destroy();
app.socket?.disconnect();

resolve();
}),
})(),
]);

logger.success('Server closed');
Expand Down
17 changes: 7 additions & 10 deletions packages/ws-worker/src/events/run-log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,12 @@ export default async function onRunLog(
};
return sendEvent<RunLogPayload>(context, RUN_LOG_BATCH, payload);
} else {
return new Promise<void>(async (resolve) => {
for (const log of logs) {
const payload = {
run_id: `${state.plan.id}`,
...log,
} as LegacyRunLogPayload;
await sendEvent<LegacyRunLogPayload>(context, RUN_LOG, payload);
}
resolve();
});
for (const log of logs) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this absolutely makes sense 👍

const payload = {
run_id: `${state.plan.id}`,
...log,
} as LegacyRunLogPayload;
await sendEvent<LegacyRunLogPayload>(context, RUN_LOG, payload);
}
}
}
53 changes: 25 additions & 28 deletions packages/ws-worker/src/util/try-with-backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,54 +25,51 @@ const tryWithBackoff = (fn: any, opts: Options = {}): CancelablePromise => {
let cancelled = false;

if (!opts.isCancelled) {
// Keep the top-level cancel flag in scope
// This way nested promises will still use the same flag and let
// themselves be cancelled
opts.isCancelled = () => cancelled;
}

const promise = new Promise<void>(async (resolve, reject) => {
const run = async () => {
try {
await fn();
resolve();
} catch (e: any) {
if (e?.abort) {
cancelled = true;
return reject();
throw e;
}

if (opts.isCancelled!()) {
return resolve();
return;
}

if (!isNaN(maxRuns as any) && runs >= (maxRuns as number)) {
return reject(new Error('max runs exceeded'));
throw new Error('max runs exceeded');
}
// failed? No problem, we'll back off and try again
setTimeout(() => {
if (opts.isCancelled!()) {
return resolve();
}
const nextOpts = {
maxRuns,
runs: runs + 1,
min: Math.min(max, min * BACKOFF_MULTIPLIER),
max: max,
isCancelled: opts.isCancelled,
};
//console.log('trying again in ', nextOpts.min);
tryWithBackoff(fn, nextOpts).then(resolve).catch(reject);
}, min);

await new Promise((resolve) => setTimeout(resolve, min));

if (opts.isCancelled!()) {
return;
}

const nextOpts = {
maxRuns,
runs: runs + 1,
min: Math.min(max, min * BACKOFF_MULTIPLIER),
max: max,
isCancelled: opts.isCancelled,
};

return tryWithBackoff(fn, nextOpts);
}
});
};

const promise = run() as CancelablePromise;

// allow the try to be cancelled
// We can't cancel the active in-flight promise but we can prevent the callback
(promise as CancelablePromise).cancel = () => {
promise.cancel = () => {
cancelled = true;
};

return promise as CancelablePromise;
return promise;
};

export default tryWithBackoff;
17 changes: 17 additions & 0 deletions packages/ws-worker/test/api/destroy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,23 @@ test.serial(
}
);

test('should reject when engine.destroy() throws', async (t) => {
initLightning();
await initWorker();

worker.engine.destroy = async () => {
// We have to manually disconnect the websocket or else mock lightning won't
// shut down down after the test
worker.socket.disconnect();

throw new Error('engine destroy failed');
};

await t.throwsAsync(() => destroy(worker, logger), {
message: 'engine destroy failed',
});
});

test("don't claim after destroy", async (t) => {
initLightning();
await initWorker();
Expand Down
38 changes: 38 additions & 0 deletions packages/ws-worker/test/events/run-log.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,41 @@ test('should work with non-batch logging', async (t) => {

await handleRunLog({ channel, state, options } as any, log);
});

test('non-batch mode: should reject when sendEvent fails', async (t) => {
const plan = { id: 'run-1' };

const log: JSONLog = {
name: 'R/T',
level: 'info',
time: getBigIntTimestamp(),
message: JSON.stringify(['ping']),
};

const state = {
plan,
} as RunState;

const options = {
batchLogs: false,
};

const channel = mockChannel({
[RUN_LOG]: () => {
throw new Error('channel error');
},
});

const logger = {
error: () => {},
warn: () => {},
info: () => {},
debug: () => {},
log: () => {},
};

await t.throwsAsync(
() => handleRunLog({ channel, state, options, logger } as any, log),
{ message: /\[run:log\] timeout|channel error/ }
);
});
15 changes: 15 additions & 0 deletions packages/ws-worker/test/util/try-with-backoff.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,19 @@ test('cancel nested promise', async (t) => {
});
});

test('should reject when isCancelled throws', async (t) => {
const fn = async () => {
throw new Error('fn error');
};

const isCancelled = () => {
throw new Error('isCancelled error');
};

await t.throwsAsync(
() => tryWithBackoff(fn, { isCancelled, maxRuns: 1 }),
{ message: 'isCancelled error' }
);
});

// TODO test increasing backoffs
Loading