diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 2ba9273..e036147 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -45,7 +45,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe /** * Move Job to Jobs and it's PID to the processing list. */ - $this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage); + $this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage, $queue->jobTtl); $this->connection->leftPush("{$queue->namespace}.processing.{$queue->name}", $message->getPid()); /** diff --git a/src/Queue/Connection.php b/src/Queue/Connection.php index c1310b7..de80708 100644 --- a/src/Queue/Connection.php +++ b/src/Queue/Connection.php @@ -19,9 +19,9 @@ public function listSize(string $key): int; public function listRange(string $key, int $total, int $offset): array; public function remove(string $key): bool; public function move(string $queue, string $destination): bool; - public function set(string $key, string $value): bool; + public function set(string $key, string $value, int $ttl = 0): bool; public function get(string $key): array|string|null; - public function setArray(string $key, array $value): bool; + public function setArray(string $key, array $value, int $ttl = 0): bool; public function increment(string $key): int; public function decrement(string $key): int; public function ping(): bool; diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 7418e43..a0729b3 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -119,13 +119,16 @@ public function move(string $queue, string $destination): bool return $this->getRedis()->move($queue, $destination); } - public function setArray(string $key, array $value): bool + public function setArray(string $key, array $value, int $ttl = 0): bool { - return $this->set($key, json_encode($value)); + return $this->set($key, json_encode($value), $ttl); } - public function set(string $key, string $value): bool + public function set(string $key, string $value, int $ttl = 0): bool { + if ($ttl > 0) { + return $this->getRedis()->setex($key, $ttl, $value); + } return $this->getRedis()->set($key, $value); } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 476b735..ec9ec68 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -114,13 +114,16 @@ public function move(string $queue, string $destination): bool return false; } - public function setArray(string $key, array $value): bool + public function setArray(string $key, array $value, int $ttl = 0): bool { - return $this->set($key, json_encode($value)); + return $this->set($key, json_encode($value), $ttl); } - public function set(string $key, string $value): bool + public function set(string $key, string $value, int $ttl = 0): bool { + if ($ttl > 0) { + return $this->getRedis()->setex($key, $ttl, $value); + } return $this->getRedis()->set($key, $value); } diff --git a/src/Queue/Queue.php b/src/Queue/Queue.php index d22d971..fae2e66 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/Queue.php @@ -7,6 +7,7 @@ public function __construct( public string $name, public string $namespace = 'utopia-queue', + public int $jobTtl = 0, ) { if (empty($this->name)) { throw new \InvalidArgumentException('Cannot create queue with empty name.');