Skip to content
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
.phpunit.result.cache
.phpunit.cache
.env
/coverage
/coverage
.DS_Store
143 changes: 143 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,149 @@ If you're using separate services for dispatching and handling tasks, and your a
'disable_task_handler' => env('CLOUD_TASKS_DISABLE_TASK_HANDLER', false),
```

### Cloud Run Jobs

If you want jobs to be processed by Cloud Run Jobs instead of HTTP endpoints, you can configure the queue to trigger Cloud Run Job executions.

#### Why Cloud Run Jobs?

Cloud Run Jobs are ideal for long-running batch processing that exceeds Cloud Tasks HTTP timeout limits.

Cloud Run Jobs can run for up to 7 days.

**Tip**: Use seperate queue connections with different targets, for low latency jobs, use HTTP targets, for longer running batch jobs use Cloud Run Jobs.

#### Setup

1. **Create a Cloud Run Job** with your Laravel application container, configured to run:

```bash
php artisan cloud-tasks:work-job
```

The command reads job data from environment variables passed to the Job by Cloud Run.

2. **Configure your queue connection**:

```php
'cloudtasks' => [
'driver' => 'cloudtasks',
'project' => env('CLOUD_TASKS_PROJECT'),
'location' => env('CLOUD_TASKS_LOCATION'),
'queue' => env('CLOUD_TASKS_QUEUE', 'default'),

// Cloud Run Job configuration
'cloud_run_job' => env('CLOUD_TASKS_USE_CLOUD_RUN_JOB', false),
'cloud_run_job_name' => env('CLOUD_RUN_JOB_NAME'),
'cloud_run_job_region' => env('CLOUD_RUN_JOB_REGION'), // defaults to location
'service_account_email' => env('CLOUD_TASKS_SERVICE_EMAIL'),

// Optional: Store large payloads (>10KB) in filesystem
'payload_disk' => env('CLOUD_TASKS_PAYLOAD_DISK'), // Laravel disk name
'payload_prefix' => env('CLOUD_TASKS_PAYLOAD_PREFIX', 'cloud-tasks-payloads'),
'payload_threshold' => env('CLOUD_TASKS_PAYLOAD_THRESHOLD', 10240), // bytes
],
```

> **Note**: The command reads `CLOUD_TASKS_PAYLOAD`, `CLOUD_TASKS_TASK_NAME`, and `CLOUD_TASKS_PAYLOAD_PATH` directly from environment variables at runtime using `getenv()`. These are set automatically by Cloud Tasks via container overrides.

3. **Set environment variables**:

```dotenv
CLOUD_TASKS_USE_CLOUD_RUN_JOB=true
CLOUD_RUN_JOB_NAME=my-queue-worker-job
CLOUD_RUN_JOB_REGION=europe-west1
```

#### Large Payload Storage

For jobs with payloads exceeding environment variable limits (32KB limit enforced by Cloud Run), configure a Laravel filesystem disk:

```dotenv
CLOUD_TASKS_PAYLOAD_DISK=gcs
CLOUD_TASKS_PAYLOAD_PREFIX=cloud-tasks-payloads
CLOUD_TASKS_PAYLOAD_THRESHOLD=30000
```

When the payload exceeds the threshold, it's stored in the disk and `CLOUD_TASKS_PAYLOAD_PATH` is used instead.

> **Note**: The payloads will not be cleared up automatically, you can define lifecycle rules for the GCS bucket to delete old payloads.

#### How It Works

When you dispatch a job with Cloud Run Job target enabled:

1. Package creates a Cloud Task with HTTP target pointing to Cloud Run Jobs API
2. Cloud Tasks calls `run.googleapis.com/v2/.../jobs/{job}:run`
3. Cloud Run Jobs starts a new execution with environment variables set via container overrides:
- `CLOUD_TASKS_PAYLOAD` - Base64-encoded job payload
- `CLOUD_TASKS_TASK_NAME` - The task name
4. The container runs `php artisan cloud-tasks:work-job` which reads the env vars and processes the job

All Laravel queue functionality is retained:
- Job retries and max attempts
- Failed job handling
- Job timeouts
- Encrypted jobs
- Queue events

#### Required IAM Permissions

Cloud Run Jobs requires specific IAM permissions. Set these variables first:

```bash
export PROJECT_ID="your-project-id"
export SA_EMAIL="your-service-account@your-project-id.iam.gserviceaccount.com"
export TASKS_AGENT="service-{PROJECT_NUMBER}@gcp-sa-cloudtasks.iam.gserviceaccount.com"
```

> **Note**: Find your Cloud Tasks service agent email in the IAM console under "Include Google-provided role grants".
> **Note**: Project ID and Project Number are different. Project ID is the name of your project, Project Number is the numeric ID of your project.

**Project-Level Permissions:**

```bash
# Allow enqueuing tasks (required by PHP app running as $SA_EMAIL)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/cloudtasks.enqueuer"

