-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker-example.php
More file actions
83 lines (64 loc) · 2.08 KB
/
worker-example.php
File metadata and controls
83 lines (64 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
<?php
declare(strict_types=1);
/**
* Worker example for the Spooled PHP SDK.
*/
require_once __DIR__ . '/../vendor/autoload.php';
use Spooled\SpooledClient;
use Spooled\Config\ClientOptions;
use Spooled\Worker\SpooledWorker;
use Spooled\Worker\WorkerConfig;
use Spooled\Worker\JobContext;
// Build the API client.
// The key comes from the API_KEY environment variable; when getenv() yields a
// falsy value (variable unset, empty, or "0"), the placeholder below is used.
$apiKey = getenv('API_KEY');
if (!$apiKey) {
    $apiKey = 'your-api-key';
}
$client = new SpooledClient(new ClientOptions(apiKey: $apiKey));
// Build the worker: settings first, then the worker bound to the client.
$workerConfig = new WorkerConfig(
    queueName: 'example-queue',
    concurrency: 3,
    pollInterval: 1_000,        // milliseconds (1 second)
    heartbeatInterval: 30_000,  // milliseconds (30 seconds)
);
$worker = new SpooledWorker($client, $workerConfig);
// Register event handlers.
// Each event name maps to a formatter that builds its log line from the event
// payload; a single generic listener per event echoes the formatted line.
$eventFormatters = [
    'started' => static fn (array $data): string => "Worker started: {$data['workerId']}\n",
    'job:claimed' => static fn (array $data): string => "Job claimed: {$data['job']->id}\n",
    'job:completed' => static fn (array $data): string => "Job completed: {$data['job']->id}\n",
    'job:failed' => static fn (array $data): string => "Job failed: {$data['job']->id} - {$data['error']}\n",
    'stopped' => static fn (array $data): string =>
        "Worker stopped. Completed: {$data['completedJobs']}, Failed: {$data['failedJobs']}\n",
];

// Registration order follows the table order above.
foreach ($eventFormatters as $eventName => $formatLine) {
    $worker->on($eventName, function (array $data) use ($formatLine): void {
        echo $formatLine($data);
    });
}
unset($eventName, $formatLine);
// Handle shutdown signals (only when the pcntl extension is available).
if (function_exists('pcntl_signal')) {
    // pcntl_signal() only registers the callbacks. PHP queues incoming signals
    // and runs the callbacks solely when signals are dispatched, via
    // pcntl_signal_dispatch(), declare(ticks) or asynchronous dispatch. Enable
    // asynchronous dispatch so SIGTERM/SIGINT actually reach $worker->stop()
    // while the worker is running.
    if (function_exists('pcntl_async_signals')) {
        pcntl_async_signals(true);
    }

    // Both signals request the same stop; the signal number and siginfo that
    // pcntl passes to the callback are not needed and are ignored.
    $requestStop = fn () => $worker->stop();
    pcntl_signal(SIGTERM, $requestStop);
    pcntl_signal(SIGINT, $requestStop);
}
echo "Starting worker...\n";

// Define job handler.
// The handler logs the job id, queue name and JSON-encoded payload, sleeps one
// second to simulate work, then either returns a result array or throws a
// RuntimeException when the context reports that a shutdown is in progress.
$handleJob = function (JobContext $ctx): array {
    echo "Processing job {$ctx->jobId} from queue {$ctx->queueName}\n";

    $encodedPayload = json_encode($ctx->payload);
    echo "Payload: " . $encodedPayload . "\n";

    // Simulate work
    sleep(1);

    // Normal path: no shutdown pending, report success and hand back the result.
    if (!$ctx->isShuttingDown()) {
        echo "Job {$ctx->jobId} processed successfully\n";

        return [
            'processedAt' => date('c'),
            'success' => true,
        ];
    }

    // A shutdown was requested while working: stop early by throwing.
    echo "Shutdown requested, stopping early\n";
    throw new \RuntimeException('Worker shutdown requested');
};

$worker->process($handleJob);
// Start the worker (blocking). Everything the worker needs (event handlers,
// signal handlers, job handler) is registered before this call.
// NOTE(review): presumably start() returns only after stop() has been
// requested, so nothing placed after it runs until shutdown. Confirm against the SDK.
$worker->start();