diff --git a/internal-packages/run-engine/src/engine/retrying.ts b/internal-packages/run-engine/src/engine/retrying.ts index 8001c5d8a2..6099d5b649 100644 --- a/internal-packages/run-engine/src/engine/retrying.ts +++ b/internal-packages/run-engine/src/engine/retrying.ts @@ -37,6 +37,10 @@ export type RetryOutcome = settings: TaskRunExecutionRetry; machine?: string; wasOOMError?: boolean; + // Current usage values for calculating updated totals + usageDurationMs: number; + costInCents: number; + machinePreset: string | null; }; export async function retryOutcomeFromCompletion( @@ -70,6 +74,9 @@ export async function retryOutcomeFromCompletion( machine: oomResult.machine, settings: { timestamp: Date.now() + delay, delay }, wasOOMError: true, + usageDurationMs: oomResult.usageDurationMs, + costInCents: oomResult.costInCents, + machinePreset: oomResult.machinePreset, }; } @@ -87,7 +94,7 @@ export async function retryOutcomeFromCompletion( return { outcome: "fail_run", sanitizedError }; } - // Get the run settings + // Get the run settings and current usage values const run = await prisma.taskRun.findFirst({ where: { id: runId, @@ -95,6 +102,9 @@ export async function retryOutcomeFromCompletion( select: { maxAttempts: true, lockedRetryConfig: true, + usageDurationMs: true, + costInCents: true, + machinePreset: true, }, }); @@ -151,6 +161,9 @@ export async function retryOutcomeFromCompletion( outcome: "retry", method: "queue", // we'll always retry on the queue because usually having no settings means something bad happened settings: retrySettings, + usageDurationMs: run.usageDurationMs, + costInCents: run.costInCents, + machinePreset: run.machinePreset, }; } @@ -158,13 +171,22 @@ export async function retryOutcomeFromCompletion( outcome: "retry", method: retryUsingQueue ? "queue" : "immediate", settings: retrySettings, + usageDurationMs: run.usageDurationMs, + costInCents: run.costInCents, + machinePreset: run.machinePreset, }; } async function retryOOMOnMachine( prisma: PrismaClientOrTransaction, runId: string -): Promise<{ machine: string; retrySettings: RetryOptions } | undefined> { +): Promise<{ + machine: string; + retrySettings: RetryOptions; + usageDurationMs: number; + costInCents: number; + machinePreset: string | null; +} | undefined> { try { const run = await prisma.taskRun.findFirst({ where: { @@ -173,6 +195,8 @@ async function retryOOMOnMachine( select: { machinePreset: true, lockedRetryConfig: true, + usageDurationMs: true, + costInCents: true, }, }); @@ -201,7 +225,13 @@ async function retryOOMOnMachine( return; } - return { machine: retryMachine, retrySettings: parsedRetryConfig.data }; + return { + machine: retryMachine, + retrySettings: parsedRetryConfig.data, + usageDurationMs: run.usageDurationMs, + costInCents: run.costInCents, + machinePreset: run.machinePreset, + }; } catch (error) { console.error("[FailedTaskRunRetryHelper] Failed to get execution retry", { runId, diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 70c9e9686a..fb8c833f16 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -175,7 +175,7 @@ export class RunAttemptSystem { } public async resolveTaskRunContext(runId: string): Promise { - const run = await this.$.prisma.taskRun.findFirst({ + const run = await this.$.readOnlyPrisma.taskRun.findFirst({ where: { id: runId, }, @@ -233,9 +233,9 @@ export class RunAttemptSystem { run.lockedById ? this.#resolveTaskRunExecutionTask(run.lockedById) : Promise.resolve({ - id: run.taskIdentifier, - filePath: "unknown", - }), + id: run.taskIdentifier, + filePath: "unknown", + }), this.#resolveTaskRunExecutionQueue({ runId, lockedQueueId: run.lockedQueueId ?? undefined, @@ -247,13 +247,13 @@ export class RunAttemptSystem { run.lockedById ? this.#resolveTaskRunExecutionMachinePreset(run.lockedById, run.machinePreset) : Promise.resolve( - getMachinePreset({ - defaultMachine: this.options.machines.defaultMachine, - machines: this.options.machines.machines, - config: undefined, - run, - }) - ), + getMachinePreset({ + defaultMachine: this.options.machines.defaultMachine, + machines: this.options.machines.machines, + config: undefined, + run, + }) + ), run.lockedById ? this.#resolveTaskRunExecutionDeployment(run.lockedById) : Promise.resolve(undefined), @@ -337,7 +337,7 @@ export class RunAttemptSystem { }); } - const taskRun = await prisma.taskRun.findFirst({ + const taskRun = await this.$.readOnlyPrisma.taskRun.findFirst({ where: { id: runId, }, @@ -460,9 +460,8 @@ export class RunAttemptSystem { run, snapshot: { executionStatus: "EXECUTING", - description: `Attempt created, starting execution${ - isWarmStart ? " (warm start)" : "" - }`, + description: `Attempt created, starting execution${isWarmStart ? " (warm start)" : "" + }`, }, previousSnapshotId: latestSnapshot.id, environmentId: latestSnapshot.environmentId, @@ -621,8 +620,8 @@ export class RunAttemptSystem { deployment, batch: updatedRun.batchId ? { - id: BatchId.toFriendlyId(updatedRun.batchId), - } + id: BatchId.toFriendlyId(updatedRun.batchId), + } : undefined, }; @@ -707,9 +706,34 @@ export class RunAttemptSystem { } span.setAttribute("completionStatus", completion.ok); + span.setAttribute("runId", runId); const completedAt = new Date(); + // Read current usage values to calculate new totals (safe under runLock) + const currentRun = await this.$.readOnlyPrisma.taskRun.findFirst({ + where: { id: runId }, + select: { + usageDurationMs: true, + costInCents: true, + machinePreset: true, + }, + }); + + if (!currentRun) { + throw new ServiceValidationError("Run not found", 404); + } + + // Calculate new usage totals + const updatedUsage = this.#calculateUpdatedUsage({ + runId, + currentUsageDurationMs: currentRun.usageDurationMs, + currentCostInCents: currentRun.costInCents, + attemptDurationMs: completion.usage?.durationMs ?? 0, + machinePresetName: currentRun.machinePreset, + environmentType: latestSnapshot.environmentType, + }); + const run = await prisma.taskRun.update({ where: { id: runId }, data: { @@ -717,6 +741,8 @@ export class RunAttemptSystem { completedAt, output: completion.output, outputType: completion.outputType, + usageDurationMs: updatedUsage.usageDurationMs, + costInCents: updatedUsage.costInCents, executionSnapshots: { create: { executionStatus: "FINISHED", @@ -873,7 +899,7 @@ export class RunAttemptSystem { const failedAt = new Date(); - const retryResult = await retryOutcomeFromCompletion(prisma, { + const retryResult = await retryOutcomeFromCompletion(this.$.readOnlyPrisma, { runId, error: completion.error, retryUsingQueue: forceRequeue ?? false, @@ -883,7 +909,7 @@ export class RunAttemptSystem { // Force requeue means it was crashed so the attempt span needs to be closed if (forceRequeue) { - const minimalRun = await prisma.taskRun.findFirst({ + const minimalRun = await this.$.readOnlyPrisma.taskRun.findFirst({ where: { id: runId, }, @@ -930,6 +956,7 @@ export class RunAttemptSystem { completedAt: failedAt, reason: retryResult.reason, finalizeRun: true, + attemptDurationMs: completion.usage?.durationMs, tx: prisma, }); return { @@ -948,17 +975,31 @@ export class RunAttemptSystem { error: retryResult.sanitizedError, workerId, runnerId, + attemptDurationMs: completion.usage?.durationMs, }); } case "retry": { const retryAt = new Date(retryResult.settings.timestamp); + // Calculate new usage totals using the current machine's rate + // (retryResult includes current usage values from the read in retryOutcomeFromCompletion) + const updatedUsage = this.#calculateUpdatedUsage({ + runId, + currentUsageDurationMs: retryResult.usageDurationMs, + currentCostInCents: retryResult.costInCents, + attemptDurationMs: completion.usage?.durationMs ?? 0, + machinePresetName: retryResult.machinePreset, + environmentType: latestSnapshot.environmentType, + }); + const run = await prisma.taskRun.update({ where: { id: runId, }, data: { machinePreset: retryResult.machine, + usageDurationMs: updatedUsage.usageDurationMs, + costInCents: updatedUsage.costInCents, }, include: { runtimeEnvironment: { @@ -1248,6 +1289,7 @@ export class RunAttemptSystem { If the run is in-progress it will change it's state to PENDING_CANCEL and notify the worker. If the run is not in-progress it will finish it. You can pass `finalizeRun` in if you know it's no longer running, e.g. the worker has messaged to say it's done. + You can optionally pass `attemptDurationMs` if you have completion data with usage info. */ async cancelRun({ runId, @@ -1257,6 +1299,7 @@ export class RunAttemptSystem { reason, finalizeRun, bulkActionId, + attemptDurationMs, tx, }: { runId: string; @@ -1266,6 +1309,7 @@ export class RunAttemptSystem { reason?: string; finalizeRun?: boolean; bulkActionId?: string; + attemptDurationMs?: number; tx?: PrismaClientOrTransaction; }): Promise { const prisma = tx ?? this.$.prisma; @@ -1312,6 +1356,32 @@ export class RunAttemptSystem { raw: reason, }; + // Calculate updated usage if we have attempt duration data + let usageUpdate: { usageDurationMs: number; costInCents: number } | undefined; + if (attemptDurationMs !== undefined) { + const currentRun = await this.$.readOnlyPrisma.taskRun.findFirst({ + where: { id: runId }, + select: { + usageDurationMs: true, + costInCents: true, + machinePreset: true, + }, + }); + + if (!currentRun) { + throw new ServiceValidationError("Run not found", 404); + } + + usageUpdate = this.#calculateUpdatedUsage({ + runId, + currentUsageDurationMs: currentRun.usageDurationMs, + currentCostInCents: currentRun.costInCents, + attemptDurationMs, + machinePresetName: currentRun.machinePreset, + environmentType: latestSnapshot.environmentType, + }); + } + const run = await prisma.taskRun.update({ where: { id: runId }, data: { @@ -1320,9 +1390,13 @@ export class RunAttemptSystem { error, bulkActionGroupIds: bulkActionId ? { - push: bulkActionId, - } + push: bulkActionId, + } : undefined, + ...(usageUpdate && { + usageDurationMs: usageUpdate.usageDurationMs, + costInCents: usageUpdate.costInCents, + }), }, select: { id: true, @@ -1478,6 +1552,7 @@ export class RunAttemptSystem { error, workerId, runnerId, + attemptDurationMs, }: { runId: string; latestSnapshot: EnhancedExecutionSnapshot; @@ -1485,6 +1560,7 @@ export class RunAttemptSystem { error: TaskRunError; workerId?: string; runnerId?: string; + attemptDurationMs?: number; }): Promise { const prisma = this.$.prisma; @@ -1493,6 +1569,30 @@ export class RunAttemptSystem { const truncatedError = this.#truncateTaskRunError(error); + // Read current usage values to calculate new totals + const currentRun = await this.$.readOnlyPrisma.taskRun.findFirst({ + where: { id: runId }, + select: { + usageDurationMs: true, + costInCents: true, + machinePreset: true, + }, + }); + + if (!currentRun) { + throw new ServiceValidationError("Run not found", 404); + } + + // Calculate new usage totals + const updatedUsage = this.#calculateUpdatedUsage({ + runId, + currentUsageDurationMs: currentRun.usageDurationMs, + currentCostInCents: currentRun.costInCents, + attemptDurationMs: attemptDurationMs ?? 0, + machinePresetName: currentRun.machinePreset, + environmentType: latestSnapshot.environmentType, + }); + //run permanently failed const run = await prisma.taskRun.update({ where: { @@ -1502,6 +1602,8 @@ export class RunAttemptSystem { status, completedAt: failedAt, error: truncatedError, + usageDurationMs: updatedUsage.usageDurationMs, + costInCents: updatedUsage.costInCents, }, select: { id: true, @@ -1618,7 +1720,7 @@ export class RunAttemptSystem { backgroundWorkerTaskId: string ): Promise { const result = await this.cache.tasks.swr(backgroundWorkerTaskId, async () => { - const task = await this.$.prisma.backgroundWorkerTask.findFirstOrThrow({ + const task = await this.$.readOnlyPrisma.backgroundWorkerTask.findFirstOrThrow({ where: { id: backgroundWorkerTaskId, }, @@ -1654,7 +1756,7 @@ export class RunAttemptSystem { organizationId: string ): Promise { const result = await this.cache.orgs.swr(organizationId, async () => { - const organization = await this.$.prisma.organization.findFirstOrThrow({ + const organization = await this.$.readOnlyPrisma.organization.findFirstOrThrow({ where: { id: organizationId }, select: { id: true, @@ -1687,7 +1789,7 @@ export class RunAttemptSystem { runtimeEnvironmentId: string ): Promise { const result = await this.cache.projects.swr(runtimeEnvironmentId, async () => { - const { project } = await this.$.prisma.runtimeEnvironment.findFirstOrThrow({ + const { project } = await this.$.readOnlyPrisma.runtimeEnvironment.findFirstOrThrow({ where: { id: runtimeEnvironmentId }, select: { id: true, @@ -1735,7 +1837,7 @@ export class RunAttemptSystem { } const result = await this.cache.machinePresets.swr(backgroundWorkerTaskId, async () => { - const { machineConfig } = await this.$.prisma.backgroundWorkerTask.findFirstOrThrow({ + const { machineConfig } = await this.$.readOnlyPrisma.backgroundWorkerTask.findFirstOrThrow({ where: { id: backgroundWorkerTaskId, }, @@ -1773,27 +1875,27 @@ export class RunAttemptSystem { }): Promise { const result = await this.cache.queues.swr(params.runId, async () => { const queue = params.lockedQueueId - ? await this.$.prisma.taskQueue.findFirst({ - where: { - id: params.lockedQueueId, - }, - select: { - id: true, - friendlyId: true, - name: true, - }, - }) - : await this.$.prisma.taskQueue.findFirst({ - where: { - runtimeEnvironmentId: params.runtimeEnvironmentId, - name: params.queueName, - }, - select: { - id: true, - friendlyId: true, - name: true, - }, - }); + ? await this.$.readOnlyPrisma.taskQueue.findFirst({ + where: { + id: params.lockedQueueId, + }, + select: { + id: true, + friendlyId: true, + name: true, + }, + }) + : await this.$.readOnlyPrisma.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: params.runtimeEnvironmentId, + name: params.queueName, + }, + select: { + id: true, + friendlyId: true, + name: true, + }, + }); if (!queue) { throw new ServiceValidationError( @@ -1826,7 +1928,7 @@ export class RunAttemptSystem { backgroundWorkerTaskId: string ): Promise { const result = await this.cache.deployments.swr(backgroundWorkerTaskId, async () => { - const { worker } = await this.$.prisma.backgroundWorkerTask.findFirstOrThrow({ + const { worker } = await this.$.readOnlyPrisma.backgroundWorkerTask.findFirstOrThrow({ where: { id: backgroundWorkerTaskId }, select: { worker: { @@ -1923,6 +2025,65 @@ export class RunAttemptSystem { }; } + // PostgreSQL int4 max value (~24.85 days in milliseconds) + static readonly MAX_INT4 = 2_147_483_647; + + /** + * Calculates the updated usage values for a run by adding the attempt's usage to the current totals. + * This should be called under the runLock to ensure safe read-modify-write. + * Cost is only calculated for non-dev environments. + */ + #calculateUpdatedUsage({ + runId, + currentUsageDurationMs, + currentCostInCents, + attemptDurationMs, + machinePresetName, + environmentType, + }: { + runId: string; + currentUsageDurationMs: number; + currentCostInCents: number; + attemptDurationMs: number; + machinePresetName: string | null; + environmentType: RuntimeEnvironmentType; + }): { usageDurationMs: number; costInCents: number } { + let usageDurationMs = currentUsageDurationMs + attemptDurationMs; + + // Overflow protection: cap at PostgreSQL int4 max value + if (usageDurationMs > RunAttemptSystem.MAX_INT4) { + this.$.logger.error("usageDurationMs overflow detected, capping at max int4 value", { + runId, + currentUsageDurationMs, + attemptDurationMs, + calculatedTotal: usageDurationMs, + cappedAt: RunAttemptSystem.MAX_INT4, + }); + usageDurationMs = RunAttemptSystem.MAX_INT4; + } + + // Only calculate cost for non-dev environments + let costInCents = currentCostInCents; + if (environmentType !== "DEVELOPMENT") { + const machinePreset = machinePresetName + ? machinePresetFromName( + this.options.machines.machines, + machinePresetName as MachinePresetName + ) + : machinePresetFromName( + this.options.machines.machines, + this.options.machines.defaultMachine + ); + + costInCents = currentCostInCents + attemptDurationMs * machinePreset.centsPerMs; + } + + return { + usageDurationMs, + costInCents, + }; + } + } export function safeParseGitMeta(git: unknown): GitMeta | undefined { diff --git a/references/hello-world/src/trigger/usage.ts b/references/hello-world/src/trigger/usage.ts index caacbde2ac..cd5e8dca83 100644 --- a/references/hello-world/src/trigger/usage.ts +++ b/references/hello-world/src/trigger/usage.ts @@ -1,5 +1,6 @@ -import { logger, task, wait, usage } from "@trigger.dev/sdk"; +import { logger, task, wait, usage, runs } from "@trigger.dev/sdk"; import { setTimeout } from "timers/promises"; +import assert from "node:assert"; export const usageExampleTask = task({ id: "usage-example", @@ -33,3 +34,197 @@ export const usageExampleTask = task({ }; }, }); + +/** + * Test task to verify usage tracking works correctly after moving updates to run engine. + * + * Tests: + * 1. usageDurationMs accumulates across attempts (retries) + * 2. costInCents is only calculated for non-dev environments + * 3. Usage is tracked and returned correctly + * + * Run with: + * - { causeRetry: false } for simple usage test + * - { causeRetry: true } for retry accumulation test + */ +export const usageTrackingTestTask = task({ + id: "usage-tracking-test", + retry: { + maxAttempts: 3, + minTimeoutInMs: 100, + maxTimeoutInMs: 200, + factor: 1, + }, + run: async (payload: { causeRetry?: boolean; workDurationMs?: number }, { ctx }) => { + const workDuration = payload.workDurationMs ?? 2000; + const isDev = ctx.environment.type === "DEVELOPMENT"; + + logger.info("Starting usage tracking test", { + attemptNumber: ctx.attempt.number, + environmentType: ctx.environment.type, + isDev, + workDuration, + causeRetry: payload.causeRetry, + }); + + // Get usage at start of this attempt + const usageAtStart = usage.getCurrent(); + logger.info("Usage at attempt start", { + attemptNumber: ctx.attempt.number, + usageAtStart, + }); + + // Do some "work" to accumulate usage duration + const workStart = Date.now(); + await setTimeout(workDuration); + const actualWorkTime = Date.now() - workStart; + + // Get usage after work + const usageAfterWork = usage.getCurrent(); + logger.info("Usage after work", { + attemptNumber: ctx.attempt.number, + usageAfterWork, + actualWorkTimeMs: actualWorkTime, + attemptDurationDelta: + usageAfterWork.compute.attempt.durationMs - usageAtStart.compute.attempt.durationMs, + }); + + // Cause a retry on first attempt if requested + if (payload.causeRetry && ctx.attempt.number === 1) { + logger.info("Throwing error to cause retry - usage from this attempt should accumulate"); + throw new Error("Intentional error to test usage accumulation across retries"); + } + + // Log expectations for verification + logger.info("Usage tracking test completed", { + attemptNumber: ctx.attempt.number, + finalUsage: usageAfterWork, + environmentType: ctx.environment.type, + expectations: { + usageDurationMs: "Should be > 0 and reflect actual CPU time", + costInCents: isDev + ? "Should be 0 (dev environment - no cost tracking)" + : "Should be > 0 (calculated from usageDurationMs * machine centsPerMs)", + retryAccumulation: payload.causeRetry + ? "If this is attempt 2+, usageDurationMs should include time from previous attempts" + : "N/A - no retry", + }, + }); + + return { + success: true, + attemptNumber: ctx.attempt.number, + environmentType: ctx.environment.type, + finalUsage: usageAfterWork, + isDev, + note: isDev + ? "costInCents will be 0 in dev - deploy to staging/prod to test cost calculation" + : "costInCents should reflect accumulated usage cost", + }; + }, +}); + +/** + * Parent task that triggers usageTrackingTestTask and verifies the usage values + * are correctly stored in the database via runs.retrieve(). + * + * This tests the full flow: + * 1. Trigger child task and wait for completion + * 2. Use runs.retrieve() to fetch the run from the database + * 3. Verify usageDurationMs > 0 + * 4. Verify costInCents behavior based on environment type + */ +export const usageVerificationParentTask = task({ + id: "usage-verification-parent", + run: async (payload: { causeRetry?: boolean; workDurationMs?: number }, { ctx }) => { + const isDev = ctx.environment.type === "DEVELOPMENT"; + + logger.info("Starting usage verification parent task", { + environmentType: ctx.environment.type, + isDev, + causeRetry: payload.causeRetry, + workDurationMs: payload.workDurationMs, + }); + + // Trigger the child task and wait for it to complete + const childResult = await usageTrackingTestTask.triggerAndWait({ + causeRetry: payload.causeRetry, + workDurationMs: payload.workDurationMs ?? 2000, + }); + + if (!childResult.ok) { + throw new Error(`Child task failed: ${JSON.stringify(childResult.error)}`); + } + + logger.info("Child task completed", { + childRunId: childResult.id, + childOutput: childResult.output, + }); + + // Retrieve the run from the database to verify usage values were stored + const retrievedRun = await runs.retrieve(childResult.id); + + logger.info("Retrieved run from database", { + runId: retrievedRun.id, + status: retrievedRun.status, + durationMs: retrievedRun.durationMs, + costInCents: retrievedRun.costInCents, + baseCostInCents: retrievedRun.baseCostInCents, + }); + + // Verify usageDurationMs (durationMs in the API response) is greater than 0 + assert.ok( + retrievedRun.durationMs > 0, + `Expected durationMs > 0, got ${retrievedRun.durationMs}` + ); + + // For retry test, verify duration accumulated across attempts + if (payload.causeRetry) { + // With a retry, we should have at least 2x the work duration (attempt 1 + attempt 2) + const minExpectedDuration = (payload.workDurationMs ?? 2000) * 1.5; // Allow some variance + assert.ok( + retrievedRun.durationMs >= minExpectedDuration, + `Expected durationMs >= ${minExpectedDuration} for retry test (got ${retrievedRun.durationMs})` + ); + logger.info("✅ Retry accumulation verified - duration includes time from both attempts"); + } + + // Verify costInCents based on environment type + if (isDev) { + // In dev, cost should be 0 + assert.strictEqual( + retrievedRun.costInCents, + 0, + `Expected costInCents to be 0 in dev environment, got ${retrievedRun.costInCents}` + ); + logger.info("✅ Dev environment verified - costInCents is 0 as expected"); + } else { + // In non-dev, cost should be > 0 + assert.ok( + retrievedRun.costInCents > 0, + `Expected costInCents > 0 in ${ctx.environment.type} environment, got ${retrievedRun.costInCents}` + ); + logger.info( + `✅ ${ctx.environment.type} environment verified - costInCents is ${retrievedRun.costInCents}` + ); + } + + logger.info("✅ All usage verification checks passed!", { + durationMs: retrievedRun.durationMs, + costInCents: retrievedRun.costInCents, + baseCostInCents: retrievedRun.baseCostInCents, + environmentType: ctx.environment.type, + hadRetry: payload.causeRetry, + }); + + return { + success: true, + childRunId: childResult.id, + durationMs: retrievedRun.durationMs, + costInCents: retrievedRun.costInCents, + baseCostInCents: retrievedRun.baseCostInCents, + environmentType: ctx.environment.type, + hadRetry: payload.causeRetry ?? false, + }; + }, +});