From 46ac09eac281191eff5d308286f31894b15ff58c Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 12 Mar 2026 14:20:21 +0000 Subject: [PATCH 1/8] worker: factor claim signature Put required arguments first, optional args after --- packages/ws-worker/src/api/claim.ts | 6 +++--- packages/ws-worker/src/api/workloop.ts | 2 +- packages/ws-worker/src/server.ts | 6 +++--- packages/ws-worker/test/api/claim.test.ts | 26 +++++++++++------------ 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index d704a02a8..e5dad0cfb 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -52,12 +52,12 @@ export const resetClaimIdGen = () => { const claim = ( app: ServerApp, - logger: Logger = mockLogger, workloop: Workloop, - options?: ClaimOptions + logger: Logger = mockLogger, + options: ClaimOptions = {} ) => { return new Promise((resolve, reject) => { - const { demand = 1 } = options ?? {}; + const { demand = 1 } = options; const podName = NAME ? `[${NAME}] ` : ''; const activeInWorkloop = workloop.activeRuns.size; diff --git a/packages/ws-worker/src/api/workloop.ts b/packages/ws-worker/src/api/workloop.ts index b0307ecf9..64fe96863 100644 --- a/packages/ws-worker/src/api/workloop.ts +++ b/packages/ws-worker/src/api/workloop.ts @@ -46,7 +46,7 @@ const startWorkloop = ( const workLoop = () => { if (!cancelled) { - promise = tryWithBackoff(() => claim(app, logger, workloop), { + promise = tryWithBackoff(() => claim(app, workloop, logger), { min: minBackoff, max: maxBackoff, }); diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 04cdede72..d88f46ced 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -177,7 +177,7 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { if (!app.destroyed) { for (const w of app.workloops) { if (workloopHasCapacity(w)) { - claim(app, logger, w).catch(() => { + claim(app, w, logger).catch(() => { // do nothing - it's fine if claim throws here }); } @@ -424,7 +424,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { logger.info('triggering claim from POST request'); const promises = app.workloops.map((w) => { if (workloopHasCapacity(w)) { - return claim(app, logger, w); + return claim(app, w, logger); } return Promise.reject(new Error('Workloop at capacity')); }); @@ -444,7 +444,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.claim = () => { const promises = app.workloops.map((w) => { if (workloopHasCapacity(w)) { - return claim(app, logger, w); + return claim(app, w, logger); } return Promise.reject(new Error('Workloop at capacity')); }); diff --git a/packages/ws-worker/test/api/claim.test.ts b/packages/ws-worker/test/api/claim.test.ts index d2333fd0e..b4b27f211 100644 --- a/packages/ws-worker/test/api/claim.test.ts +++ b/packages/ws-worker/test/api/claim.test.ts @@ -174,7 +174,7 @@ test('claim: should call execute for a single run', async (t) => { }); app.runWorkloopMap = {}; - await claim(app, logger, workloop); + await claim(app, workloop, logger); t.deepEqual(executeArgs[0], { id: 'abc' }); t.true(workloop.activeRuns.has('abc')); t.is(app.runWorkloopMap['abc'], workloop); @@ -188,7 +188,7 @@ test('should not claim if workloop is at capacity', async (t) => { workflows: { a: true }, }); - await t.throwsAsync(() => claim(app, logger, workloop), { + await t.throwsAsync(() => claim(app, workloop, logger), { message: 'Workloop at capacity', }); }); @@ -200,7 +200,7 @@ test('should mark a claim when in flight', async (t) => { workflows: {}, }); - let claimPromise = claim(app, logger, workloop); + let claimPromise = claim(app, workloop, logger); t.is(workloop.openClaims['1'], 1); t.is(app.openClaims['1'], 1); @@ -217,7 +217,7 @@ test('should remove an open claim when completed', async (t) => { workflows: {}, }); - await t.throwsAsync(() => claim(app, logger, workloop), { + await t.throwsAsync(() => claim(app, workloop, logger), { message: 'No runs returned', }); @@ -235,7 +235,7 @@ test('should remove an open claim on error', async (t) => { }, }); - await t.throwsAsync(() => claim(app, logger, workloop), { + await t.throwsAsync(() => claim(app, workloop, logger), { message: 'claim error', }); @@ -254,7 +254,7 @@ test.skip('should remove an open claim on timeout', async (t) => { }, }); - await t.throwsAsync(() => claim(app, logger, workloop), { + await t.throwsAsync(() => claim(app, workloop, logger), { message: 'timeout', }); @@ -271,7 +271,7 @@ test('should mark a claim when in flight with demand: 2', async (t) => { }, }); - let claimPromise = claim(app, logger, workloop, { demand: 2 }); + let claimPromise = claim(app, workloop, logger, { demand: 2 }); t.is(workloop.openClaims['1'], 2); t.is(app.openClaims['1'], 2); @@ -310,10 +310,10 @@ test('should not claim if open claims exceeds workloop capacity', async (t) => { }; // first claim should be fine - let claimPromise = claim(app, logger, workloop); + let claimPromise = claim(app, workloop, logger); // second claim should error and stop the loop actually - await t.throwsAsync(() => claim(app, logger, workloop), { + await t.throwsAsync(() => claim(app, workloop, logger), { message: 'Workloop at capacity', }); t.true(didStopWorkloop); @@ -351,10 +351,10 @@ test('should not claim if open claims + active runs exceeds workloop capacity', }; // first claim should be fine - let claimPromise = claim(app, logger, workloop); + let claimPromise = claim(app, workloop, logger); // second claim should error - await t.throwsAsync(() => claim(app, logger, workloop), { + await t.throwsAsync(() => claim(app, workloop, logger), { message: 'Workloop at capacity', }); @@ -387,7 +387,7 @@ test('claim: should send queues in payload', async (t) => { events: new EventEmitter(), } as unknown as ServerApp; - await t.throwsAsync(() => claim(app, logger, workloop), { + await t.throwsAsync(() => claim(app, workloop, logger), { message: 'No runs returned', }); @@ -409,7 +409,7 @@ test('claim: should check per-workloop capacity, not global', async (t) => { app.runWorkloopMap = {}; // Should succeed because workloop has capacity (1/2), regardless of global count - await claim(app, logger, workloop); + await claim(app, workloop, logger); t.true(workloop.activeRuns.has('run-2')); }); From 0047675405baa4c9da1cfc8cc7964a62b3933f75 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 12 Mar 2026 14:59:45 +0000 Subject: [PATCH 2/8] refactor to remove createWorkllop and WorkloopConfig Since the Workloop inteface is just a static object, we should just create it at source in the parser. This removes some confusing typings and helps simplify the code. This miight enable a further refactor --- packages/ws-worker/src/api/claim.ts | 2 +- packages/ws-worker/src/api/workloop.ts | 14 +--- packages/ws-worker/src/server.ts | 14 +--- packages/ws-worker/src/start.ts | 24 ++++-- .../ws-worker/src/util/parse-workloops.ts | 17 ++-- packages/ws-worker/test/api/claim.test.ts | 12 ++- packages/ws-worker/test/api/workloop.test.ts | 55 +++++------- .../test/util/parse-workloops.test.ts | 83 +++++++++++++++---- 8 files changed, 130 insertions(+), 91 deletions(-) diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index e5dad0cfb..ef6ba89b5 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -78,7 +78,7 @@ const claim = ( return reject(new ClaimError('Workloop at capacity')); } else if (activeInWorkloop + pendingWorkloopClaims >= capacity) { app.workloopHandles - ?.get(workloop) + .get(workloop) ?.stop( `workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity}, ${pendingWorkloopClaims} pending)` ); diff --git a/packages/ws-worker/src/api/workloop.ts b/packages/ws-worker/src/api/workloop.ts index 64fe96863..a725bccab 100644 --- a/packages/ws-worker/src/api/workloop.ts +++ b/packages/ws-worker/src/api/workloop.ts @@ -4,9 +4,10 @@ import claim from './claim'; import type { ServerApp } from '../server'; import type { CancelablePromise } from '../types'; import type { Logger } from '@openfn/logger'; -import type { WorkloopConfig } from '../util/parse-workloops'; -export interface Workloop extends WorkloopConfig { +export interface Workloop { + queues: string[]; + capacity: number; id: string; activeRuns: Set; openClaims: Record; @@ -17,15 +18,6 @@ export interface WorkloopHandle { isStopped: () => boolean; } -export function createWorkloop(config: WorkloopConfig): Workloop { - return { - ...config, - id: `${config.queues.join('>')}:${config.capacity}`, - activeRuns: new Set(), - openClaims: {}, - }; -} - export function workloopHasCapacity(workloop: Workloop): boolean { const pendingClaims = Object.values(workloop.openClaims).reduce( (a, b) => a + b, diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index d88f46ced..47d30765e 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -17,10 +17,7 @@ import { WORK_AVAILABLE, } from './events'; import destroy from './api/destroy'; -import startWorkloop, { - createWorkloop, - workloopHasCapacity, -} from './api/workloop'; +import startWorkloop, { workloopHasCapacity } from './api/workloop'; import claim from './api/claim'; import { Context, execute } from './api/execute'; import healthcheck from './middleware/healthcheck'; @@ -32,7 +29,6 @@ import type { RuntimeEngine } from '@openfn/engine-multi'; import type { Socket, Channel } from './types'; import { convertRun } from './util'; import type { Workloop, WorkloopHandle } from './api/workloop'; -import type { WorkloopConfig } from './util/parse-workloops'; const exec = promisify(_exec); @@ -41,7 +37,7 @@ export type ServerOptions = { batchInterval?: number; batchLimit?: number; maxWorkflows?: number; - workloopConfigs?: WorkloopConfig[]; + workloopConfigs?: Workloop[]; port?: number; lightning?: string; // url to lightning instance logger?: Logger; @@ -267,11 +263,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.workflows = {}; app.destroyed = false; - // Initialize workloops: use provided workloopConfigs, or create a single default - const workloopDefs: WorkloopConfig[] = options.workloopConfigs ?? [ - { queues: ['manual', '*'], capacity: options.maxWorkflows ?? 5 }, - ]; - app.workloops = workloopDefs.map(createWorkloop); + app.workloops = options.workloopConfigs ?? []; app.workloopHandles = new Map(); app.runWorkloopMap = {}; diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index ecd778758..7b4a5544c 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -4,16 +4,24 @@ import createRTE from '@openfn/engine-multi'; import createMockRTE from './mock/runtime-engine'; import createWorker, { ServerOptions } from './server'; import cli from './util/cli'; -import parseWorkloops, { WorkloopConfig } from './util/parse-workloops'; +import parseWorkloops from './util/parse-workloops'; const args = cli(process.argv); -let workloopConfigs: WorkloopConfig[]; -if (args.workloops) { - workloopConfigs = parseWorkloops(args.workloops); -} else { - workloopConfigs = [{ queues: ['manual', '*'], capacity: args.capacity ?? 5 }]; -} +const defaultCapacity = args.capacity ?? 5; + +const workloopConfigs = args.workloops + ? parseWorkloops(args.workloops) + : [ + { + id: `manual>*:${defaultCapacity}`, + queues: ['manual', '*'], + capacity: defaultCapacity, + activeRuns: new Set(), + openClaims: {} as Record, + }, + ]; + const effectiveCapacity = workloopConfigs.reduce( (sum, c) => sum + c.capacity, 0 @@ -21,7 +29,7 @@ const effectiveCapacity = workloopConfigs.reduce( const logger = createLogger('SRV', { level: args.log }); -logger.info('Starting worker server...'); +logger.info('Starting worker...'); logger.info( 'Workloops:', workloopConfigs, diff --git a/packages/ws-worker/src/util/parse-workloops.ts b/packages/ws-worker/src/util/parse-workloops.ts index 9189f81e0..1baed9547 100644 --- a/packages/ws-worker/src/util/parse-workloops.ts +++ b/packages/ws-worker/src/util/parse-workloops.ts @@ -1,7 +1,4 @@ -export interface WorkloopConfig { - queues: string[]; - capacity: number; -} +import type { Workloop } from '../api/workloop'; export class WorkloopValidationError extends Error { constructor(message: string) { @@ -12,7 +9,7 @@ export class WorkloopValidationError extends Error { const VALID_NAME = /^[a-zA-Z0-9_]+$/; -export default function parseWorkloops(input: string): WorkloopConfig[] { +export default function parseWorkloops(input: string): Workloop[] { const trimmed = input.trim(); if (!trimmed) { throw new WorkloopValidationError('Workloop configuration cannot be empty'); @@ -38,7 +35,7 @@ export default function parseWorkloops(input: string): WorkloopConfig[] { return configs; } -function parseToken(token: string): WorkloopConfig { +function parseToken(token: string): Workloop { const lastColon = token.lastIndexOf(':'); if (lastColon === -1) { throw new WorkloopValidationError( @@ -92,5 +89,11 @@ function parseToken(token: string): WorkloopConfig { ); } - return { queues: names, capacity: count }; + return { + id: token, + queues: names, + capacity: count, + activeRuns: new Set(), + openClaims: {}, + }; } diff --git a/packages/ws-worker/test/api/claim.test.ts b/packages/ws-worker/test/api/claim.test.ts index b4b27f211..c79fc3069 100644 --- a/packages/ws-worker/test/api/claim.test.ts +++ b/packages/ws-worker/test/api/claim.test.ts @@ -9,7 +9,7 @@ import { ServerApp } from '../../src/server'; import { mockChannel } from '../../src/mock/sockets'; import { CLAIM } from '../../src'; import EventEmitter from 'node:events'; -import { createWorkloop, Workloop } from '../../src/api/workloop'; +import { Workloop } from '../../src/api/workloop'; let keys = { public: '.', private: '.' }; @@ -128,8 +128,13 @@ test('verifyToken should accept a token with NBF exactly 2 seconds in future (us ); }); -const createMockWorkloop = (capacity = 5): Workloop => - createWorkloop({ queues: ['manual', '*'], capacity }); +const createMockWorkloop = (capacity = 5): Workloop => ({ + id: `manual>*:${capacity}`, + queues: ['manual', '*'], + capacity, + activeRuns: new Set(), + openClaims: {}, +}); const createMockApp = (opts: any) => { const { @@ -145,6 +150,7 @@ const createMockApp = (opts: any) => { }); return { + workloopHandles: new Map(), openClaims: {}, workflows, queueChannel: channel, diff --git a/packages/ws-worker/test/api/workloop.test.ts b/packages/ws-worker/test/api/workloop.test.ts index 51413ef8b..23622e0a8 100644 --- a/packages/ws-worker/test/api/workloop.test.ts +++ b/packages/ws-worker/test/api/workloop.test.ts @@ -4,7 +4,6 @@ import { createMockLogger } from '@openfn/logger'; import { sleep } from '../util'; import { mockChannel } from '../../src/mock/sockets'; import startWorkloop, { - createWorkloop, Workloop, WorkloopHandle, workloopHasCapacity, @@ -21,8 +20,13 @@ test.afterEach(() => { currentHandle = undefined; }); -const createMockWorkloop = (capacity = 5): Workloop => - createWorkloop({ queues: ['manual', '*'], capacity }); +const createMockWorkloop = (capacity = 5): Workloop => ({ + id: `manual>*:${capacity}`, + queues: ['manual', '*'], + capacity, + activeRuns: new Set(), + openClaims: {}, +}); const createMockApp = (props: any) => ({ workflows: {}, @@ -184,62 +188,41 @@ test('stopping one workloop does not affect another', async (t) => { handleB.stop(); }); -// createWorkloop tests - -test('createWorkloop: generates correct id', (t) => { - const workloop = createWorkloop({ queues: ['fast_lane'], capacity: 1 }); - t.is(workloop.id, 'fast_lane:1'); -}); - -test('createWorkloop: generates id with multiple queues', (t) => { - const workloop = createWorkloop({ queues: ['manual', '*'], capacity: 4 }); - t.is(workloop.id, 'manual>*:4'); -}); - -test('createWorkloop: initializes empty activeRuns', (t) => { - const workloop = createWorkloop({ queues: ['*'], capacity: 5 }); - t.is(workloop.activeRuns.size, 0); -}); - -test('createWorkloop: initializes empty openClaims', (t) => { - const workloop = createWorkloop({ queues: ['*'], capacity: 5 }); - t.deepEqual(workloop.openClaims, {}); -}); +// workloopHasCapacity tests -test('createWorkloop: preserves queues and capacity', (t) => { - const workloop = createWorkloop({ queues: ['a', 'b', '*'], capacity: 3 }); - t.deepEqual(workloop.queues, ['a', 'b', '*']); - t.is(workloop.capacity, 3); +const w = (capacity: number): Workloop => ({ + id: `*:${capacity}`, + queues: ['*'], + capacity, + activeRuns: new Set(), + openClaims: {}, }); -// workloopHasCapacity tests - test('workloopHasCapacity: has capacity when empty', (t) => { - const workloop = createWorkloop({ queues: ['*'], capacity: 3 }); - t.true(workloopHasCapacity(workloop)); + t.true(workloopHasCapacity(w(3))); }); test('workloopHasCapacity: has capacity when partially filled', (t) => { - const workloop = createWorkloop({ queues: ['*'], capacity: 3 }); + const workloop = w(3); workloop.activeRuns.add('run-1'); t.true(workloopHasCapacity(workloop)); }); test('workloopHasCapacity: no capacity when activeRuns fills capacity', (t) => { - const workloop = createWorkloop({ queues: ['*'], capacity: 2 }); + const workloop = w(2); workloop.activeRuns.add('run-1'); workloop.activeRuns.add('run-2'); t.false(workloopHasCapacity(workloop)); }); test('workloopHasCapacity: no capacity when pendingClaims fills capacity', (t) => { - const workloop = createWorkloop({ queues: ['*'], capacity: 2 }); + const workloop = w(2); workloop.openClaims['claim-a'] = 2; t.false(workloopHasCapacity(workloop)); }); test('workloopHasCapacity: no capacity when activeRuns + pendingClaims fills capacity', (t) => { - const workloop = createWorkloop({ queues: ['*'], capacity: 3 }); + const workloop = w(3); workloop.activeRuns.add('run-1'); workloop.openClaims['claim-a'] = 1; workloop.openClaims['claim-b'] = 1; diff --git a/packages/ws-worker/test/util/parse-workloops.test.ts b/packages/ws-worker/test/util/parse-workloops.test.ts index 5d9bc0b59..ca3fa0a38 100644 --- a/packages/ws-worker/test/util/parse-workloops.test.ts +++ b/packages/ws-worker/test/util/parse-workloops.test.ts @@ -3,49 +3,105 @@ import parseWorkloops, { WorkloopValidationError, } from '../../src/util/parse-workloops'; -// Happy paths - test('parse "*:5" into a single wildcard workloop', (t) => { const result = parseWorkloops('*:5'); - t.deepEqual(result, [{ queues: ['*'], capacity: 5 }]); + t.deepEqual(result, [ + { + id: '*:5', + queues: ['*'], + capacity: 5, + activeRuns: new Set(), + openClaims: {}, + }, + ]); }); test('parse "manual>*:3" into a single workloop with preference chain', (t) => { const result = parseWorkloops('manual>*:3'); - t.deepEqual(result, [{ queues: ['manual', '*'], capacity: 3 }]); + t.deepEqual(result, [ + { + id: 'manual>*:3', + queues: ['manual', '*'], + capacity: 3, + activeRuns: new Set(), + openClaims: {}, + }, + ]); }); test('parse "fast_lane:1 manual>*:4" into two workloops', (t) => { const result = parseWorkloops('fast_lane:1 manual>*:4'); t.deepEqual(result, [ - { queues: ['fast_lane'], capacity: 1 }, - { queues: ['manual', '*'], capacity: 4 }, + { + id: 'fast_lane:1', + queues: ['fast_lane'], + capacity: 1, + activeRuns: new Set(), + openClaims: {}, + }, + { + id: 'manual>*:4', + queues: ['manual', '*'], + capacity: 4, + activeRuns: new Set(), + openClaims: {}, + }, ]); }); test('parse multi-preference chain "fast_lane>manual>*:1 *:4"', (t) => { const result = parseWorkloops('fast_lane>manual>*:1 *:4'); t.deepEqual(result, [ - { queues: ['fast_lane', 'manual', '*'], capacity: 1 }, - { queues: ['*'], capacity: 4 }, + { + id: 'fast_lane>manual>*:1', + queues: ['fast_lane', 'manual', '*'], + capacity: 1, + activeRuns: new Set(), + openClaims: {}, + }, + { + id: '*:4', + queues: ['*'], + capacity: 4, + activeRuns: new Set(), + openClaims: {}, + }, ]); }); test('parse single non-wildcard workloop "my_queue:10"', (t) => { const result = parseWorkloops('my_queue:10'); - t.deepEqual(result, [{ queues: ['my_queue'], capacity: 10 }]); + t.deepEqual(result, [ + { + id: 'my_queue:10', + queues: ['my_queue'], + capacity: 10, + activeRuns: new Set(), + openClaims: {}, + }, + ]); }); test('tolerate extra whitespace', (t) => { const result = parseWorkloops(' a:1 b:2 '); t.deepEqual(result, [ - { queues: ['a'], capacity: 1 }, - { queues: ['b'], capacity: 2 }, + { + id: 'a:1', + queues: ['a'], + capacity: 1, + activeRuns: new Set(), + openClaims: {}, + }, + { + id: 'b:2', + queues: ['b'], + capacity: 2, + activeRuns: new Set(), + openClaims: {}, + }, ]); }); -// Validation errors - test('throw on empty string', (t) => { const err = t.throws(() => parseWorkloops(''), { instanceOf: WorkloopValidationError, @@ -118,4 +174,3 @@ test('throw on empty name from double separator "a>>b:1"', (t) => { instanceOf: WorkloopValidationError, }); }); - From d5a6e54a86613acbfa24b9f8875f925b3f27cfaf Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 12 Mar 2026 15:13:11 +0000 Subject: [PATCH 3/8] fix default capacit to restore tests --- packages/ws-worker/src/server.ts | 8 ++++++-- packages/ws-worker/src/start.ts | 17 ++++------------- .../src/util/get-default-workloop-config.ts | 1 + 3 files changed, 11 insertions(+), 15 deletions(-) create mode 100644 packages/ws-worker/src/util/get-default-workloop-config.ts diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 47d30765e..4c70e09e9 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -29,6 +29,8 @@ import type { RuntimeEngine } from '@openfn/engine-multi'; import type { Socket, Channel } from './types'; import { convertRun } from './util'; import type { Workloop, WorkloopHandle } from './api/workloop'; +import parseWorkloops from './util/parse-workloops'; +import getDefaultWorkloopConfig from './util/get-default-workloop-config'; const exec = promisify(_exec); @@ -263,7 +265,9 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.workflows = {}; app.destroyed = false; - app.workloops = options.workloopConfigs ?? []; + app.workloops = + options.workloopConfigs ?? + parseWorkloops(getDefaultWorkloopConfig(options.maxWorkflows)); app.workloopHandles = new Map(); app.runWorkloopMap = {}; @@ -278,7 +282,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.options = options; - // Start the workloop for a specific workloop (or all if none specified) + // Start a specific workloop (or all if none specified) // When called with a specific workloop (e.g. after a run completes), we restart // the workloop fresh so it claims immediately rather than waiting in backoff. app.resumeWorkloop = (workloop?: Workloop) => { diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 7b4a5544c..205eb97df 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -5,22 +5,13 @@ import createMockRTE from './mock/runtime-engine'; import createWorker, { ServerOptions } from './server'; import cli from './util/cli'; import parseWorkloops from './util/parse-workloops'; +import getDefaultWorkloopConfig from './util/get-default-workloop-config'; const args = cli(process.argv); -const defaultCapacity = args.capacity ?? 5; - -const workloopConfigs = args.workloops - ? parseWorkloops(args.workloops) - : [ - { - id: `manual>*:${defaultCapacity}`, - queues: ['manual', '*'], - capacity: defaultCapacity, - activeRuns: new Set(), - openClaims: {} as Record, - }, - ]; +const workloopConfigs = parseWorkloops( + args.workloops ?? getDefaultWorkloopConfig(args.capacity) +); const effectiveCapacity = workloopConfigs.reduce( (sum, c) => sum + c.capacity, diff --git a/packages/ws-worker/src/util/get-default-workloop-config.ts b/packages/ws-worker/src/util/get-default-workloop-config.ts new file mode 100644 index 000000000..b6f084061 --- /dev/null +++ b/packages/ws-worker/src/util/get-default-workloop-config.ts @@ -0,0 +1 @@ +export default (capacity = 5) => `manual>*:${capacity}`; From 77b6ce65d49fd90908e8a70280155a9ac86d0a5d Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 12 Mar 2026 15:49:00 +0000 Subject: [PATCH 4/8] refactor Workloop into its own class A pretty big change that seriously simplifies the worklook handling. Each workloop is its own instance, created straight from the string config, which manages its own state --- packages/ws-worker/src/api/claim.ts | 13 +- packages/ws-worker/src/api/destroy.ts | 2 +- packages/ws-worker/src/api/workloop.ts | 118 +++++++------ packages/ws-worker/src/server.ts | 38 ++--- packages/ws-worker/src/start.ts | 14 +- .../ws-worker/src/util/parse-workloops.ts | 10 +- packages/ws-worker/test/api/claim.test.ts | 21 +-- packages/ws-worker/test/api/workloop.test.ts | 155 ++++++++---------- .../test/util/parse-workloops.test.ts | 94 +++-------- 9 files changed, 181 insertions(+), 284 deletions(-) diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index ef6ba89b5..168528143 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -69,19 +69,10 @@ const claim = ( ); if (activeInWorkloop >= capacity) { - // Important: stop the workloop so that we don't try and claim any more - app.workloopHandles - ?.get(workloop) - ?.stop( - `workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity})` - ); + workloop.stop(`workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity})`); return reject(new ClaimError('Workloop at capacity')); } else if (activeInWorkloop + pendingWorkloopClaims >= capacity) { - app.workloopHandles - .get(workloop) - ?.stop( - `workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity}, ${pendingWorkloopClaims} pending)` - ); + workloop.stop(`workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity}, ${pendingWorkloopClaims} pending)`); return reject(new ClaimError('Workloop at capacity')); } diff --git a/packages/ws-worker/src/api/destroy.ts b/packages/ws-worker/src/api/destroy.ts index 1765e301a..0c5b201fa 100644 --- a/packages/ws-worker/src/api/destroy.ts +++ b/packages/ws-worker/src/api/destroy.ts @@ -15,7 +15,7 @@ const destroy = async (app: ServerApp, logger: Logger) => { // Immediately stop asking for more work on all workloops for (const w of app.workloops) { - app.workloopHandles.get(w)?.stop('server closed'); + w.stop('server closed'); } // Shut down the HTTP server diff --git a/packages/ws-worker/src/api/workloop.ts b/packages/ws-worker/src/api/workloop.ts index a725bccab..055439c2f 100644 --- a/packages/ws-worker/src/api/workloop.ts +++ b/packages/ws-worker/src/api/workloop.ts @@ -5,67 +5,77 @@ import type { ServerApp } from '../server'; import type { CancelablePromise } from '../types'; import type { Logger } from '@openfn/logger'; -export interface Workloop { +export class Workloop { + id: string; queues: string[]; capacity: number; - id: string; - activeRuns: Set; - openClaims: Record; -} + activeRuns = new Set(); + openClaims: Record = {}; -export interface WorkloopHandle { - stop: (reason?: string) => void; - isStopped: () => boolean; -} + private cancelled = true; + private promise?: CancelablePromise; + private logger?: Logger; -export function workloopHasCapacity(workloop: Workloop): boolean { - const pendingClaims = Object.values(workloop.openClaims).reduce( - (a, b) => a + b, - 0 - ); - return workloop.activeRuns.size + pendingClaims < workloop.capacity; -} + constructor({ + id, + queues, + capacity, + }: { + id: string; + queues: string[]; + capacity: number; + }) { + this.id = id; + this.queues = queues; + this.capacity = capacity; + } -const startWorkloop = ( - app: ServerApp, - logger: Logger, - minBackoff: number, - maxBackoff: number, - workloop: Workloop -): WorkloopHandle => { - let promise: CancelablePromise; - let cancelled = false; + hasCapacity(): boolean { + const pendingClaims = Object.values(this.openClaims).reduce( + (a, b) => a + b, + 0 + ); + return this.activeRuns.size + pendingClaims < this.capacity; + } - const workLoop = () => { - if (!cancelled) { - promise = tryWithBackoff(() => claim(app, workloop, logger), { - min: minBackoff, - max: maxBackoff, - }); - // TODO this needs more unit tests I think - promise - .then(() => { - if (!cancelled) { - setTimeout(workLoop, minBackoff); - } - }) - .catch(() => { - // do nothing + start( + app: ServerApp, + logger: Logger, + minBackoff: number, + maxBackoff: number + ): void { + this.logger = logger; + this.cancelled = false; + + const loop = () => { + if (!this.cancelled) { + this.promise = tryWithBackoff(() => claim(app, this, logger), { + min: minBackoff, + max: maxBackoff, }); - } - }; - workLoop(); + this.promise + .then(() => { + if (!this.cancelled) { + setTimeout(loop, minBackoff); + } + }) + .catch(() => { + // do nothing + }); + } + }; + loop(); + } - const stop = (reason = 'reason unknown') => { - if (!cancelled) { - logger.info(`cancelling workloop: ${reason}`); - cancelled = true; - promise.cancel(); + stop(reason = 'reason unknown'): void { + if (!this.cancelled) { + this.logger?.info(`cancelling workloop: ${reason}`); + this.cancelled = true; + this.promise?.cancel(); } - }; - const isStopped = () => cancelled; + } - return { stop, isStopped }; -}; - -export default startWorkloop; + isStopped(): boolean { + return this.cancelled; + } +} diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 4c70e09e9..52ccedc20 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -17,7 +17,7 @@ import { WORK_AVAILABLE, } from './events'; import destroy from './api/destroy'; -import startWorkloop, { workloopHasCapacity } from './api/workloop'; +import { Workloop } from './api/workloop'; import claim from './api/claim'; import { Context, execute } from './api/execute'; import healthcheck from './middleware/healthcheck'; @@ -28,7 +28,6 @@ import type { Server } from 'http'; import type { RuntimeEngine } from '@openfn/engine-multi'; import type { Socket, Channel } from './types'; import { convertRun } from './util'; -import type { Workloop, WorkloopHandle } from './api/workloop'; import parseWorkloops from './util/parse-workloops'; import getDefaultWorkloopConfig from './util/get-default-workloop-config'; @@ -39,7 +38,7 @@ export type ServerOptions = { batchInterval?: number; batchLimit?: number; maxWorkflows?: number; - workloopConfigs?: Workloop[]; + workloopConfigs?: string; port?: number; lightning?: string; // url to lightning instance logger?: Logger; @@ -83,7 +82,6 @@ export interface ServerApp extends Koa { options: ServerOptions; workloops: Workloop[]; - workloopHandles: Map; runWorkloopMap: Record; execute: ({ id, token }: ClaimRun) => Promise; @@ -140,9 +138,8 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { // We were disconnected from the queue const onDisconnect = () => { for (const w of app.workloops) { - const handle = app.workloopHandles.get(w); - if (handle && !handle.isStopped()) { - handle.stop('Socket disconnected unexpectedly'); + if (!w.isStopped()) { + w.stop('Socket disconnected unexpectedly'); } } if (!app.destroyed) { @@ -174,7 +171,7 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) { if (event === WORK_AVAILABLE) { if (!app.destroyed) { for (const w of app.workloops) { - if (workloopHasCapacity(w)) { + if (w.hasCapacity()) { claim(app, w, logger).catch(() => { // do nothing - it's fine if claim throws here }); @@ -265,10 +262,9 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.workflows = {}; app.destroyed = false; - app.workloops = - options.workloopConfigs ?? - parseWorkloops(getDefaultWorkloopConfig(options.maxWorkflows)); - app.workloopHandles = new Map(); + app.workloops = parseWorkloops( + options.workloopConfigs ?? getDefaultWorkloopConfig(options.maxWorkflows) + ); app.runWorkloopMap = {}; app.server = app.listen(port); @@ -292,23 +288,19 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { const targets = workloop ? [workloop] : app.workloops; for (const w of targets) { - if (!workloopHasCapacity(w)) { + if (!w.hasCapacity()) { continue; } - // Stop any existing workloop so we can start fresh with immediate claim - const existingHandle = app.workloopHandles.get(w); - if (existingHandle && !existingHandle.isStopped()) { - existingHandle.stop('restarting'); + if (!w.isStopped()) { + w.stop('restarting'); } logger.info(`Starting workloop for ${w.id}`); - const handle = startWorkloop( + w.start( app, logger, options.backoff?.min || MIN_BACKOFF, - options.backoff?.max || MAX_BACKOFF, - w + options.backoff?.max || MAX_BACKOFF ); - app.workloopHandles.set(w, handle); } }; @@ -419,7 +411,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { router.post('/claim', async (ctx) => { logger.info('triggering claim from POST request'); const promises = app.workloops.map((w) => { - if (workloopHasCapacity(w)) { + if (w.hasCapacity()) { return claim(app, w, logger); } return Promise.reject(new Error('Workloop at capacity')); @@ -439,7 +431,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { app.claim = () => { const promises = app.workloops.map((w) => { - if (workloopHasCapacity(w)) { + if (w.hasCapacity()) { return claim(app, w, logger); } return Promise.reject(new Error('Workloop at capacity')); diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 205eb97df..b22ea164e 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -4,19 +4,17 @@ import createRTE from '@openfn/engine-multi'; import createMockRTE from './mock/runtime-engine'; import createWorker, { ServerOptions } from './server'; import cli from './util/cli'; -import parseWorkloops from './util/parse-workloops'; import getDefaultWorkloopConfig from './util/get-default-workloop-config'; const args = cli(process.argv); -const workloopConfigs = parseWorkloops( - args.workloops ?? getDefaultWorkloopConfig(args.capacity) -); +const workloopConfigs = args.workloops ?? getDefaultWorkloopConfig(args.capacity); -const effectiveCapacity = workloopConfigs.reduce( - (sum, c) => sum + c.capacity, - 0 -); +// Sum the capacity from each ":" token in the workloop string +const effectiveCapacity = workloopConfigs + .trim() + .split(/\s+/) + .reduce((sum, token) => sum + (parseInt(token.split(':').pop()!) || 0), 0); const logger = createLogger('SRV', { level: args.log }); diff --git a/packages/ws-worker/src/util/parse-workloops.ts b/packages/ws-worker/src/util/parse-workloops.ts index 1baed9547..16660121e 100644 --- a/packages/ws-worker/src/util/parse-workloops.ts +++ b/packages/ws-worker/src/util/parse-workloops.ts @@ -1,4 +1,4 @@ -import type { Workloop } from '../api/workloop'; +import { Workloop } from '../api/workloop'; export class WorkloopValidationError extends Error { constructor(message: string) { @@ -89,11 +89,5 @@ function parseToken(token: string): Workloop { ); } - return { - id: token, - queues: names, - capacity: count, - activeRuns: new Set(), - openClaims: {}, - }; + return new Workloop({ id: token, queues: names, capacity: count }); } diff --git a/packages/ws-worker/test/api/claim.test.ts b/packages/ws-worker/test/api/claim.test.ts index c79fc3069..acaa8400c 100644 --- a/packages/ws-worker/test/api/claim.test.ts +++ b/packages/ws-worker/test/api/claim.test.ts @@ -128,13 +128,8 @@ test('verifyToken should accept a token with NBF exactly 2 seconds in future (us ); }); -const createMockWorkloop = (capacity = 5): Workloop => ({ - id: `manual>*:${capacity}`, - queues: ['manual', '*'], - capacity, - activeRuns: new Set(), - openClaims: {}, -}); +const createMockWorkloop = (capacity = 5): Workloop => + new Workloop({ id: `manual>*:${capacity}`, queues: ['manual', '*'], capacity }); const createMockApp = (opts: any) => { const { @@ -150,7 +145,6 @@ const createMockApp = (opts: any) => { }); return { - workloopHandles: new Map(), openClaims: {}, workflows, queueChannel: channel, @@ -288,15 +282,7 @@ test('should mark a claim when in flight with demand: 2', async (t) => { }); test('should not claim if open claims exceeds workloop capacity', async (t) => { - let didStopWorkloop = false; - const workloop = createMockWorkloop(1); - const mockHandle = { - stop: () => { - didStopWorkloop = true; - }, - isStopped: () => false, - }; const app = createMockApp({ workflows: {}, @@ -307,7 +293,6 @@ test('should not claim if open claims exceeds workloop capacity', async (t) => { setTimeout(resolve({ runs: [] }), 100); }), }); - app.workloopHandles = new Map([[workloop, mockHandle]]); app.runWorkloopMap = {}; // @ts-ignore @@ -322,7 +307,7 @@ test('should not claim if open claims exceeds workloop capacity', async (t) => { await t.throwsAsync(() => claim(app, workloop, logger), { message: 'Workloop at capacity', }); - t.true(didStopWorkloop); + t.true(workloop.isStopped()); // The prior claim should not have counted for anything t.is(Object.keys(app.workflows).length, 0); diff --git a/packages/ws-worker/test/api/workloop.test.ts b/packages/ws-worker/test/api/workloop.test.ts index 23622e0a8..1f44f8c66 100644 --- a/packages/ws-worker/test/api/workloop.test.ts +++ b/packages/ws-worker/test/api/workloop.test.ts @@ -3,30 +3,25 @@ import { createMockLogger } from '@openfn/logger'; import { sleep } from '../util'; import { mockChannel } from '../../src/mock/sockets'; -import startWorkloop, { - Workloop, - WorkloopHandle, - workloopHasCapacity, -} from '../../src/api/workloop'; +import { Workloop } from '../../src/api/workloop'; import { CLAIM } from '../../src/events'; import EventEmitter from 'node:events'; -let currentHandle: WorkloopHandle | undefined; +let workloop: Workloop | undefined; const logger = createMockLogger(); test.afterEach(() => { - currentHandle?.stop(); // cancel any workloops - currentHandle = undefined; + workloop?.stop(); + workloop = undefined; }); -const createMockWorkloop = (capacity = 5): Workloop => ({ - id: `manual>*:${capacity}`, - queues: ['manual', '*'], - capacity, - activeRuns: new Set(), - openClaims: {}, -}); +const createWorkloop = (capacity = 5) => + new Workloop({ + id: `manual>*:${capacity}`, + queues: ['manual', '*'], + capacity, + }); const createMockApp = (props: any) => ({ workflows: {}, @@ -43,34 +38,32 @@ const createMockApp = (props: any) => ({ ...props, }); -test('workloop can be cancelled', async (t) => { +test.serial('workloop can be cancelled', async (t) => { let count = 0; - const workloop = createMockWorkloop(); - let handle: WorkloopHandle; + workloop = createWorkloop(); const app = createMockApp({ queueChannel: mockChannel({ [CLAIM]: () => { count++; - handle.stop(); + workloop?.stop(); return { runs: [] }; }, }), }); - handle = startWorkloop(app as any, logger, 1, 1, workloop); - currentHandle = handle; - t.false(handle.isStopped()); + workloop.start(app as any, logger, 1, 1); + t.false(workloop.isStopped()); await sleep(100); // A quirk of how cancel works is that the loop will be called a few times t.true(count <= 5); - t.true(handle.isStopped()); + t.true(workloop.isStopped()); }); -test('workloop sends the runs:claim event', (t) => { +test.serial('workloop sends the runs:claim event', (t) => { return new Promise((done) => { - const workloop = createMockWorkloop(); + workloop = createWorkloop(); const app = createMockApp({ queueChannel: mockChannel({ [CLAIM]: () => { @@ -80,14 +73,14 @@ test('workloop sends the runs:claim event', (t) => { }, }), }); - currentHandle = startWorkloop(app as any, logger, 1, 1, workloop); + workloop.start(app as any, logger, 1, 1); }); }); -test('workloop sends the runs:claim event several times ', (t) => { +test.serial('workloop sends the runs:claim event several times ', (t) => { return new Promise((done) => { let count = 0; - const workloop = createMockWorkloop(); + workloop = createWorkloop(); const app = createMockApp({ queueChannel: mockChannel({ [CLAIM]: () => { @@ -100,13 +93,13 @@ test('workloop sends the runs:claim event several times ', (t) => { }, }), }); - currentHandle = startWorkloop(app as any, logger, 1, 1, workloop); + workloop.start(app as any, logger, 1, 1); }); }); -test('workloop calls execute if runs:claim returns runs', (t) => { +test.serial('workloop calls execute if runs:claim returns runs', (t) => { return new Promise((done) => { - const workloop = createMockWorkloop(); + workloop = createWorkloop(); const app = createMockApp({ queueChannel: mockChannel({ [CLAIM]: () => ({ @@ -120,45 +113,40 @@ test('workloop calls execute if runs:claim returns runs', (t) => { }, }); - currentHandle = startWorkloop(app as any, logger, 1, 1, workloop); + workloop.start(app as any, logger, 1, 1); }); }); -test('startWorkloop returns a handle with stop and isStopped', (t) => { +test.serial('workloop has stop and isStopped methods', (t) => { return new Promise((done) => { - const workloop = createMockWorkloop(); + workloop = createWorkloop(); const app = createMockApp({ queueChannel: mockChannel({ [CLAIM]: () => { - // After starting, isStopped returns false - t.false(handle.isStopped()); + t.false(workloop?.isStopped()); t.pass(); done(); return { runs: [] }; }, }), }); - const handle = startWorkloop(app as any, logger, 1, 1, workloop); - currentHandle = handle; + workloop.start(app as any, logger, 1, 1); - // Handle has the right shape - t.is(typeof handle.stop, 'function'); - t.is(typeof handle.isStopped, 'function'); + t.is(typeof workloop.stop, 'function'); + t.is(typeof workloop.isStopped, 'function'); }); }); -test('stopping one workloop does not affect another', async (t) => { - const wlA = createMockWorkloop(1); - const wlB = createMockWorkloop(1); +test.serial('stopping one workloop does not affect another', async (t) => { + const wlA = createWorkloop(1); + const wlB = createWorkloop(1); let countB = 0; const appA = createMockApp({ queueChannel: mockChannel({ - [CLAIM]: () => { - return { runs: [] }; - }, + [CLAIM]: () => ({ runs: [] }), }), }); @@ -171,60 +159,55 @@ test('stopping one workloop does not affect another', async (t) => { }), }); - const handleA = startWorkloop(appA as any, logger, 1, 1, wlA); - const handleB = startWorkloop(appB as any, logger, 1, 1, wlB); + wlA.start(appA as any, logger, 1, 1); + wlB.start(appB as any, logger, 1, 1); await sleep(50); - handleA.stop(); + wlA.stop(); const countBAtAStop = countB; await sleep(50); - // Workloop A should be stopped - t.true(handleA.isStopped()); - // Workloop B should still be running and claiming - t.false(handleB.isStopped()); + t.true(wlA.isStopped()); + t.false(wlB.isStopped()); t.true(countB > countBAtAStop); - handleB.stop(); -}); - -// workloopHasCapacity tests - -const w = (capacity: number): Workloop => ({ - id: `*:${capacity}`, - queues: ['*'], - capacity, - activeRuns: new Set(), - openClaims: {}, + wlB.stop(); }); -test('workloopHasCapacity: has capacity when empty', (t) => { - t.true(workloopHasCapacity(w(3))); +test.serial('hasCapacity: has capacity when empty', (t) => { + workloop = createWorkloop(3); + t.true(workloop.hasCapacity()); }); -test('workloopHasCapacity: has capacity when partially filled', (t) => { - const workloop = w(3); +test.serial('hasCapacity: has capacity when partially filled', (t) => { + workloop = createWorkloop(3); workloop.activeRuns.add('run-1'); - t.true(workloopHasCapacity(workloop)); + t.true(workloop.hasCapacity()); }); -test('workloopHasCapacity: no capacity when activeRuns fills capacity', (t) => { - const workloop = w(2); +test.serial('hasCapacity: no capacity when activeRuns fills capacity', (t) => { + workloop = createWorkloop(2); workloop.activeRuns.add('run-1'); workloop.activeRuns.add('run-2'); - t.false(workloopHasCapacity(workloop)); + t.false(workloop.hasCapacity()); }); -test('workloopHasCapacity: no capacity when pendingClaims fills capacity', (t) => { - const workloop = w(2); - workloop.openClaims['claim-a'] = 2; - t.false(workloopHasCapacity(workloop)); -}); - -test('workloopHasCapacity: no capacity when activeRuns + pendingClaims fills capacity', (t) => { - const workloop = w(3); - workloop.activeRuns.add('run-1'); - workloop.openClaims['claim-a'] = 1; - workloop.openClaims['claim-b'] = 1; - t.false(workloopHasCapacity(workloop)); -}); +test.serial( + 'hasCapacity: no capacity when pendingClaims fills capacity', + (t) => { + workloop = createWorkloop(2); + workloop.openClaims['claim-a'] = 2; + t.false(workloop.hasCapacity()); + } +); + +test.serial( + 'hasCapacity: no capacity when activeRuns + pendingClaims fills capacity', + (t) => { + workloop = createWorkloop(3); + workloop.activeRuns.add('run-1'); + workloop.openClaims['claim-a'] = 1; + workloop.openClaims['claim-b'] = 1; + t.false(workloop.hasCapacity()); + } +); diff --git a/packages/ws-worker/test/util/parse-workloops.test.ts b/packages/ws-worker/test/util/parse-workloops.test.ts index ca3fa0a38..a8030aa7e 100644 --- a/packages/ws-worker/test/util/parse-workloops.test.ts +++ b/packages/ws-worker/test/util/parse-workloops.test.ts @@ -1,104 +1,48 @@ import test from 'ava'; +import { Workloop } from '../../src/api/workloop'; import parseWorkloops, { WorkloopValidationError, } from '../../src/util/parse-workloops'; +// Extract just the parsed fields for comparison +const parsed = (w: Workloop) => ({ id: w.id, queues: w.queues, capacity: w.capacity }); + test('parse "*:5" into a single wildcard workloop', (t) => { - const result = parseWorkloops('*:5'); - t.deepEqual(result, [ - { - id: '*:5', - queues: ['*'], - capacity: 5, - activeRuns: new Set(), - openClaims: {}, - }, - ]); + const result = parseWorkloops('*:5').map(parsed); + t.deepEqual(result, [{ id: '*:5', queues: ['*'], capacity: 5 }]); }); test('parse "manual>*:3" into a single workloop with preference chain', (t) => { - const result = parseWorkloops('manual>*:3'); - t.deepEqual(result, [ - { - id: 'manual>*:3', - queues: ['manual', '*'], - capacity: 3, - activeRuns: new Set(), - openClaims: {}, - }, - ]); + const result = parseWorkloops('manual>*:3').map(parsed); + t.deepEqual(result, [{ id: 'manual>*:3', queues: ['manual', '*'], capacity: 3 }]); }); test('parse "fast_lane:1 manual>*:4" into two workloops', (t) => { - const result = parseWorkloops('fast_lane:1 manual>*:4'); + const result = parseWorkloops('fast_lane:1 manual>*:4').map(parsed); t.deepEqual(result, [ - { - id: 'fast_lane:1', - queues: ['fast_lane'], - capacity: 1, - activeRuns: new Set(), - openClaims: {}, - }, - { - id: 'manual>*:4', - queues: ['manual', '*'], - capacity: 4, - activeRuns: new Set(), - openClaims: {}, - }, + { id: 'fast_lane:1', queues: ['fast_lane'], capacity: 1 }, + { id: 'manual>*:4', queues: ['manual', '*'], capacity: 4 }, ]); }); test('parse multi-preference chain "fast_lane>manual>*:1 *:4"', (t) => { - const result = parseWorkloops('fast_lane>manual>*:1 *:4'); + const result = parseWorkloops('fast_lane>manual>*:1 *:4').map(parsed); t.deepEqual(result, [ - { - id: 'fast_lane>manual>*:1', - queues: ['fast_lane', 'manual', '*'], - capacity: 1, - activeRuns: new Set(), - openClaims: {}, - }, - { - id: '*:4', - queues: ['*'], - capacity: 4, - activeRuns: new Set(), - openClaims: {}, - }, + { id: 'fast_lane>manual>*:1', queues: ['fast_lane', 'manual', '*'], capacity: 1 }, + { id: '*:4', queues: ['*'], capacity: 4 }, ]); }); test('parse single non-wildcard workloop "my_queue:10"', (t) => { - const result = parseWorkloops('my_queue:10'); - t.deepEqual(result, [ - { - id: 'my_queue:10', - queues: ['my_queue'], - capacity: 10, - activeRuns: new Set(), - openClaims: {}, - }, - ]); + const result = parseWorkloops('my_queue:10').map(parsed); + t.deepEqual(result, [{ id: 'my_queue:10', queues: ['my_queue'], capacity: 10 }]); }); test('tolerate extra whitespace', (t) => { - const result = parseWorkloops(' a:1 b:2 '); + const result = parseWorkloops(' a:1 b:2 ').map(parsed); t.deepEqual(result, [ - { - id: 'a:1', - queues: ['a'], - capacity: 1, - activeRuns: new Set(), - openClaims: {}, - }, - { - id: 'b:2', - queues: ['b'], - capacity: 2, - activeRuns: new Set(), - openClaims: {}, - }, + { id: 'a:1', queues: ['a'], capacity: 1 }, + { id: 'b:2', queues: ['b'], capacity: 2 }, ]); }); From 2ec6ce5ebcaf2745c27f21f1196535d44b9edb07 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 12 Mar 2026 16:06:10 +0000 Subject: [PATCH 5/8] update docs --- packages/ws-worker/src/util/cli.ts | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index 86971bcf4..0fa996bb3 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -205,13 +205,14 @@ export default function parseArgs(argv: string[]): Args { 'Claim backoff rules: min/max (in seconds). Env: WORKER_BACKOFF', }) .option('capacity', { - description: `max concurrent workers. Default ${DEFAULT_WORKER_CAPACITY}. Env: WORKER_CAPACITY`, + description: `Sets the maximum concurrent workers - but only if workloops is not set. Default ${DEFAULT_WORKER_CAPACITY}. Env: WORKER_CAPACITY`, type: 'number', }) .option('workloops', { description: - 'Workloop configuration: ": ...", e.g., "fast_lane:1 manual>*:4". Mutually exclusive with --capacity. Env: WORKER_WORKLOOPS', + 'Configure workloops with a priorised queue list and a max capacity. Syntax: ": ...". Mutually exclusive with --capacity. Env: WORKER_WORKLOOPS', type: 'string', + example: 'fast_lane:1 manual>*:4', }) .option('state-props-to-remove', { description: @@ -274,7 +275,19 @@ export default function parseArgs(argv: string[]): Args { description: 'When a websocket event receives a timeout, this option sets how log to wait before retrying Default 30000. Env: WORKER_TIMEOUT_RETRY_DELAY_MS', type: 'number', - }); + }) + .example( + 'start --queues *:5', + 'Default start configuration: a single workloop with capacity 5, claiming from all queues' + ) + .example( + 'start --queues manual>*:5', + 'A single workloop, capacity 5, which claims across two queues. Runs in the manual queue will be picked first, else any other queue will be picked.' + ) + .example( + 'start --queues fast_lane:1 manual>*:4', + 'production start configuration with 1 fast lane workloop (capacity 1) and a second workloop with capacity 4' + ); const args = parser.parse() as Args; From 5fa58fc9bcf11b20481482be4055b9d0e0f527d2 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 12 Mar 2026 16:32:34 +0000 Subject: [PATCH 6/8] remove app.openClaims, which is not useful anymore --- packages/ws-worker/src/api/claim.ts | 6 ------ packages/ws-worker/src/api/destroy.ts | 19 ++++++------------- packages/ws-worker/src/server.ts | 6 ++++-- packages/ws-worker/test/api/claim.test.ts | 7 ------- packages/ws-worker/test/api/workloop.test.ts | 1 - 5 files changed, 10 insertions(+), 29 deletions(-) diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index 168528143..e8720b57a 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -91,10 +91,7 @@ const claim = ( const claimId = ++claimIdGen; - // Track in both workloop-level and app-level openClaims for backward compat workloop.openClaims[claimId] = demand; - app.openClaims ??= {}; - app.openClaims[claimId] = demand; const { used_heap_size, heap_size_limit } = v8.getHeapStatistics(); const usedHeapMb = Math.round(used_heap_size / 1024 / 1024); @@ -114,7 +111,6 @@ const claim = ( }) .receive('ok', async ({ runs }: ClaimReply) => { delete workloop.openClaims[claimId]; - delete app.openClaims[claimId]; const duration = Date.now() - start; logger.debug( `${podName}claimed ${runs.length} runs in ${duration}ms (${ @@ -160,13 +156,11 @@ const claim = ( // What do we do if we fail to join the worker channel? .receive('error', (e) => { delete workloop.openClaims[claimId]; - delete app.openClaims[claimId]; logger.error('Error on claim', e); reject(new Error('claim error')); }) .receive('timeout', () => { delete workloop.openClaims[claimId]; - delete app.openClaims[claimId]; logger.error('TIMEOUT on claim. Runs may be lost.'); reject(new Error('timeout')); }); diff --git a/packages/ws-worker/src/api/destroy.ts b/packages/ws-worker/src/api/destroy.ts index 0c5b201fa..afeafd9d3 100644 --- a/packages/ws-worker/src/api/destroy.ts +++ b/packages/ws-worker/src/api/destroy.ts @@ -1,4 +1,4 @@ -import { ServerApp } from '../server'; +import { ServerApp, pendingClaims } from '../server'; import { INTERNAL_CLAIM_COMPLETE, INTERNAL_RUN_COMPLETE } from '../events'; import type { Logger } from '@openfn/logger'; @@ -43,18 +43,14 @@ const waitForRunsAndClaims = (app: ServerApp, logger: Logger) => new Promise((resolve) => { const log = () => { logger.debug( - `Waiting for ${Object.keys(app.workflows).length} runs and ${ - Object.keys(app.openClaims).length - } claims to complete...` + `Waiting for ${ + Object.keys(app.workflows).length + } runs and ${app.pendingClaims()} claims to complete...` ); }; const checkAllClear = () => { - if ( - Object.keys(app.workflows).length + - Object.keys(app.openClaims).length === - 0 - ) { + if (Object.keys(app.workflows).length + app.pendingClaims() === 0) { logger.debug('All runs completed!'); app.events.off(INTERNAL_RUN_COMPLETE, checkAllClear); app.events.off(INTERNAL_CLAIM_COMPLETE, checkAllClear); @@ -64,10 +60,7 @@ const waitForRunsAndClaims = (app: ServerApp, logger: Logger) => } }; - if ( - Object.keys(app.workflows).length || - Object.keys(app.openClaims).length - ) { + if (Object.keys(app.workflows).length || app.pendingClaims()) { log(); app.events.on(INTERNAL_RUN_COMPLETE, checkAllClear); app.events.on(INTERNAL_CLAIM_COMPLETE, checkAllClear); diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index 52ccedc20..187baf636 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -74,7 +74,6 @@ export interface ServerApp extends Koa { socket?: any; queueChannel?: Channel; workflows: Record; - openClaims: Record; destroyed: boolean; events: EventEmitter; server: Server; @@ -87,6 +86,7 @@ export interface ServerApp extends Koa { execute: ({ id, token }: ClaimRun) => Promise; destroy: () => void; resumeWorkloop: (workloop?: Workloop) => void; + pendingClaims: () => number; // debug API claim: () => Promise; @@ -258,7 +258,6 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { }) ); - app.openClaims = {}; app.workflows = {}; app.destroyed = false; @@ -439,6 +438,9 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { return Promise.any(promises); }; + app.pendingClaims = () => + app.workloops.reduce((sum, w) => sum + Object.keys(w.openClaims).length, 0); + app.destroy = () => destroy(app, logger); app.use(router.routes()); diff --git a/packages/ws-worker/test/api/claim.test.ts b/packages/ws-worker/test/api/claim.test.ts index acaa8400c..93681be73 100644 --- a/packages/ws-worker/test/api/claim.test.ts +++ b/packages/ws-worker/test/api/claim.test.ts @@ -145,7 +145,6 @@ const createMockApp = (opts: any) => { }); return { - openClaims: {}, workflows, queueChannel: channel, workloops: [], @@ -203,7 +202,6 @@ test('should mark a claim when in flight', async (t) => { let claimPromise = claim(app, workloop, logger); t.is(workloop.openClaims['1'], 1); - t.is(app.openClaims['1'], 1); await t.throwsAsync(claimPromise, { message: 'No runs returned', @@ -222,7 +220,6 @@ test('should remove an open claim when completed', async (t) => { }); t.falsy(workloop.openClaims['1']); - t.falsy(app.openClaims['1']); }); test('should remove an open claim on error', async (t) => { @@ -240,7 +237,6 @@ test('should remove an open claim on error', async (t) => { }); t.falsy(workloop.openClaims['1']); - t.falsy(app.openClaims['1']); }); // TODO not really sure how to check this @@ -259,7 +255,6 @@ test.skip('should remove an open claim on timeout', async (t) => { }); t.falsy(workloop.openClaims['1']); - t.falsy(app.openClaims['1']); }); test('should mark a claim when in flight with demand: 2', async (t) => { @@ -274,7 +269,6 @@ test('should mark a claim when in flight with demand: 2', async (t) => { let claimPromise = claim(app, workloop, logger, { demand: 2 }); t.is(workloop.openClaims['1'], 2); - t.is(app.openClaims['1'], 2); await t.throwsAsync(claimPromise, { message: 'No runs returned', @@ -370,7 +364,6 @@ test('claim: should send queues in payload', async (t) => { }); const app = { - openClaims: {}, workflows: {}, queueChannel: channel, runWorkloopMap: {}, diff --git a/packages/ws-worker/test/api/workloop.test.ts b/packages/ws-worker/test/api/workloop.test.ts index 1f44f8c66..38164e87d 100644 --- a/packages/ws-worker/test/api/workloop.test.ts +++ b/packages/ws-worker/test/api/workloop.test.ts @@ -25,7 +25,6 @@ const createWorkloop = (capacity = 5) => const createMockApp = (props: any) => ({ workflows: {}, - openClaims: {}, queueChannel: mockChannel({ [CLAIM]: () => { return { runs: [] }; From 463c8bef0ac9523eec95b18806af990b4f6ff2cc Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 12 Mar 2026 16:33:10 +0000 Subject: [PATCH 7/8] formatting --- packages/ws-worker/src/api/claim.ts | 8 ++++++-- packages/ws-worker/src/start.ts | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index e8720b57a..af69c17fe 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -69,10 +69,14 @@ const claim = ( ); if (activeInWorkloop >= capacity) { - workloop.stop(`workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity})`); + workloop.stop( + `workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity})` + ); return reject(new ClaimError('Workloop at capacity')); } else if (activeInWorkloop + pendingWorkloopClaims >= capacity) { - workloop.stop(`workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity}, ${pendingWorkloopClaims} pending)`); + workloop.stop( + `workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity}, ${pendingWorkloopClaims} pending)` + ); return reject(new ClaimError('Workloop at capacity')); } diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index b22ea164e..9d901c7af 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -8,7 +8,8 @@ import getDefaultWorkloopConfig from './util/get-default-workloop-config'; const args = cli(process.argv); -const workloopConfigs = args.workloops ?? getDefaultWorkloopConfig(args.capacity); +const workloopConfigs = + args.workloops ?? getDefaultWorkloopConfig(args.capacity); // Sum the capacity from each ":" token in the workloop string const effectiveCapacity = workloopConfigs From 796363a2a6b3d1f8a8540a31ecdff32a41100ab0 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 12 Mar 2026 16:56:05 +0000 Subject: [PATCH 8/8] types --- packages/ws-worker/src/api/destroy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/ws-worker/src/api/destroy.ts b/packages/ws-worker/src/api/destroy.ts index afeafd9d3..85063aea0 100644 --- a/packages/ws-worker/src/api/destroy.ts +++ b/packages/ws-worker/src/api/destroy.ts @@ -1,4 +1,4 @@ -import { ServerApp, pendingClaims } from '../server'; +import { ServerApp } from '../server'; import { INTERNAL_CLAIM_COMPLETE, INTERNAL_RUN_COMPLETE } from '../events'; import type { Logger } from '@openfn/logger';