# Allow executing jobs with overrides (required for container overrides)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/run.jobsExecutorWithOverrides"

# Allow invoking Cloud Run Services (if also using Cloud Run Services as HTTP targets)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/run.invoker"
```

**Note**: To restrict access to specific Cloud Run instances, use IAM conditions to limit access to specific Cloud Run Jobs / services.

**Service Account Permissions:**

```bash
# Allow the SA to act as itself (required for task creation and execution)
gcloud iam service-accounts add-iam-policy-binding $SA_EMAIL \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/iam.serviceAccountUser"

# Allow Cloud Tasks to act as the SA (required for OAuth token generation)
gcloud iam service-accounts add-iam-policy-binding $SA_EMAIL \
--member="serviceAccount:$TASKS_AGENT" \
--role="roles/iam.serviceAccountUser"
```

| Permission | Required By | Purpose |
|------------|-------------|---------|
| `cloudtasks.enqueuer` | PHP App | Add tasks to the queue |
| `cloudtasks.viewer` | Cloud Run Job | List queues/tasks (optional) |
| `run.jobsExecutorWithOverrides` | Cloud Task | Execute jobs with container overrides |
| `run.invoker` | Other Workloads | Invoke Cloud Run Services (if using HTTP targets) |
| `iam.serviceAccountUser` (on SA) | Both | Allow SA to create tasks as itself |
| `iam.serviceAccountUser` (Tasks Agent) | Google Infrastructure | Generate OAuth tokens for Cloud Run |

### How-To

#### Pass headers to a task
Expand Down
8 changes: 7 additions & 1 deletion src/CloudTasksConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
* service_account_email?: string,
* backoff?: int,
* dispatch_deadline?: int,
* after_commit?: bool
* after_commit?: bool,
* cloud_run_job?: bool,
* cloud_run_job_name?: string,
* cloud_run_job_region?: string,
* payload_disk?: string,
* payload_prefix?: string,
* payload_threshold?: int
* }
*/
class CloudTasksConnector implements ConnectorInterface
Expand Down
100 changes: 100 additions & 0 deletions src/CloudTasksQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use Illuminate\Queue\WorkerOptions;
use Google\Cloud\Tasks\V2\OidcToken;
use Google\Cloud\Tasks\V2\HttpMethod;
use Google\Cloud\Tasks\V2\OAuthToken;
use Google\Cloud\Tasks\V2\HttpRequest;
use Illuminate\Support\Facades\Storage;
use Google\Cloud\Tasks\V2\AppEngineRouting;
use Illuminate\Queue\Queue as LaravelQueue;
use Google\Cloud\Tasks\V2\AppEngineHttpRequest;
Expand Down Expand Up @@ -278,6 +280,45 @@ public function addPayloadToTask(array $payload, Task $task, $job): Task
}

$task->setAppEngineHttpRequest($appEngineRequest);
} elseif (! empty($this->config['cloud_run_job'])) {
// Cloud Run Job target - call the Cloud Run Jobs execution API
$httpRequest = new HttpRequest;
$httpRequest->setUrl($this->getCloudRunJobExecutionUrl());
$httpRequest->setHttpMethod(HttpMethod::POST);
$httpRequest->setHeaders(array_merge($headers, [
'Content-Type' => 'application/json',
]));

// Build the execution request body with container overrides
// The job payload is passed as environment variables
$taskNameShort = str($task->getName())->afterLast('/')->toString();
$encodedPayload = base64_encode(json_encode($payload));

// Build env vars for the container using fixed env var names
// These map to config keys: cloud_run_job_payload, cloud_run_job_task_name, cloud_run_job_payload_path
$envVars = $this->getCloudRunJobEnvVars($encodedPayload, $taskNameShort);

$executionBody = [
'overrides' => [
'containerOverrides' => [
[
'env' => $envVars,
],
],
],
];

$httpRequest->setBody(json_encode($executionBody));

$token = new OAuthToken;
$token->setServiceAccountEmail($this->config['service_account_email'] ?? '');
$token->setScope('https://www.googleapis.com/auth/cloud-platform');
$httpRequest->setOAuthToken($token);
$task->setHttpRequest($httpRequest);

if (! empty($this->config['dispatch_deadline'])) {
$task->setDispatchDeadline((new Duration)->setSeconds($this->config['dispatch_deadline']));
}
} else {
$httpRequest = new HttpRequest;
$httpRequest->setUrl($this->getHandler($job));
Expand Down Expand Up @@ -367,4 +408,63 @@ private function getQueueForJob(mixed $job): string

return $this->config['queue'];
}

/**
* Get the Cloud Run Jobs execution API URL.
*/
private function getCloudRunJobExecutionUrl(): string
{
$project = $this->config['project'];
$region = $this->config['cloud_run_job_region'] ?? $this->config['location'];
$jobName = $this->config['cloud_run_job_name'] ?? throw new Exception('cloud_run_job_name is required when using Cloud Run Jobs.');

return sprintf(
'https://run.googleapis.com/v2/projects/%s/locations/%s/jobs/%s:run',
$project,
$region,
$jobName
);
}

/**
* Get the environment variables for Cloud Run Job dispatch.
*
* If the payload exceeds the configured threshold, it will be stored
* in the configured disk and the path will be returned instead.
*
* Env vars set map to config keys in the queue connection:
* - CLOUD_TASKS_TASK_NAME -> cloud_run_job_task_name
* - CLOUD_TASKS_PAYLOAD -> cloud_run_job_payload
* - CLOUD_TASKS_PAYLOAD_PATH -> cloud_run_job_payload_path
*
* @return array<int, array{name: string, value: string}>
*/
private function getCloudRunJobEnvVars(string $encodedPayload, string $taskName): array
{
$disk = $this->config['payload_disk'] ?? null;
$threshold = $this->config['payload_threshold'] ?? 10240; // 10KB default

$envVars = [
['name' => 'CLOUD_TASKS_TASK_NAME', 'value' => $taskName],
];

// If no disk configured or payload is below threshold, pass payload directly
if ($disk === null || strlen($encodedPayload) <= $threshold) {
$envVars[] = ['name' => 'CLOUD_TASKS_PAYLOAD', 'value' => $encodedPayload];

return $envVars;
}

// Store payload in configured disk and pass path instead
$prefix = $this->config['payload_prefix'] ?? 'cloud-tasks-payloads';
$timestamp = now()->format('Y-m-d_H:i:s.v');
$path = sprintf('%s/%s_%s.json', $prefix, $timestamp, $taskName);

Storage::disk($disk)->put($path, $encodedPayload);

// Set the path env var for large payloads
$envVars[] = ['name' => 'CLOUD_TASKS_PAYLOAD_PATH', 'value' => $disk.':'.$path];

return $envVars;
}
}
11 changes: 11 additions & 0 deletions src/CloudTasksServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
use Stackkit\LaravelGoogleCloudTasksQueue\Events\JobReleased;
use Illuminate\Support\ServiceProvider as LaravelServiceProvider;
use Stackkit\LaravelGoogleCloudTasksQueue\Commands\WorkCloudRunJob;

class CloudTasksServiceProvider extends LaravelServiceProvider
{
Expand All @@ -24,6 +25,7 @@ public function boot(): void
$this->registerConfig();
$this->registerRoutes();
$this->registerEvents();
$this->registerCommands();
}

private function registerClient(): void
Expand Down Expand Up @@ -112,4 +114,13 @@ private function registerEvents(): void
}
});
}

private function registerCommands(): void
{
if ($this->app->runningInConsole()) {
$this->commands([
WorkCloudRunJob::class,
]);
}
}
}
Loading
Loading