Skip to content

Commit c7d445e

Browse files
committed
feat: add repeating jobs with external cancellation support
1 parent 62e50d3 commit c7d445e

21 files changed

Lines changed: 1700 additions & 77 deletions

README.md

Lines changed: 101 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ npm install @boringnode/queue
2929
- **Priority Queues**: Process high-priority jobs first
3030
- **Retry with Backoff**: Automatic retries with exponential, linear, or fixed backoff strategies
3131
- **Job Timeout**: Automatically fail or retry jobs that exceed a time limit
32+
- **Repeating Jobs**: Schedule jobs to repeat at fixed intervals
3233

3334
## Quick Start
3435

@@ -51,10 +52,6 @@ export default class SendEmailJob extends Job<SendEmailPayload> {
5152
queue: 'email',
5253
}
5354

54-
constructor(payload: SendEmailPayload, context: JobContext) {
55-
super(payload, context)
56-
}
57-
5855
async execute(): Promise<void> {
5956
console.log(`[Attempt ${this.context.attempt}] Sending email to: ${this.payload.to}`)
6057
}
@@ -135,7 +132,7 @@ interface QueueManagerConfig {
135132
// Worker configuration
136133
worker: {
137134
concurrency: number
138-
pollingInterval: string
135+
idleDelay: Duration
139136
}
140137

141138
// Job discovery locations
@@ -242,9 +239,9 @@ Schedule jobs to run in the future:
242239
```typescript
243240
// Various time formats
244241
await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
245-
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
246-
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
247-
await SendEmailJob.dispatch(payload).in('1d') // 1 day
242+
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
243+
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
244+
await SendEmailJob.dispatch(payload).in('1d') // 1 day
248245
```
249246

250247
## Priority
@@ -358,15 +355,18 @@ export default class MyJob extends Job<Payload> {
358355

359356
### Context Properties
360357

361-
| Property | Type | Description |
362-
| -------------- | ------ | ----------------------------------------------- |
363-
| `jobId` | string | Unique identifier for this job |
364-
| `name` | string | Job class name |
365-
| `attempt` | number | Current attempt number (1-based) |
366-
| `queue` | string | Queue name this job is being processed from |
367-
| `priority` | number | Job priority (lower = higher priority) |
368-
| `acquiredAt` | Date | When this job was acquired by the worker |
369-
| `stalledCount` | number | Times this job was recovered from stalled state |
358+
| Property | Type | Description |
359+
|-------------------|---------------------|---------------------------------------------------|
360+
| `jobId` | string | Unique identifier for this job |
361+
| `name` | string | Job class name |
362+
| `attempt` | number | Current attempt number (1-based) |
363+
| `queue` | string | Queue name this job is being processed from |
364+
| `priority` | number | Job priority (lower = higher priority) |
365+
| `acquiredAt` | Date | When this job was acquired by the worker |
366+
| `stalledCount` | number | Times this job was recovered from stalled state |
367+
| `isRepeating` | boolean | Whether this job is configured to repeat |
368+
| `repeatRemaining` | number \| undefined | Remaining repetitions (undefined = infinite) |
369+
| `repeatId` | string \| undefined | Unique ID for the repeat chain (for cancellation) |
370370

371371
## Dependency Injection
372372

@@ -417,6 +417,88 @@ export default class SendEmailJob extends Job<SendEmailPayload> {
417417

418418
Without a `jobFactory`, jobs are instantiated with `new JobClass(payload, context)`.
419419

420+
## Repeating Jobs
421+
422+
Schedule jobs to repeat automatically at fixed intervals:
423+
424+
```typescript
425+
// Repeat every 5 seconds indefinitely
426+
await SyncJob.dispatch({ source: 'api' }).every('5s')
427+
428+
// Repeat every hour, 10 times total
429+
await CleanupJob.dispatch({ days: 30 }).every('1h').times(10)
430+
431+
// Combine with delay (start after 30 seconds, then repeat every minute)
432+
await ReportJob.dispatch({ type: 'daily' }).in('30s').every('1m')
433+
```
434+
435+
### Cancelling a Repeating Job
436+
437+
When dispatching a repeating job, you receive a `repeatId` that can be used to cancel the entire repeat chain from anywhere:
438+
439+
```typescript
440+
import { QueueManager } from '@boringnode/queue'
441+
442+
// Dispatch returns jobId and repeatId
443+
const { jobId, repeatId } = await SyncJob.dispatch({ source: 'api' }).every('5s')
444+
445+
console.log(`Started repeating job ${jobId} with repeat chain ${repeatId}`)
446+
447+
// Later, cancel the repeat chain from anywhere
448+
if (repeatId) {
449+
await QueueManager.cancelRepeat(repeatId)
450+
}
451+
```
452+
453+
The `repeatId` is also available inside the job via `this.context.repeatId`.
454+
455+
### Stopping from Within the Job
456+
457+
A job can stop its own repetition by calling `this.stopRepeating()`:
458+
459+
```typescript
460+
import { Job } from '@boringnode/queue'
461+
import type { JobContext } from '@boringnode/queue/types'
462+
463+
export default class SyncJob extends Job<SyncPayload> {
464+
static readonly jobName = 'SyncJob'
465+
466+
async execute(): Promise<void> {
467+
const result = await this.syncData()
468+
469+
// Stop repeating when sync is complete
470+
if (result.isComplete) {
471+
this.stopRepeating()
472+
}
473+
}
474+
}
475+
```
476+
477+
### Repeat Context
478+
479+
Jobs have access to repeat information via `this.context`:
480+
481+
```typescript
482+
async execute(): Promise<void> {
483+
if (this.context.isRepeating) {
484+
console.log(`Repeating job, ${this.context.repeatRemaining ?? 'infinite'} runs remaining`)
485+
}
486+
}
487+
```
488+
489+
| Property | Type | Description |
490+
|-------------------|---------------------|---------------------------------------------------|
491+
| `isRepeating` | boolean | Whether this job is configured to repeat |
492+
| `repeatRemaining` | number \| undefined | Remaining repetitions (undefined = infinite) |
493+
| `repeatId` | string \| undefined | Unique ID for the repeat chain (for cancellation) |
494+
495+
### How Repeating Works
496+
497+
- Each repeat creates a **new job** with a new ID
498+
- The payload is **preserved** across repeats
499+
- Failed jobs do **not** repeat (only successful completions trigger the next run)
500+
- The repeat interval is the delay **between** job completions
501+
420502
## Job Discovery
421503

422504
The queue manager automatically discovers and registers jobs from the specified locations:
@@ -459,7 +541,7 @@ By default, a simple console logger is used that only outputs warnings and error
459541
Performance comparison with BullMQ using realistic jobs (5ms simulated work per job):
460542

461543
| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff |
462-
| ---- | ----------- | ----------------- | ------ | ----------- |
544+
|------|-------------|-------------------|--------|-------------|
463545
| 100 | 1 | 562ms | 596ms | 5.7% faster |
464546
| 100 | 5 | 116ms | 117ms | ~same |
465547
| 100 | 10 | 62ms | 62ms | ~same |
@@ -495,4 +577,4 @@ npm run benchmark -- --duration=10
495577
[typescript-image]: https://img.shields.io/badge/Typescript-294E80.svg?style=for-the-badge&logo=typescript
496578
[typescript-url]: https://www.typescriptlang.org
497579
[license-image]: https://img.shields.io/npm/l/@boringnode/queue?color=blueviolet&style=for-the-badge
498-
[license-url]: LICENSE.md
580+
[license-url]: LICENSE.md

examples/app.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { config } from './config.js'
22
import { QueueManager } from '../src/queue_manager.js'
33
import SendEmailJob from './jobs/send_email_job.js'
44
import SyncJob from './jobs/sync_job.js'
5+
import MetricsJob from './jobs/metrics_job.js'
56

67
await QueueManager.init(config)
78

@@ -13,4 +14,13 @@ for (let i = 0; i < 10; i++) {
1314
await SendEmailJob.dispatch({ to: 'romain.lanz@pm.me' + i })
1415
}
1516

17+
// Example: Dispatch a repeating job and get the repeatId for later cancellation
18+
const { jobId, repeatId } = await MetricsJob.dispatch({ endpoint: '/api/health' }).every('10s')
19+
20+
console.log(`Started metrics collection job ${jobId}`)
21+
console.log(`To cancel this repeating job, use: await QueueManager.cancelRepeat('${repeatId}')`)
22+
23+
// Example: Cancel a repeating job after some condition
24+
// await QueueManager.cancelRepeat(repeatId)
25+
1626
await QueueManager.destroy()

examples/jobs/metrics_job.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { Job } from '../../src/job.js'
2+
import type { JobOptions } from '../../src/types/index.js'
3+
4+
interface MetricsJobPayload {
5+
endpoint: string
6+
}
7+
8+
/**
9+
* Example job that collects metrics at regular intervals.
10+
* Demonstrates repeating jobs with external cancellation.
11+
*/
12+
export default class MetricsJob extends Job<MetricsJobPayload> {
13+
static readonly jobName = 'MetricsJob'
14+
15+
static options: JobOptions = {
16+
queue: 'metrics',
17+
}
18+
19+
async execute(): Promise<void> {
20+
const repeatInfo = this.context.isRepeating
21+
? ` (repeat ${this.context.repeatRemaining ?? '∞'} remaining, repeatId: ${this.context.repeatId})`
22+
: ''
23+
24+
console.log(
25+
`[Job ${this.context.jobId}] Collecting metrics from ${this.payload.endpoint}${repeatInfo}`
26+
)
27+
28+
// Simulate metrics collection
29+
const metrics = {
30+
timestamp: new Date().toISOString(),
31+
cpu: Math.random() * 100,
32+
memory: Math.random() * 100,
33+
}
34+
35+
console.log(`[Job ${this.context.jobId}] Metrics:`, metrics)
36+
}
37+
}

examples/jobs/send_email_job.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { setTimeout } from 'node:timers/promises'
21
import { Job } from '../../src/job.js'
32
import type { JobOptions } from '../../src/types/index.js'
43

@@ -14,7 +13,6 @@ export default class SendEmailJob extends Job<SendEmailPayload> {
1413
}
1514

1615
async execute(): Promise<void> {
17-
await setTimeout(1000)
1816
console.log(`[Attempt ${this.context.attempt}] Sending email to: ${this.payload.to}`)
1917
}
2018
}

examples/worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ import { Worker } from '../src/worker.js'
22
import { config } from './config.js'
33

44
const worker = new Worker(config)
5-
await worker.start(['default', 'email', 'reports'])
5+
await worker.start(['default', 'email', 'reports', 'metrics'])

src/contracts/adapter.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,22 @@ export interface Adapter {
149149
* Called when the worker stops or the adapter is no longer needed.
150150
*/
151151
destroy(): Promise<void>
152+
153+
/**
154+
* Cancel a repeating job chain.
155+
*
156+
* After calling this, `isRepeatCancelled` will return true for this groupId,
157+
* and the worker will not re-dispatch jobs with this groupId.
158+
*
159+
* @param groupId - The repeat chain identifier (from RepeatConfig.groupId)
160+
*/
161+
cancelRepeat(groupId: string): Promise<void>
162+
163+
/**
164+
* Check if a repeat chain has been cancelled.
165+
*
166+
* @param groupId - The repeat chain identifier to check
167+
* @returns True if the repeat chain has been cancelled
168+
*/
169+
isRepeatCancelled(groupId: string): Promise<boolean>
152170
}

0 commit comments

Comments
 (0)