Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 9 additions & 20 deletions packages/ws-worker/src/api/claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ export const resetClaimIdGen = () => {

const claim = (
app: ServerApp,
logger: Logger = mockLogger,
workloop: Workloop,
options?: ClaimOptions
logger: Logger = mockLogger,
options: ClaimOptions = {}
) => {
return new Promise<void>((resolve, reject) => {
const { demand = 1 } = options ?? {};
const { demand = 1 } = options;
const podName = NAME ? `[${NAME}] ` : '';

const activeInWorkloop = workloop.activeRuns.size;
Expand All @@ -69,19 +69,14 @@ const claim = (
);

if (activeInWorkloop >= capacity) {
// Important: stop the workloop so that we don't try and claim any more
app.workloopHandles
?.get(workloop)
?.stop(
`workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity})`
);
workloop.stop(
`workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity})`
);
return reject(new ClaimError('Workloop at capacity'));
} else if (activeInWorkloop + pendingWorkloopClaims >= capacity) {
app.workloopHandles
?.get(workloop)
?.stop(
`workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity}, ${pendingWorkloopClaims} pending)`
);
workloop.stop(
`workloop ${workloop.id} at capacity (${activeInWorkloop}/${capacity}, ${pendingWorkloopClaims} pending)`
);
return reject(new ClaimError('Workloop at capacity'));
}

Expand All @@ -100,10 +95,7 @@ const claim = (

const claimId = ++claimIdGen;

// Track in both workloop-level and app-level openClaims for backward compat
workloop.openClaims[claimId] = demand;
app.openClaims ??= {};
app.openClaims[claimId] = demand;

const { used_heap_size, heap_size_limit } = v8.getHeapStatistics();
const usedHeapMb = Math.round(used_heap_size / 1024 / 1024);
Expand All @@ -123,7 +115,6 @@ const claim = (
})
.receive('ok', async ({ runs }: ClaimReply) => {
delete workloop.openClaims[claimId];
delete app.openClaims[claimId];
const duration = Date.now() - start;
logger.debug(
`${podName}claimed ${runs.length} runs in ${duration}ms (${
Expand Down Expand Up @@ -169,13 +160,11 @@ const claim = (
// What do we do if we fail to join the worker channel?
.receive('error', (e) => {
delete workloop.openClaims[claimId];
delete app.openClaims[claimId];
logger.error('Error on claim', e);
reject(new Error('claim error'));
})
.receive('timeout', () => {
delete workloop.openClaims[claimId];
delete app.openClaims[claimId];
logger.error('TIMEOUT on claim. Runs may be lost.');
reject(new Error('timeout'));
});
Expand Down
19 changes: 6 additions & 13 deletions packages/ws-worker/src/api/destroy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const destroy = async (app: ServerApp, logger: Logger) => {

// Immediately stop asking for more work on all workloops
for (const w of app.workloops) {
app.workloopHandles.get(w)?.stop('server closed');
w.stop('server closed');
}

// Shut down the HTTP server
Expand Down Expand Up @@ -43,18 +43,14 @@ const waitForRunsAndClaims = (app: ServerApp, logger: Logger) =>
new Promise<void>((resolve) => {
const log = () => {
logger.debug(
`Waiting for ${Object.keys(app.workflows).length} runs and ${
Object.keys(app.openClaims).length
} claims to complete...`
`Waiting for ${
Object.keys(app.workflows).length
} runs and ${app.pendingClaims()} claims to complete...`
);
};

const checkAllClear = () => {
if (
Object.keys(app.workflows).length +
Object.keys(app.openClaims).length ===
0
) {
if (Object.keys(app.workflows).length + app.pendingClaims() === 0) {
logger.debug('All runs completed!');
app.events.off(INTERNAL_RUN_COMPLETE, checkAllClear);
app.events.off(INTERNAL_CLAIM_COMPLETE, checkAllClear);
Expand All @@ -64,10 +60,7 @@ const waitForRunsAndClaims = (app: ServerApp, logger: Logger) =>
}
};

if (
Object.keys(app.workflows).length ||
Object.keys(app.openClaims).length
) {
if (Object.keys(app.workflows).length || app.pendingClaims()) {
log();
app.events.on(INTERNAL_RUN_COMPLETE, checkAllClear);
app.events.on(INTERNAL_CLAIM_COMPLETE, checkAllClear);
Expand Down
126 changes: 64 additions & 62 deletions packages/ws-worker/src/api/workloop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,76 +4,78 @@ import claim from './claim';
import type { ServerApp } from '../server';
import type { CancelablePromise } from '../types';
import type { Logger } from '@openfn/logger';
import type { WorkloopConfig } from '../util/parse-workloops';

export interface Workloop extends WorkloopConfig {
export class Workloop {
id: string;
activeRuns: Set<string>;
openClaims: Record<string, number>;
}
queues: string[];
capacity: number;
activeRuns = new Set<string>();
openClaims: Record<string, number> = {};

export interface WorkloopHandle {
stop: (reason?: string) => void;
isStopped: () => boolean;
}
private cancelled = true;
private promise?: CancelablePromise;
private logger?: Logger;

export function createWorkloop(config: WorkloopConfig): Workloop {
return {
...config,
id: `${config.queues.join('>')}:${config.capacity}`,
activeRuns: new Set(),
openClaims: {},
};
}
constructor({
id,
queues,
capacity,
}: {
id: string;
queues: string[];
capacity: number;
}) {
this.id = id;
this.queues = queues;
this.capacity = capacity;
}

export function workloopHasCapacity(workloop: Workloop): boolean {
const pendingClaims = Object.values(workloop.openClaims).reduce(
(a, b) => a + b,
0
);
return workloop.activeRuns.size + pendingClaims < workloop.capacity;
}
hasCapacity(): boolean {
const pendingClaims = Object.values(this.openClaims).reduce(
(a, b) => a + b,
0
);
return this.activeRuns.size + pendingClaims < this.capacity;
}

const startWorkloop = (
app: ServerApp,
logger: Logger,
minBackoff: number,
maxBackoff: number,
workloop: Workloop
): WorkloopHandle => {
let promise: CancelablePromise;
let cancelled = false;
start(
app: ServerApp,
logger: Logger,
minBackoff: number,
maxBackoff: number
): void {
this.logger = logger;
this.cancelled = false;

const workLoop = () => {
if (!cancelled) {
promise = tryWithBackoff(() => claim(app, logger, workloop), {
min: minBackoff,
max: maxBackoff,
});
// TODO this needs more unit tests I think
promise
.then(() => {
if (!cancelled) {
setTimeout(workLoop, minBackoff);
}
})
.catch(() => {
// do nothing
const loop = () => {
if (!this.cancelled) {
this.promise = tryWithBackoff(() => claim(app, this, logger), {
min: minBackoff,
max: maxBackoff,
});
}
};
workLoop();
this.promise
.then(() => {
if (!this.cancelled) {
setTimeout(loop, minBackoff);
}
})
.catch(() => {
// do nothing
});
}
};
loop();
}

const stop = (reason = 'reason unknown') => {
if (!cancelled) {
logger.info(`cancelling workloop: ${reason}`);
cancelled = true;
promise.cancel();
stop(reason = 'reason unknown'): void {
if (!this.cancelled) {
this.logger?.info(`cancelling workloop: ${reason}`);
this.cancelled = true;
this.promise?.cancel();
}
};
const isStopped = () => cancelled;

return { stop, isStopped };
};
}

export default startWorkloop;
isStopped(): boolean {
return this.cancelled;
}
}
Loading