Skip to content

feat(mollifier): trigger burst smoothing — Phase 1 (monitoring)#3614

Merged
d-cs merged 59 commits into
mainfrom
mollifier-phase-2
May 18, 2026
Merged

feat(mollifier): trigger burst smoothing — Phase 1 (monitoring)#3614
d-cs merged 59 commits into
mainfrom
mollifier-phase-2

Conversation

@d-cs
Copy link
Copy Markdown
Collaborator

@d-cs d-cs commented May 13, 2026

Summary

  • Introduce the Mollifier: a Redis-backed buffer for trigger() API calls during traffic spikes, with a per-env trip evaluator and a drainer ack-loop.
  • Phase 1 is dual-write monitoring — every mollified trigger is buffered to Redis AND continues to engine.trigger. No customer-facing behaviour change.
  • Telemetry events: mollifier.would_mollify, mollifier.buffered, mollifier.drained, plus the mollifier.decisions counter.
  • Gated behind a feature flag (default off).

Test plan

  • pnpm run test --filter @trigger.dev/redis-worker
  • pnpm run test --filter webapp -- mollifier
  • Manual: with flag off, no behaviour change vs main
  • Manual: with flag on + threshold lowered, observe mollifier.buffered + mollifier.drained log pairs with matching runId

@d-cs d-cs self-assigned this May 13, 2026
@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented May 13, 2026

🦋 Changeset detected

Latest commit: e26f6ed

The changes in this PR will be included in the next version bump.

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 13, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

This PR implements the first two phases of a trigger burst-smoothing system ("mollifier"): it adds a Redis-backed MollifierBuffer and MollifierDrainer, Zod schemas and payload (de)serialization, a real trip evaluator and mollifier gate wired into RunEngineTriggerTaskService, OpenTelemetry metrics, worker startup drainer wiring, environment configuration and feature flag, package re-exports, and comprehensive tests (unit, integration, and fuzz) validating buffer, drainer, gate, and evaluator behavior while keeping fail-open semantics and deferring full activation to later phases.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 9.52% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ⚠️ Warning The PR description is incomplete and does not follow the required template structure. Missing required sections and structured information. Add missing template sections: fill in issue reference, complete testing details, and add screenshots if applicable. Structure should match the provided template with all checkboxes and sections.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically summarizes the main change: introducing the Mollifier feature for trigger burst smoothing with Phase 1 focusing on monitoring/dual-write behavior.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch mollifier-phase-2

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai[bot]

This comment was marked as resolved.

@d-cs
Copy link
Copy Markdown
Collaborator Author

d-cs commented May 14, 2026

Code review

Found 2 issues:

  1. resolveOrgFlag resolves the global feature flag, not a per-org flag. flag() is called without overrides, so it reads only the global FeatureFlag row — other call sites in the webapp (e.g. canAccessAi/canAccessQuery) explicitly pass the org's featureFlags JSON as overrides. With the current code, the variable named orgFlagEnabled and the call-site comment "if the org has the mollifier feature flag enabled" don't match behaviour: the moment the global flag flips on, every org is mollified — there's no per-org rollout.

isMollifierEnabled: () => env.MOLLIFIER_ENABLED === "1",
isShadowModeOn: () => env.MOLLIFIER_SHADOW_MODE === "1",
resolveOrgFlag: () =>
flag({ key: FEATURE_FLAG.mollifierEnabled, defaultValue: false }),
evaluator: defaultEvaluator,
logShadow: (inputs, decision) =>

  1. evaluateGate is awaited outside the try block in the trigger hot path. evaluateGate itself has no internal try/catch around resolveOrgFlag() (a DB feature-flag read) or evaluator() (a Redis Lua call), so any transient failure once MOLLIFIER_ENABLED=1 is flipped will throw past the call site and fail the customer's trigger — turning a monitoring-only Phase 1 into a fleet-wide trigger outage on a Redis/DB blip. Suggest wrapping the gate call in a try/catch that falls back to pass_through.

};
const mollifierOutcome = await this.evaluateGate({
envId: environment.id,
orgId: environment.organizationId,
taskId,
});
try {

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

@d-cs
Copy link
Copy Markdown
Collaborator Author

d-cs commented May 14, 2026

Nitpicks (test-mock removal) also addressed in 98c1520:

  • apps/webapp/test/mollifierGate.test.ts: dropped vi.fn. makeDeps now returns real closure-based dependencies with plain counter/array spies; the 16-row truth table and decision-log shape tests are unchanged.
  • apps/webapp/test/mollifierTripEvaluator.test.ts: removed the vi.fn/cast fakeBuffer doubles. Each test now uses redisTest from @internal/testcontainers to spin up Redis and exercise a real MollifierBuffer. The trip case uses threshold: 2 + three real evaluateTrip calls to deterministically trip via the production Lua window; the fail-open case closes the buffer up front so the next evaluateTrip hits Connection is closed. — a real failure mode rather than a stub.

coderabbitai[bot]

This comment was marked as resolved.

@d-cs
Copy link
Copy Markdown
Collaborator Author

d-cs commented May 14, 2026

Code review (follow-up)

Four additional issues from the earlier scan, posting now since the gate-naming fix landed:

  1. Webapp CLAUDE.md says "Always use findFirst instead of findUnique" — new test introduces a findUnique. Trivial swap.

expect(result).toBeDefined();
expect(result?.run.friendlyId).toBeDefined();
const pgRun = await prisma.taskRun.findUnique({ where: { id: result!.run.id } });
expect(pgRun).not.toBeNull();
expect(pgRun!.friendlyId).toBe(result!.run.friendlyId);

  1. Drainer is started inside init() unconditionally — workerQueue.initialize() two lines above is gated on WORKER_ENABLED === "true", but getMollifierDrainer() is not. Once MOLLIFIER_ENABLED=1 flips on, every webapp replica (including API-only ones) will spin up a polling loop + Redis connection and race for the same buffer. Consider gating on WORKER_ENABLED (or a dedicated MOLLIFIER_DRAINER_ENABLED).

if (env.WORKER_ENABLED === "true") {
await workerQueue.initialize();
}
try {
const drainer = getMollifierDrainer();
if (drainer) {
// The drainer owns a polling loop and a Redis client; let it drain
// in-flight pops on shutdown rather than tearing the process down
// mid-handler. Idempotent — `drainer.stop()` short-circuits if already
// stopped, so registering on both signals is safe.
const stopDrainer = () => {
drainer.stop().catch((error) => {
logger.error("Failed to stop mollifier drainer", { error });
});
};
process.once("SIGTERM", stopDrainer);
process.once("SIGINT", stopDrainer);
}
} catch (error) {
logger.error("Failed to initialise mollifier drainer", { error });
}
}

  1. includeTtl: true was added to the delayed and pending-version re-enqueue paths, but the producer-side comment in enqueueSystem.ts still explicitly warns these callers off: "Re-enqueues (waitpoint, checkpoint, delayed, pending version) must not add TTL." Either the comment needs updating to reflect the new design, or the new callers are wrong.

const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;
// Include TTL only when explicitly requested (first enqueue from trigger).
// Re-enqueues (waitpoint, checkpoint, delayed, pending version) must not add TTL.
let ttlExpiresAt: number | undefined;
if (includeTtl && run.ttl) {
const expireAt = parseNaturalLanguageDuration(run.ttl);

batchId: run.batchId ?? undefined,
skipRunLock: true,
includeTtl: true,
});

// for a worker version). Arm TTL here so the TTL system can expire it
// if it sits queued waiting on a concurrency slot.
includeTtl: true,
});
});

  1. The comment calls count a "sliding-window counter", but the Lua is INCR + PEXPIRE-on-first — a fixed window. At a window boundary this can briefly admit ~2x threshold before tripping. Additionally PSETEX trippedKey, holdMs rearms the hold TTL on every overage call, so under sustained overload the hold never elapses while traffic stays above threshold. Both may be intentional for Phase 1, but the docstring should match the implementation.

} from "./mollifierTelemetry.server";
// `count` is the *single-instance* sliding-window counter, not a fleet-wide
// aggregate. Each webapp instance maintains its own Redis key, so the fleet
// effective ceiling is `instance_count * threshold`. Phase 2 consumers must
// not treat `count` as a global rate.

numberOfKeys: 2,
lua: `
local rateKey = KEYS[1]
local trippedKey = KEYS[2]
local windowMs = tonumber(ARGV[1])
local threshold = tonumber(ARGV[2])
local holdMs = tonumber(ARGV[3])
local count = redis.call('INCR', rateKey)
if count == 1 then
redis.call('PEXPIRE', rateKey, windowMs)
end
if count > threshold then
redis.call('PSETEX', trippedKey, holdMs, '1')
end
local tripped = redis.call('EXISTS', trippedKey)
return {count, tripped}

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

@d-cs d-cs force-pushed the mollifier-phase-2 branch from b4b5719 to f43df01 Compare May 14, 2026 08:54
devin-ai-integration[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

@d-cs d-cs force-pushed the mollifier-phase-2 branch 2 times, most recently from 088e071 to 2a0d5ee Compare May 14, 2026 12:49
devin-ai-integration[bot]

This comment was marked as resolved.

@d-cs d-cs force-pushed the mollifier-phase-2 branch from e51fac9 to a9400b1 Compare May 14, 2026 14:09
devin-ai-integration[bot]

This comment was marked as resolved.

d-cs and others added 13 commits May 14, 2026 16:43
Redis-backed burst-smoothing layer behind MOLLIFIER_ENABLED=0 (default).
With the kill switch off, the gate short-circuits on its first env check
and production behaviour is identical to main.

@trigger.dev/redis-worker:
- MollifierBuffer: atomic Lua-backed FIFO with accept / pop / ack /
  requeue / fail + TTL. Per-env queues with HSET entry storage,
  atomic RPOP + status transition, FIFO retry ordering.
- MollifierDrainer: generic round-robin worker with concurrency cap,
  retry semantics, and a stop deadline to avoid livelock on a hung
  handler. Phase 3 will wire the handler to engine.trigger().
- Full testcontainers-backed test suite (21 tests).

apps/webapp:
- evaluateGate cascade-check (kill switch -> org feature flag ->
  shadow mode -> trip evaluator -> mollify / shadow_log / pass_through).
  Dependencies injected for testability; the trip evaluator stub
  returns { divert: false } in phase 1.
- Inserted into RunEngineTriggerTaskService.call() before
  traceEventConcern.traceRun. The mollify branch throws (unreachable
  in phase 1).
- Lazy MollifierBuffer + MollifierDrainer singletons; no Redis
  connection unless MOLLIFIER_ENABLED=1.
- 12 MOLLIFIER_* env vars (all safe defaults) and a mollifierEnabled
  feature flag in the global catalog.
- Drainer booted from worker.server.ts on first import.
- Read-fallback stub for phase 3.
- Gate cascade tests + .env loader so env.server validates in vitest
  workers.

Phase 2 will land the real trip evaluator; phase 3 will activate the
buffer-write + drain path.
…dual-write monitoring + drainer ack loop)

Phase 1 of the trigger-burst smoothing initiative. Adds the A-side trip
evaluator (atomic Lua sliding-window per env) and wires it into the trigger
hot path. When the per-org mollifierEnabled feature flag is on AND the
evaluator says divert, the canonical replay payload is buffered to Redis
(via buffer.accept) AND the trigger continues through engine.trigger —
i.e. dual-write. The drainer pops + acks (no-op handler) to prove the
dequeue mechanism works end-to-end. Operators audit by joining
mollifier.buffered (write) and mollifier.drained (consume) logs by runId.

Buffer primitives hardened:
- accept is idempotent on duplicate runId (Lua EXISTS guard)
- pop skips orphan queue references (entry HASH TTL'd while runId queued)
- fail no-ops on missing entry (no partial FAILED hash leak)
- mollifier:envs set pruned on draining pop, restored on requeue
- 16-row truth-table test enumerates the gate cascade
- BufferedTriggerPayload defines the canonical replay shape Phase 2 will
  use to invoke engine.trigger
- payload hash for audit-equivalence computed off the hot path (in the
  drainer) to avoid CPU during a spike

Regression tests in apps/webapp/test/engine/triggerTask.test.ts pin the
mollifier integration:
- validation throws BEFORE the gate runs (no orphan buffer write on
  rejected triggers)
- mollify dual-write happy path (Postgres + Redis both reflect the run)
- pass_through path does NOT call buffer.accept
- engine.trigger throwing AFTER buffer.accept leaves an orphan
  (documented behaviour — drainer auto-cleans; audit-trail surfaces it)
- idempotency-key match short-circuits BEFORE the gate is consulted
- debounce match produces an orphan (documented behaviour — Phase 2
  must lift handleDebounce upfront before buffer.accept)

Behaviour with MOLLIFIER_ENABLED=0 (default) is byte-identical to main.
With MOLLIFIER_ENABLED=1 and the flag off, only mollifier.would_mollify
logs fire (no buffer state). With the flag on, dual-write activates.

Includes two opt-in *.fuzz.test.ts suites (gated on FUZZ=1) that
randomise operation sequences against evaluateTrip and the drainer to
find timing edges. They are clearly marked TEMPORARY in their headers.
- changeset: drop "deferred" wording — phase-1 actively dual-writes + runs
  the drainer ack loop.
- worker.server.ts: wrap mollifier drainer init in try/catch + register
  SIGTERM/SIGINT handlers so the polling loop stops cleanly on shutdown.
- bufferedTriggerPayload: only serialise idempotencyKeyExpiresAt when an
  idempotencyKey is present (avoid impossible orphan-expiry payloads).
- mollifierTelemetry: narrow recordDecision reason to DecisionReason union
  to keep OTEL attribute cardinality bounded.
- mollifierGate: rename resolveOrgFlag → resolveFlag. The underlying
  FeatureFlag table is global by key, so the "org" prefix was misleading;
  per-org gating is out of scope for phase-1.
- tests: drop vi.fn mocks. mollifierGate now uses plain closure spies;
  mollifierTripEvaluator runs against a real MollifierBuffer backed by a
  redisTest container (closed client exercises the fail-open path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…stacking

Worker.init() is called per request from entry.server.tsx, so the
process.once SIGTERM/SIGINT pair added in 98c1520 would stack a fresh
listener every request under dev hot-reload (process.once only removes
after firing). Gate registration on a process-global flag, matching the
existing __worker__ pattern.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tion.featureFlags

The mollifier gate's resolveOrgFlag was a global feature-flag lookup
named as if org-scoped. Phase-1 plan and design doc both intended
per-org gating; the implementation regressed because the global
flag() helper has no orgId parameter.

Adopt the existing per-org feature-flag pattern (used by canAccessAi,
canAccessPrivateConnections, compute beta gating): pass
`Organization.featureFlags` through as `flag()` overrides. Per-org
opt-in now works admin-toggleable via the existing
Organization.featureFlags JSON column — no schema migration needed.

- mollifierGate: revert resolveFlag/flagEnabled back to
  resolveOrgFlag/orgFlagEnabled (the name now matches reality).
  GateInputs gains `orgFeatureFlags`; the default resolver passes
  them as overrides to `flag()`.
- triggerTask.server.ts: thread `environment.organization.featureFlags`
  into the gate call.
- tests: three new postgresTest cases exercise the real DB-backed
  resolveOrgFlag end-to-end, proving (a) per-org opt-in isolation,
  (b) unrelated beta flags don't bleed across, (c) per-org overrides
  take precedence over the global FeatureFlag row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nect

The unit cascade tests in mollifierGate.test.ts import the gate module,
which transitively pulls in ~/db.server. That module constructs the
prisma singleton at import time and eagerly calls $connect(), which
fails against localhost:5432 in the unit-test shard and surfaces as an
unhandled rejection that fails the whole vitest run. Mocking the module
keeps the cascade tests pure and leaves the postgresTest cases on the
testcontainer-fixture prisma untouched.
- Gate drainer init on WORKER_ENABLED so only worker replicas run the polling loop.
- Update the enqueueSystem TTL comment now that delayed/pending-version are first enqueues.
- Correct the mollifier gate docstring to describe the fixed-window counter and tripped-key rearm.
- Swap findUnique for findFirst in the trigger task test to match the webapp Prisma rule.
…eFlags

The gate's `GateInputs` now requires `orgFeatureFlags`, but the surface type used by the trigger service was still the pre-org-scope shape, so the default evaluator wasn't assignable and the call site couldn't pass the flag overrides.
…est startup

The per-org isolation suite uses `postgresTest`, which spins up a fresh Postgres testcontainer per case. On CI the 5s vitest default regularly times out on container start before the test body runs. Match the 30s `vi.setConfig` used by other postgresTest suites in this app.
…rrors

resolveOrgFlag now checks the per-org Organization.featureFlags override
in-memory before falling back to the global flag() helper, so the common
per-org enablement path resolves without a Prisma round-trip on every
trigger call. evaluateGate also wraps the flag resolution in try/catch
and fails open to false on error, mirroring the trip evaluator.
…exit

Pass a configurable timeout to drainer.stop() so SIGTERM/SIGINT can't hang
forever if an in-flight handler is wedged. Matches the precedent set by
BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS (default 30s).
processOneFromEnv now catches buffer.pop() failures so one env's hiccup
doesn't reject Promise.all and bubble up to the loop's outer catch. The
polling loop itself wraps each runOnce in try/catch and backs off with
capped exponential delay (up to 5s) instead of exiting permanently on the
first listEnvs/pop error. Stop semantics are unchanged: only the stopping
flag breaks the loop.

Adds two regression tests using a stub buffer (no Redis container) so
fault injection is deterministic.
The phase-1 scaffolding referenced MollifierBuffer, getMollifierBuffer,
and deserialiseMollifierSnapshot without importing them — CI typecheck
fails with TS2304. The runtime path is gated behind MOLLIFIER_ENABLED=0
so this never produced a runtime symptom, but the types must resolve.
@d-cs d-cs force-pushed the mollifier-phase-2 branch 2 times, most recently from 9483092 to 50868ff Compare May 15, 2026 16:22
devin-ai-integration[bot]

This comment was marked as resolved.

d-cs added 3 commits May 15, 2026 17:27
…oop wins the race

The Promise.race between this.loopPromise and this.delay(timeoutMs)
discarded the timeout's underlying setTimeout handle whenever the loop
branch won. The discarded timer was still ref'd by libuv and pinned the
Node event loop alive for the remainder of `timeoutMs` — exactly the
shutdown slack the timeout was supposed to bound.

Inline the timer in stop() with a captured handle and clearTimeout() it
in a finally block, so every exit path (loop-won, timeout-won, throw)
releases the ref. The in-loop delay() calls are unchanged — they're
awaited normally and their timers fire-and-clear themselves.
`process.once("SIGTERM", stopDrainer)` was the odd one out — every
other webapp service (runsReplicationInstance, llmPricingRegistry,
dynamicFlushScheduler, marqs, eventLoopMonitor) registers through
`signalsEmitter` from `~/services/signals.server`, an EventEmitter
backed by a single `process.on()` that fans out to all listeners.

Switching gets us:
  - codebase consistency;
  - `.on` (not `.once`) so a second SIGTERM, if the orchestrator emits
    one before SIGKILL, still reaches us;
  - if SIGTERM lands in the narrow gap between the listener attaching
    and drainer.start() below, the first invocation no-ops (stop()
    returns early because isRunning is false) but the listener stays
    attached for any subsequent signal, instead of being consumed and
    leaving the now-running drainer with no graceful-stop path.
This addition was applied while phase-2 was already in review and is
out of scope for the mollifier PR. The underlying clarification is
worth landing — just not on this branch.
devin-ai-integration[bot]

This comment was marked as resolved.

evaluateGate ran on every trigger regardless of TRIGGER_MOLLIFIER_ENABLED.
With the flag off (the default everywhere it hasn't been opted in), the
gate still produced a `pass_through` decision after allocating a
GateInputs object, spreading defaultGateDependencies inside evaluateGate,
and incrementing the `mollifier.decisions{outcome=pass_through}` OTel
counter. Cheap individually, but triggerTask is the hottest code path in
the system — multiply by trigger rate and the unnecessary work compounds.

Guard the gate call with a direct env.TRIGGER_MOLLIFIER_ENABLED check at
the call site. When the flag is off, mollifierOutcome is null and the
downstream `mollifierOutcome?.action === "mollify"` branch skips the
buffer dual-write entirely — zero allocation, zero counter increment on
the disabled path. When the flag is on, behaviour is unchanged.

Lost-signal note: with mollifier off, we no longer count "pass_through"
decisions in the OTel counter (the gate never runs). That's a non-issue
— "pass_through count when feature is off" is just total trigger rate,
which is already observable via the trigger handler's own spans/counters
upstream. The gate counter remains the source of truth for the
mollify/shadow/pass_through ratio when the feature is on, which is the
load-bearing signal.
devin-ai-integration[bot]

This comment was marked as resolved.

…DI hook

The previous commit added a perf short-circuit at the call site that
read `env.TRIGGER_MOLLIFIER_ENABLED` directly. That broke three
mollifier integration tests in CI: the tests inject a custom
`evaluateGate` via the existing DI seam expecting the buffer-write
branch to be reached, but CI has no `.env` (the `apps/webapp/.env`
symlink target is absent), the Zod default `"0"` wins, the call site
short-circuits to `null` before the injected gate runs, and
`buffer.accepted` stays empty.

Make the global-enabled check itself injectable:

  - New constructor opt `isMollifierGloballyEnabled?: () => boolean`,
    defaulting to `() => env.TRIGGER_MOLLIFIER_ENABLED === "1"`. Each
    DI hook now represents one decision (gate, buffer, global-enabled),
    so a test that wants the buffer-write branch reached can inject
    `isMollifierGloballyEnabled: () => true` alongside its custom gate.
  - Call site now reads `this.isMollifierGloballyEnabled()` instead of
    `env.TRIGGER_MOLLIFIER_ENABLED` directly. In production, with no DI
    override, the default closure resolves `env` exactly once per call
    just as before — same perf win when the flag is off.
  - All six mollifier DI injection sites in triggerTask.test.ts now also
    pass `isMollifierGloballyEnabled: () => true` so the tests' DI
    surface matches the new contract regardless of CI env state.
devin-ai-integration[bot]

This comment was marked as resolved.

d-cs and others added 2 commits May 18, 2026 09:24
The bootstrap in mollifierDrainerWorker.server.ts wrapped getMollifierDrainer()
in a try/catch that logged-and-continued on any error, which absorbed the two
designed-to-crash throws in initializeMollifierDrainer():

  - "MollifierDrainer initialised without a buffer" (missing buffer client)
  - "TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS must be at least ... below
    GRACEFUL_SHUTDOWN_TIMEOUT" (shutdown-timeout reconciliation)

Both are deploy-time mistakes: silently disabling the drainer means the
gate keeps writing to the buffer, the drainer never reads, and entries
TTL out in 10min. Bounded in phase 1 (monitoring-only) but customer-
visible data loss in phase 2/3 where the drainer replays into engine.trigger.
Better to fail loud now than retrofit the contract later.

Introduce MollifierConfigurationError for the two deterministic throws.
The bootstrap's catch now rethrows that class (process crashes at module
top-level → orchestrator health check fails → deploy rolls back) while
still logging-and-continuing on transient errors (Redis blip during init
shouldn't take the whole webapp down). instanceof + name fallback covers
the Remix dev hot-reload realm edge case.
devin-ai-integration[bot]

This comment was marked as resolved.

d-cs and others added 2 commits May 18, 2026 09:49
Adds the smallest DI surface to `initMollifierDrainerWorker` (`isEnabled`
and `getDrainer`, both optional, default to live env/singleton) so the
catch-block policy can be tested without manipulating module-level env:

  - rethrows MollifierConfigurationError — deterministic misconfig
    escapes, which is what makes the production-path crash on boot
    (the call site in entry.server.tsx runs sync at module top level,
    before `process.on("uncaughtException", ...)` is registered, so an
    escape becomes a Node default-handler exit-1).
  - rethrows when `name === "MollifierConfigurationError"` even when
    `instanceof` fails — covers the Remix dev hot-reload realm edge
    case where the catch holds a stale class reference.
  - swallows non-configuration errors — a transient Redis blip during
    buffer init shouldn't take the whole webapp down.
  - no-op when disabled — the factory isn't invoked when the enabled
    predicate returns false.

Also updates the existing mollifier server-changes note to: rename env
vars to TRIGGER_MOLLIFIER_* prefix, document the TRIGGER_MOLLIFIER_DRAINER_ENABLED
split for multi-replica drainer placement, and call out the new fail-loud
behaviour on drainer misconfiguration.
devin-ai-integration[bot]

This comment was marked as resolved.

d-cs and others added 3 commits May 18, 2026 11:41
…keep batch alive

If buffer.requeue() or buffer.fail() throws during error recovery inside
processEntry, the rejection used to escape processOneFromEnv and reject
runOnce's Promise.all — discarding handler results from sibling envs in
the same tick. Wrap processEntry in try/catch so the failed env is just
counted as "failed" for the tick, matching the invariant stated in the
processOneFromEnv comment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…out test

Node's setTimeout can fire a millisecond or two early under CI load,
causing the existing `>= 500ms` lower bound to flake (saw 499ms in CI).
Loosen to `>= 450ms` — the behaviour being pinned is "stop honors the
deadline instead of waiting for the hung handler indefinitely", not
millisecond-precise timing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Shadow-mode and live-divert logs both fire on the trigger hot path;
rely on the mollifier.decisions OTel counter for production visibility.
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 25 additional findings in Devin Review.

Open in Devin Review

Comment thread apps/webapp/app/v3/mollifier/mollifierGate.server.ts
Document why the evaluator writes Redis in both shadow-only and flag-on
modes: the trip threshold is computed from a counter, and a counter that
doesn't increment isn't a counter. Also note env↔org is 1:1 so the per-env
key is effectively per-org — no cross-org bleed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@d-cs d-cs requested a review from ericallam May 18, 2026 13:31
@d-cs d-cs merged commit 906d5fa into main May 18, 2026
36 checks passed
@d-cs d-cs deleted the mollifier-phase-2 branch May 18, 2026 13:58
ericallam pushed a commit that referenced this pull request May 21, 2026
## Summary
44 improvements, 1 bug fix.

## Improvements
- **AI Prompts** — define prompt templates as code alongside your tasks,
version them on deploy, and override the text or model from the
dashboard without redeploying. Prompts integrate with the Vercel AI SDK
via `toAISDKTelemetry()` (links every generation span back to the
prompt) and with `chat.agent` via `chat.prompt.set()` +
`chat.toStreamTextOptions()`.
([#3629](#3629))
- **Code-defined, deploy-versioned templates** — define with
`prompts.define({ id, model, config, variables, content })`. Every
deploy creates a new version visible in the dashboard. Mustache-style
placeholders (`{{var}}`, `{{#cond}}...{{/cond}}`) with Zod / ArkType /
Valibot-typed variables.
- **Dashboard overrides** — change a prompt's text or model from the
dashboard without redeploying. Overrides take priority over the deployed
"current" version and are environment-scoped (dev / staging / production
independent).
- **Resolve API** — `prompt.resolve(vars, { version?, label? })` returns
the compiled `text`, resolved `model`, `version`, and labels. Standalone
`prompts.resolve<typeof handle>(slug, vars)` for cross-file resolution
with full type inference on slug and variable shape.
- **AI SDK integration** — spread `resolved.toAISDKTelemetry({ ...extra
})` into any `generateText` / `streamText` call and every generation
span links to the prompt in the dashboard alongside its input variables,
model, tokens, and cost.
- **`chat.agent` integration** — `chat.prompt.set(resolved)` stores the
resolved prompt run-scoped; `chat.toStreamTextOptions({ registry })`
pulls `system`, `model` (resolved via the AI SDK provider registry),
`temperature` / `maxTokens` / etc., and telemetry into a single spread
for `streamText`.
- **Management SDK** — `prompts.list()`, `prompts.versions(slug)`,
`prompts.promote(slug, version)`, `prompts.createOverride(slug, body)`,
`prompts.updateOverride(slug, body)`, `prompts.removeOverride(slug)`,
`prompts.reactivateOverride(slug, version)`.
- **Dashboard** — prompts list with per-prompt usage sparklines;
per-prompt detail with Template / Details / Versions / Generations /
Metrics tabs. AI generation spans get a custom inspector showing the
linked prompt's metadata, input variables, and template content
alongside model, tokens, cost, and the message thread.
- Adds `onBoot` to `chat.agent` — a lifecycle hook that fires once per
worker process picking up the chat. Runs for the initial run, preloaded
runs, AND reactive continuation runs (post-cancel, crash, `endRun`,
`requestUpgrade`, OOM retry), before any other hook. Use it to
initialize `chat.local`, open per-process resources, or re-hydrate state
from your DB on continuation — anywhere the SAME run picking up after
suspend/resume isn't enough.
([#3543](#3543))
- **AI SDK `useChat` integration** — a custom
[`ChatTransport`](https://sdk.vercel.ai/docs/ai-sdk-ui/transport)
(`useTriggerChatTransport`) plugs straight into Vercel AI SDK's
`useChat` hook. Text streaming, tool calls, reasoning, and `data-*`
parts all work natively over Trigger.dev's realtime streams. No custom
API routes needed.
- **First-turn fast path (`chat.headStart`)** — opt-in handler that runs
the first turn's `streamText` step in your warm server process while the
agent run boots in parallel, cutting cold-start TTFC by roughly half
(measured 2801ms → 1218ms on `claude-sonnet-4-6`). The agent owns step
2+ (tool execution, persistence, hooks) so heavy deps stay where they
belong. Web Fetch handler works natively in Next.js, Hono, SvelteKit,
Remix, Workers, etc.; bridge to Express/Fastify/Koa via
`chat.toNodeListener`. New `@trigger.dev/sdk/chat-server` subpath.
- **Multi-turn durability via Sessions** — every chat is backed by a
durable Session that outlives any individual run. Conversations resume
across page refreshes, idle timeout, crashes, and deploys; `resume:
true` reconnects via `lastEventId` so clients only see new chunks.
`sessions.list` enumerates chats for inbox-style UIs.
- **Auto-accumulated history, delta-only wire** — the backend
accumulates the full conversation across turns; clients only ship the
new message each turn. Long chats never hit the 512 KiB body cap.
Register `hydrateMessages` to be the source of truth yourself.
- **Lifecycle hooks** — `onPreload`, `onChatStart`,
`onValidateMessages`, `hydrateMessages`, `onTurnStart`,
`onBeforeTurnComplete`, `onTurnComplete`, `onChatSuspend`,
`onChatResume` — for persistence, validation, and post-turn work.
- **Stop generation** — client-driven `transport.stopGeneration(chatId)`
aborts mid-stream; the run stays alive for the next message, partial
response is captured, and aborted parts (stuck `partial-call` tools,
in-progress reasoning) are auto-cleaned.
- **Tool approvals (HITL)** — tools with `needsApproval: true` pause
until the user approves or denies via `addToolApprovalResponse`. The
runtime reconciles the updated assistant message by ID and continues
`streamText`.
- **Steering and background injection** — `pendingMessages` injects user
messages between tool-call steps so users can steer the agent
mid-execution; `chat.inject()` + `chat.defer()` adds context from
background work (self-review, RAG, safety checks) between turns.
- **Actions** — non-turn frontend commands (undo, rollback, regenerate,
edit) sent via `transport.sendAction`. Fire `hydrateMessages` +
`onAction` only — no turn hooks, no `run()`. `onAction` can return a
`StreamTextResult` for a model response, or `void` for side-effect-only.
- **Typed state primitives** — `chat.local<T>` for per-run state
accessible from hooks, `run()`, tools, and subtasks (auto-serialized
through `ai.toolExecute`); `chat.store` for typed shared data between
agent and client; `chat.history` for reading and mutating the message
chain; `clientDataSchema` for typed `clientData` in every hook.
- **`chat.toStreamTextOptions()`** — one spread into `streamText` wires
up versioned system [Prompts](https://trigger.dev/docs/ai/prompts),
model resolution, telemetry metadata, compaction, steering, and
background injection.
- **Multi-tab coordination** — `multiTab: true` + `useMultiTabChat`
prevents duplicate sends and syncs state across browser tabs via
`BroadcastChannel`. Non-active tabs go read-only with live updates.
- **Network resilience** — built-in indefinite retry with bounded
backoff, reconnect on `online` / tab refocus / bfcache restore,
`Last-Event-ID` mid-stream resume. No app code needed.
- **Sessions** — a durable, run-aware stream channel keyed on a stable
`externalId`. A Session is the unit of state that owns a multi-run
conversation: messages flow through `.in`, responses through `.out`,
both survive run boundaries. Sessions back the new `chat.agent` runtime,
and you can build on them directly for any pattern that needs durable
bi-directional streaming across runs.
([#3542](#3542))
- Add `ai.toolExecute(task)` so you can wire a Trigger subtask in as the
`execute` handler of an AI SDK `tool()` while defining `description` and
`inputSchema` yourself — useful when you want full control over the tool
surface and just need Trigger's subtask machinery for the body.
([#3546](#3546))
- Type `chat.createStartSessionAction` against your chat agent so
`clientData` is typed end-to-end on the first turn:
([#3684](#3684))
- Add `region` to the runs list / retrieve API: filter runs by region
(`runs.list({ region: "..." })` / `filter[region]=<masterQueue>`) and
read each run's executing region from the new `region` field on the
response.
([#3612](#3612))
- Add `TRIGGER_BUILD_SKIP_REWRITE_TIMESTAMP=1` escape hatch for local
self-hosted builds whose buildx driver doesn't support
`rewrite-timestamp` alongside push (e.g. orbstack's default `docker`
driver).
([#3618](#3618))
- Reject overlong `idempotencyKey` values at the API boundary so they no
longer trip an internal size limit on the underlying unique index and
surface as a generic 500. Inputs are capped at 2048 characters — well
above what `idempotencyKeys.create()` produces (a 64-character hash) and
above any realistic raw key. Applies to `tasks.trigger`,
`tasks.batchTrigger`, `batch.create` (Phase 1 streaming batches),
`wait.createToken`, `wait.forDuration`, and the input/session stream
waitpoint endpoints. Over-limit requests now return a structured 400
instead.
([#3560](#3560))
- **AI SDK `useChat` integration** — a custom
[`ChatTransport`](https://sdk.vercel.ai/docs/ai-sdk-ui/transport)
(`useTriggerChatTransport`) plugs straight into Vercel AI SDK's
`useChat` hook. Text streaming, tool calls, reasoning, and `data-*`
parts all work natively over Trigger.dev's realtime streams. No custom
API routes needed.
- **First-turn fast path (`chat.headStart`)** — opt-in handler that runs
the first turn's `streamText` step in your warm server process while the
agent run boots in parallel, cutting cold-start TTFC by roughly half
(measured 2801ms → 1218ms on `claude-sonnet-4-6`). The agent owns step
2+ (tool execution, persistence, hooks) so heavy deps stay where they
belong. Web Fetch handler works natively in Next.js, Hono, SvelteKit,
Remix, Workers, etc.; bridge to Express/Fastify/Koa via
`chat.toNodeListener`. New `@trigger.dev/sdk/chat-server` subpath.
- **Multi-turn durability via Sessions** — every chat is backed by a
durable Session that outlives any individual run. Conversations resume
across page refreshes, idle timeout, crashes, and deploys; `resume:
true` reconnects via `lastEventId` so clients only see new chunks.
`sessions.list` enumerates chats for inbox-style UIs.
- **Auto-accumulated history, delta-only wire** — the backend
accumulates the full conversation across turns; clients only ship the
new message each turn. Long chats never hit the 512 KiB body cap.
Register `hydrateMessages` to be the source of truth yourself.
- **Lifecycle hooks** — `onPreload`, `onChatStart`,
`onValidateMessages`, `hydrateMessages`, `onTurnStart`,
`onBeforeTurnComplete`, `onTurnComplete`, `onChatSuspend`,
`onChatResume` — for persistence, validation, and post-turn work.
- **Stop generation** — client-driven `transport.stopGeneration(chatId)`
aborts mid-stream; the run stays alive for the next message, partial
response is captured, and aborted parts (stuck `partial-call` tools,
in-progress reasoning) are auto-cleaned.
- **Tool approvals (HITL)** — tools with `needsApproval: true` pause
until the user approves or denies via `addToolApprovalResponse`. The
runtime reconciles the updated assistant message by ID and continues
`streamText`.
- **Steering and background injection** — `pendingMessages` injects user
messages between tool-call steps so users can steer the agent
mid-execution; `chat.inject()` + `chat.defer()` adds context from
background work (self-review, RAG, safety checks) between turns.
- **Actions** — non-turn frontend commands (undo, rollback, regenerate,
edit) sent via `transport.sendAction`. Fire `hydrateMessages` +
`onAction` only — no turn hooks, no `run()`. `onAction` can return a
`StreamTextResult` for a model response, or `void` for side-effect-only.
- **Typed state primitives** — `chat.local<T>` for per-run state
accessible from hooks, `run()`, tools, and subtasks (auto-serialized
through `ai.toolExecute`); `chat.store` for typed shared data between
agent and client; `chat.history` for reading and mutating the message
chain; `clientDataSchema` for typed `clientData` in every hook.
- **`chat.toStreamTextOptions()`** — one spread into `streamText` wires
up versioned system [Prompts](https://trigger.dev/docs/ai/prompts),
model resolution, telemetry metadata, compaction, steering, and
background injection.
- **Multi-tab coordination** — `multiTab: true` + `useMultiTabChat`
prevents duplicate sends and syncs state across browser tabs via
`BroadcastChannel`. Non-active tabs go read-only with live updates.
- **Network resilience** — built-in indefinite retry with bounded
backoff, reconnect on `online` / tab refocus / bfcache restore,
`Last-Event-ID` mid-stream resume. No app code needed.
- Retry `TASK_PROCESS_SIGSEGV` task crashes under the user's retry
policy instead of failing the run on the first segfault. SIGSEGV in Node
tasks is frequently non-deterministic (native addon races, JIT/GC
interaction, near-OOM in native code, host issues), so retrying on a
fresh process often succeeds. The retry is gated by the task's existing
`retry` config + `maxAttempts` — same path `TASK_PROCESS_SIGTERM` and
uncaught exceptions already use — so tasks without a retry policy still
fail fast.
([#3552](#3552))
- The public interfaces for a plugin system. Initially consolidated
authentication and authorization interfaces.
([#3499](#3499))
- Add MollifierBuffer and MollifierDrainer primitives for trigger burst
smoothing.
([#3614](#3614))

## Bug fixes
- Fix `LocalsKey<T>` type incompatibility across dual-package builds.
The phantom value-type brand no longer uses a module-level `unique
symbol`, so a single TypeScript compilation that resolves the type from
both the ESM and CJS outputs (which can happen under certain pnpm
hoisting layouts) no longer sees two structurally-incompatible variants
of the same type.
([#3626](#3626))

<details>
<summary>Raw changeset output</summary>

⚠️⚠️⚠️⚠️⚠️⚠️

`main` is currently in **pre mode** so this branch has prereleases
rather than normal releases. If you want to exit prereleases, run
`changeset pre exit` on `main`.

⚠️⚠️⚠️⚠️⚠️⚠️

# Releases
## @trigger.dev/sdk@4.5.0-rc.0

### Minor Changes

- **AI Prompts** — define prompt templates as code alongside your tasks,
version them on deploy, and override the text or model from the
dashboard without redeploying. Prompts integrate with the Vercel AI SDK
via `toAISDKTelemetry()` (links every generation span back to the
prompt) and with `chat.agent` via `chat.prompt.set()` +
`chat.toStreamTextOptions()`.
([#3629](#3629))

    ```ts
    import { prompts } from "@trigger.dev/sdk";
    import { generateText } from "ai";
    import { openai } from "@ai-sdk/openai";
    import { z } from "zod";

    export const supportPrompt = prompts.define({
      id: "customer-support",
      model: "gpt-4o",
      config: { temperature: 0.7 },
      variables: z.object({
        customerName: z.string(),
        plan: z.string(),
        issue: z.string(),
      }),
      content: `You are a support agent for Acme.

    Customer: {{customerName}} ({{plan}} plan)
    Issue: {{issue}}`,
    });

    const resolved = await supportPrompt.resolve({
      customerName: "Alice",
      plan: "Pro",
      issue: "Can't access billing",
    });

    const result = await generateText({
      model: openai(resolved.model ?? "gpt-4o"),
      system: resolved.text,
      prompt: "Can't access billing",
      ...resolved.toAISDKTelemetry(),
    });
    ```

    **What you get:**

- **Code-defined, deploy-versioned templates** — define with
`prompts.define({ id, model, config, variables, content })`. Every
deploy creates a new version visible in the dashboard. Mustache-style
placeholders (`{{var}}`, `{{#cond}}...{{/cond}}`) with Zod / ArkType /
Valibot-typed variables.
- **Dashboard overrides** — change a prompt's text or model from the
dashboard without redeploying. Overrides take priority over the deployed
"current" version and are environment-scoped (dev / staging / production
independent).
- **Resolve API** — `prompt.resolve(vars, { version?, label? })` returns
the compiled `text`, resolved `model`, `version`, and labels. Standalone
`prompts.resolve<typeof handle>(slug, vars)` for cross-file resolution
with full type inference on slug and variable shape.
- **AI SDK integration** — spread `resolved.toAISDKTelemetry({ ...extra
})` into any `generateText` / `streamText` call and every generation
span links to the prompt in the dashboard alongside its input variables,
model, tokens, and cost.
- **`chat.agent` integration** — `chat.prompt.set(resolved)` stores the
resolved prompt run-scoped; `chat.toStreamTextOptions({ registry })`
pulls `system`, `model` (resolved via the AI SDK provider registry),
`temperature` / `maxTokens` / etc., and telemetry into a single spread
for `streamText`.
- **Management SDK** — `prompts.list()`, `prompts.versions(slug)`,
`prompts.promote(slug, version)`, `prompts.createOverride(slug, body)`,
`prompts.updateOverride(slug, body)`, `prompts.removeOverride(slug)`,
`prompts.reactivateOverride(slug, version)`.
- **Dashboard** — prompts list with per-prompt usage sparklines;
per-prompt detail with Template / Details / Versions / Generations /
Metrics tabs. AI generation spans get a custom inspector showing the
linked prompt's metadata, input variables, and template content
alongside model, tokens, cost, and the message thread.

See [/docs/ai/prompts](https://trigger.dev/docs/ai/prompts) for the full
reference — template syntax, version resolution order, override
workflow, and type utilities (`PromptHandle`, `PromptIdentifier`,
`PromptVariables`).

- Adds `onBoot` to `chat.agent` — a lifecycle hook that fires once per
worker process picking up the chat. Runs for the initial run, preloaded
runs, AND reactive continuation runs (post-cancel, crash, `endRun`,
`requestUpgrade`, OOM retry), before any other hook. Use it to
initialize `chat.local`, open per-process resources, or re-hydrate state
from your DB on continuation — anywhere the SAME run picking up after
suspend/resume isn't enough.
([#3543](#3543))

    ```ts
const userContext = chat.local<{ name: string; plan: string }>({ id:
"userContext" });

    export const myChat = chat.agent({
      id: "my-chat",
      onBoot: async ({ clientData, continuation }) => {
const user = await db.user.findUnique({ where: { id: clientData.userId }
});
        userContext.init({ name: user.name, plan: user.plan });
      },
      run: async ({ messages, signal }) =>
streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }),
    });
    ```

Use `onBoot` (not `onChatStart`) for state setup that must run every
time a worker picks up the chat — `onChatStart` fires once per chat and
won't run on continuation, leaving `chat.local` uninitialized when
`run()` tries to use it.

- **AI Agents** — run AI SDK chat completions as durable Trigger.dev
agents instead of fragile API routes. Define an agent in one function,
point `useChat` at it from React, and the conversation survives page
refreshes, network blips, and process restarts.
([#3543](#3543))

    ```ts
    import { chat } from "@trigger.dev/sdk/ai";
    import { streamText } from "ai";
    import { openai } from "@ai-sdk/openai";

    export const myChat = chat.agent({
      id: "my-chat",
      run: async ({ messages, signal }) =>
streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }),
    });
    ```

    ```tsx
    import { useChat } from "@ai-sdk/react";
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";

const transport = useTriggerChatTransport({ task: "my-chat",
accessToken, startSession });
    const { messages, sendMessage } = useChat({ transport });
    ```

    **What you get:**

- **AI SDK `useChat` integration** — a custom
[`ChatTransport`](https://sdk.vercel.ai/docs/ai-sdk-ui/transport)
(`useTriggerChatTransport`) plugs straight into Vercel AI SDK's
`useChat` hook. Text streaming, tool calls, reasoning, and `data-*`
parts all work natively over Trigger.dev's realtime streams. No custom
API routes needed.
- **First-turn fast path (`chat.headStart`)** — opt-in handler that runs
the first turn's `streamText` step in your warm server process while the
agent run boots in parallel, cutting cold-start TTFC by roughly half
(measured 2801ms → 1218ms on `claude-sonnet-4-6`). The agent owns step
2+ (tool execution, persistence, hooks) so heavy deps stay where they
belong. Web Fetch handler works natively in Next.js, Hono, SvelteKit,
Remix, Workers, etc.; bridge to Express/Fastify/Koa via
`chat.toNodeListener`. New `@trigger.dev/sdk/chat-server` subpath.
- **Multi-turn durability via Sessions** — every chat is backed by a
durable Session that outlives any individual run. Conversations resume
across page refreshes, idle timeout, crashes, and deploys; `resume:
true` reconnects via `lastEventId` so clients only see new chunks.
`sessions.list` enumerates chats for inbox-style UIs.
- **Auto-accumulated history, delta-only wire** — the backend
accumulates the full conversation across turns; clients only ship the
new message each turn. Long chats never hit the 512 KiB body cap.
Register `hydrateMessages` to be the source of truth yourself.
- **Lifecycle hooks** — `onPreload`, `onChatStart`,
`onValidateMessages`, `hydrateMessages`, `onTurnStart`,
`onBeforeTurnComplete`, `onTurnComplete`, `onChatSuspend`,
`onChatResume` — for persistence, validation, and post-turn work.
- **Stop generation** — client-driven `transport.stopGeneration(chatId)`
aborts mid-stream; the run stays alive for the next message, partial
response is captured, and aborted parts (stuck `partial-call` tools,
in-progress reasoning) are auto-cleaned.
- **Tool approvals (HITL)** — tools with `needsApproval: true` pause
until the user approves or denies via `addToolApprovalResponse`. The
runtime reconciles the updated assistant message by ID and continues
`streamText`.
- **Steering and background injection** — `pendingMessages` injects user
messages between tool-call steps so users can steer the agent
mid-execution; `chat.inject()` + `chat.defer()` adds context from
background work (self-review, RAG, safety checks) between turns.
- **Actions** — non-turn frontend commands (undo, rollback, regenerate,
edit) sent via `transport.sendAction`. Fire `hydrateMessages` +
`onAction` only — no turn hooks, no `run()`. `onAction` can return a
`StreamTextResult` for a model response, or `void` for side-effect-only.
- **Typed state primitives** — `chat.local<T>` for per-run state
accessible from hooks, `run()`, tools, and subtasks (auto-serialized
through `ai.toolExecute`); `chat.store` for typed shared data between
agent and client; `chat.history` for reading and mutating the message
chain; `clientDataSchema` for typed `clientData` in every hook.
- **`chat.toStreamTextOptions()`** — one spread into `streamText` wires
up versioned system [Prompts](https://trigger.dev/docs/ai/prompts),
model resolution, telemetry metadata, compaction, steering, and
background injection.
- **Multi-tab coordination** — `multiTab: true` + `useMultiTabChat`
prevents duplicate sends and syncs state across browser tabs via
`BroadcastChannel`. Non-active tabs go read-only with live updates.
- **Network resilience** — built-in indefinite retry with bounded
backoff, reconnect on `online` / tab refocus / bfcache restore,
`Last-Event-ID` mid-stream resume. No app code needed.

See [/docs/ai-chat](https://trigger.dev/docs/ai-chat/overview) for the
full surface — quick start, three backend approaches (`chat.agent`,
`chat.createSession`, raw task), persistence and code-sandbox patterns,
type-level guides, and API reference.

- Add read primitives to `chat.history` for HITL flows:
`getPendingToolCalls()`, `getResolvedToolCalls()`,
`extractNewToolResults(message)`, `getChain()`, and
`findMessage(messageId)`. These lift the accumulator-walking logic that
customers building human-in-the-loop tools were re-implementing into the
SDK. ([#3543](#3543))

Use `getPendingToolCalls()` to gate fresh user turns while a tool call
is awaiting an answer. Use `extractNewToolResults(message)` to dedup
tool results when persisting to your own store — the helper returns only
the parts whose `toolCallId` is not already resolved on the chain.

    ```ts
    const pending = chat.history.getPendingToolCalls();
    if (pending.length > 0) {
      // an addToolOutput is expected before a new user message
    }

    onTurnComplete: async ({ responseMessage }) => {
const newResults = chat.history.extractNewToolResults(responseMessage);
      for (const r of newResults) {
await db.toolResults.upsert({ id: r.toolCallId, output: r.output,
errorText: r.errorText });
      }
    };
    ```

- **Sessions** — a durable, run-aware stream channel keyed on a stable
`externalId`. A Session is the unit of state that owns a multi-run
conversation: messages flow through `.in`, responses through `.out`,
both survive run boundaries. Sessions back the new `chat.agent` runtime,
and you can build on them directly for any pattern that needs durable
bi-directional streaming across runs.
([#3542](#3542))

    ```ts
    import { sessions, tasks } from "@trigger.dev/sdk";

    // Trigger a task and subscribe to its session output in one call
const { runId, stream } = await tasks.triggerAndSubscribe("my-task",
payload, {
      externalId: "user-456",
    });

    for await (const chunk of stream) {
      // ...
    }

// Enumerate existing sessions (powers inbox-style UIs without a
separate index)
for await (const s of sessions.list({ type: "chat.agent", tag:
"user:user-456" })) {
      console.log(s.id, s.externalId, s.createdAt, s.closedAt);
    }
    ```

See [/docs/ai-chat/overview](https://trigger.dev/docs/ai-chat/overview)
for the full surface — Sessions powers the durable, resumable chat
runtime described there.

### Patch Changes

- Add Agent Skills for `chat.agent`. Drop a folder with a `SKILL.md` and
any helper scripts/references next to your task code, register it with
`skills.define({ id, path })`, and the CLI bundles it into the deploy
image automatically — no `trigger.config.ts` changes. The agent gets a
one-line summary in its system prompt and discovers full instructions on
demand via `loadSkill`, with `bash` and `readFile` tools scoped
per-skill (path-traversal guards, output caps, abort-signal
propagation).
([#3543](#3543))

    ```ts
const pdfSkill = skills.define({ id: "pdf-extract", path:
"./skills/pdf-extract" });

    chat.skills.set([await pdfSkill.local()]);
    ```

Built on the [AI SDK cookbook
pattern](https://ai-sdk.dev/cookbook/guides/agent-skills) — portable
across providers. SDK + CLI only for now; dashboard-editable `SKILL.md`
text is on the roadmap.

- Add `ai.toolExecute(task)` so you can wire a Trigger subtask in as the
`execute` handler of an AI SDK `tool()` while defining `description` and
`inputSchema` yourself — useful when you want full control over the tool
surface and just need Trigger's subtask machinery for the body.
([#3546](#3546))

    ```ts
    const myTool = tool({
      description: "...",
      inputSchema: z.object({ ... }),
      execute: ai.toolExecute(mySubtask),
    });
    ```

`ai.tool(task)` (`toolFromTask`) keeps doing the all-in-one wrap and now
aligns its return type with AI SDK's `ToolSet`. Minimum `ai` peer raised
to `^6.0.116` to avoid cross-version `ToolSet` mismatches in monorepos.

- Stamp `gen_ai.conversation.id` (the chat id) on every span and metric
emitted from inside a `chat.task` or `chat.agent` run. Lets you filter
dashboard spans, runs, and metrics by the chat conversation that
produced them — independent of the run boundary, so multi-run chats
correlate cleanly. No code changes required on the user side.
([#3543](#3543))

- Type `chat.createStartSessionAction` against your chat agent so
`clientData` is typed end-to-end on the first turn:
([#3684](#3684))

    ```ts
    import { chat } from "@trigger.dev/sdk/ai";
    import type { myChat } from "@/trigger/chat";

export const startChatSession = chat.createStartSessionAction<typeof
myChat>("my-chat");

// In the browser, threaded from the transport's typed startSession
callback:
    const transport = useTriggerChatTransport<typeof myChat>({
      task: "my-chat",
startSession: ({ chatId, clientData }) => startChatSession({ chatId,
clientData }),
      // ...
    });
    ```

`ChatStartSessionParams` gains a typed `clientData` field — folded into
the first run's `payload.metadata` so `onPreload` / `onChatStart` see
the same shape per-turn `metadata` carries via the transport. The opaque
session-level `metadata` field is unchanged.

- Unit-test `chat.agent` definitions offline with `mockChatAgent` from
`@trigger.dev/sdk/ai/test`. Drives a real agent's turn loop in-process —
no network, no task runtime — so you can send messages, actions, and
stop signals via driver methods, inspect captured output chunks, and
verify hooks fire. Pairs with `MockLanguageModelV3` from `ai/test` for
model mocking. `setupLocals` lets you pre-seed `locals` (DB clients,
service stubs) before `run()` starts.
([#3543](#3543))

The broader `runInMockTaskContext` harness it's built on lives at
`@trigger.dev/core/v3/test` — useful for unit-testing any task code, not
just chat.

- Add `region` to the runs list / retrieve API: filter runs by region
(`runs.list({ region: "..." })` / `filter[region]=<masterQueue>`) and
read each run's executing region from the new `region` field on the
response.
([#3612](#3612))

-   Updated dependencies:
    -   `@trigger.dev/core@4.5.0-rc.0`

## @trigger.dev/build@4.5.0-rc.0

### Patch Changes

- Add Agent Skills for `chat.agent`. Drop a folder with a `SKILL.md` and
any helper scripts/references next to your task code, register it with
`skills.define({ id, path })`, and the CLI bundles it into the deploy
image automatically — no `trigger.config.ts` changes. The agent gets a
one-line summary in its system prompt and discovers full instructions on
demand via `loadSkill`, with `bash` and `readFile` tools scoped
per-skill (path-traversal guards, output caps, abort-signal
propagation).
([#3543](#3543))

    ```ts
const pdfSkill = skills.define({ id: "pdf-extract", path:
"./skills/pdf-extract" });

    chat.skills.set([await pdfSkill.local()]);
    ```

Built on the [AI SDK cookbook
pattern](https://ai-sdk.dev/cookbook/guides/agent-skills) — portable
across providers. SDK + CLI only for now; dashboard-editable `SKILL.md`
text is on the roadmap.

-   Updated dependencies:
    -   `@trigger.dev/core@4.5.0-rc.0`

## trigger.dev@4.5.0-rc.0

### Patch Changes

- Add Agent Skills for `chat.agent`. Drop a folder with a `SKILL.md` and
any helper scripts/references next to your task code, register it with
`skills.define({ id, path })`, and the CLI bundles it into the deploy
image automatically — no `trigger.config.ts` changes. The agent gets a
one-line summary in its system prompt and discovers full instructions on
demand via `loadSkill`, with `bash` and `readFile` tools scoped
per-skill (path-traversal guards, output caps, abort-signal
propagation).
([#3543](#3543))

    ```ts
const pdfSkill = skills.define({ id: "pdf-extract", path:
"./skills/pdf-extract" });

    chat.skills.set([await pdfSkill.local()]);
    ```

Built on the [AI SDK cookbook
pattern](https://ai-sdk.dev/cookbook/guides/agent-skills) — portable
across providers. SDK + CLI only for now; dashboard-editable `SKILL.md`
text is on the roadmap.

- Add `TRIGGER_BUILD_SKIP_REWRITE_TIMESTAMP=1` escape hatch for local
self-hosted builds whose buildx driver doesn't support
`rewrite-timestamp` alongside push (e.g. orbstack's default `docker`
driver).
([#3618](#3618))

- The CLI MCP server's agent-chat tools (`start_agent_chat`,
`send_agent_message`, `close_agent_chat`) now run on the new Sessions
primitive, so AI assistants driving a `chat.agent` get the same
idempotent-by-`chatId`, durable-across-runs behavior the browser
transport gets. Required PAT scopes go from `write:inputStreams` to
`read:sessions` + `write:sessions`.
([#3546](#3546))

- MCP `list_runs` tool: add a `region` filter input and surface each
run's executing region in the formatted summary.
([#3612](#3612))

-   Updated dependencies:
    -   `@trigger.dev/core@4.5.0-rc.0`
    -   `@trigger.dev/build@4.5.0-rc.0`
    -   `@trigger.dev/schema-to-json@4.5.0-rc.0`

## @trigger.dev/core@4.5.0-rc.0

### Patch Changes

- Add Agent Skills for `chat.agent`. Drop a folder with a `SKILL.md` and
any helper scripts/references next to your task code, register it with
`skills.define({ id, path })`, and the CLI bundles it into the deploy
image automatically — no `trigger.config.ts` changes. The agent gets a
one-line summary in its system prompt and discovers full instructions on
demand via `loadSkill`, with `bash` and `readFile` tools scoped
per-skill (path-traversal guards, output caps, abort-signal
propagation).
([#3543](#3543))

    ```ts
const pdfSkill = skills.define({ id: "pdf-extract", path:
"./skills/pdf-extract" });

    chat.skills.set([await pdfSkill.local()]);
    ```

Built on the [AI SDK cookbook
pattern](https://ai-sdk.dev/cookbook/guides/agent-skills) — portable
across providers. SDK + CLI only for now; dashboard-editable `SKILL.md`
text is on the roadmap.

- Reject overlong `idempotencyKey` values at the API boundary so they no
longer trip an internal size limit on the underlying unique index and
surface as a generic 500. Inputs are capped at 2048 characters — well
above what `idempotencyKeys.create()` produces (a 64-character hash) and
above any realistic raw key. Applies to `tasks.trigger`,
`tasks.batchTrigger`, `batch.create` (Phase 1 streaming batches),
`wait.createToken`, `wait.forDuration`, and the input/session stream
waitpoint endpoints. Over-limit requests now return a structured 400
instead.
([#3560](#3560))

- **AI Agents** — run AI SDK chat completions as durable Trigger.dev
agents instead of fragile API routes. Define an agent in one function,
point `useChat` at it from React, and the conversation survives page
refreshes, network blips, and process restarts.
([#3543](#3543))

    ```ts
    import { chat } from "@trigger.dev/sdk/ai";
    import { streamText } from "ai";
    import { openai } from "@ai-sdk/openai";

    export const myChat = chat.agent({
      id: "my-chat",
      run: async ({ messages, signal }) =>
streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }),
    });
    ```

    ```tsx
    import { useChat } from "@ai-sdk/react";
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";

const transport = useTriggerChatTransport({ task: "my-chat",
accessToken, startSession });
    const { messages, sendMessage } = useChat({ transport });
    ```

    **What you get:**

- **AI SDK `useChat` integration** — a custom
[`ChatTransport`](https://sdk.vercel.ai/docs/ai-sdk-ui/transport)
(`useTriggerChatTransport`) plugs straight into Vercel AI SDK's
`useChat` hook. Text streaming, tool calls, reasoning, and `data-*`
parts all work natively over Trigger.dev's realtime streams. No custom
API routes needed.
- **First-turn fast path (`chat.headStart`)** — opt-in handler that runs
the first turn's `streamText` step in your warm server process while the
agent run boots in parallel, cutting cold-start TTFC by roughly half
(measured 2801ms → 1218ms on `claude-sonnet-4-6`). The agent owns step
2+ (tool execution, persistence, hooks) so heavy deps stay where they
belong. Web Fetch handler works natively in Next.js, Hono, SvelteKit,
Remix, Workers, etc.; bridge to Express/Fastify/Koa via
`chat.toNodeListener`. New `@trigger.dev/sdk/chat-server` subpath.
- **Multi-turn durability via Sessions** — every chat is backed by a
durable Session that outlives any individual run. Conversations resume
across page refreshes, idle timeout, crashes, and deploys; `resume:
true` reconnects via `lastEventId` so clients only see new chunks.
`sessions.list` enumerates chats for inbox-style UIs.
- **Auto-accumulated history, delta-only wire** — the backend
accumulates the full conversation across turns; clients only ship the
new message each turn. Long chats never hit the 512 KiB body cap.
Register `hydrateMessages` to be the source of truth yourself.
- **Lifecycle hooks** — `onPreload`, `onChatStart`,
`onValidateMessages`, `hydrateMessages`, `onTurnStart`,
`onBeforeTurnComplete`, `onTurnComplete`, `onChatSuspend`,
`onChatResume` — for persistence, validation, and post-turn work.
- **Stop generation** — client-driven `transport.stopGeneration(chatId)`
aborts mid-stream; the run stays alive for the next message, partial
response is captured, and aborted parts (stuck `partial-call` tools,
in-progress reasoning) are auto-cleaned.
- **Tool approvals (HITL)** — tools with `needsApproval: true` pause
until the user approves or denies via `addToolApprovalResponse`. The
runtime reconciles the updated assistant message by ID and continues
`streamText`.
- **Steering and background injection** — `pendingMessages` injects user
messages between tool-call steps so users can steer the agent
mid-execution; `chat.inject()` + `chat.defer()` adds context from
background work (self-review, RAG, safety checks) between turns.
- **Actions** — non-turn frontend commands (undo, rollback, regenerate,
edit) sent via `transport.sendAction`. Fire `hydrateMessages` +
`onAction` only — no turn hooks, no `run()`. `onAction` can return a
`StreamTextResult` for a model response, or `void` for side-effect-only.
- **Typed state primitives** — `chat.local<T>` for per-run state
accessible from hooks, `run()`, tools, and subtasks (auto-serialized
through `ai.toolExecute`); `chat.store` for typed shared data between
agent and client; `chat.history` for reading and mutating the message
chain; `clientDataSchema` for typed `clientData` in every hook.
- **`chat.toStreamTextOptions()`** — one spread into `streamText` wires
up versioned system [Prompts](https://trigger.dev/docs/ai/prompts),
model resolution, telemetry metadata, compaction, steering, and
background injection.
- **Multi-tab coordination** — `multiTab: true` + `useMultiTabChat`
prevents duplicate sends and syncs state across browser tabs via
`BroadcastChannel`. Non-active tabs go read-only with live updates.
- **Network resilience** — built-in indefinite retry with bounded
backoff, reconnect on `online` / tab refocus / bfcache restore,
`Last-Event-ID` mid-stream resume. No app code needed.

See [/docs/ai-chat](https://trigger.dev/docs/ai-chat/overview) for the
full surface — quick start, three backend approaches (`chat.agent`,
`chat.createSession`, raw task), persistence and code-sandbox patterns,
type-level guides, and API reference.

- Stamp `gen_ai.conversation.id` (the chat id) on every span and metric
emitted from inside a `chat.task` or `chat.agent` run. Lets you filter
dashboard spans, runs, and metrics by the chat conversation that
produced them — independent of the run boundary, so multi-run chats
correlate cleanly. No code changes required on the user side.
([#3543](#3543))

- Fix `LocalsKey<T>` type incompatibility across dual-package builds.
The phantom value-type brand no longer uses a module-level `unique
symbol`, so a single TypeScript compilation that resolves the type from
both the ESM and CJS outputs (which can happen under certain pnpm
hoisting layouts) no longer sees two structurally-incompatible variants
of the same type.
([#3626](#3626))

- Unit-test `chat.agent` definitions offline with `mockChatAgent` from
`@trigger.dev/sdk/ai/test`. Drives a real agent's turn loop in-process —
no network, no task runtime — so you can send messages, actions, and
stop signals via driver methods, inspect captured output chunks, and
verify hooks fire. Pairs with `MockLanguageModelV3` from `ai/test` for
model mocking. `setupLocals` lets you pre-seed `locals` (DB clients,
service stubs) before `run()` starts.
([#3543](#3543))

The broader `runInMockTaskContext` harness it's built on lives at
`@trigger.dev/core/v3/test` — useful for unit-testing any task code, not
just chat.

- Retry `TASK_PROCESS_SIGSEGV` task crashes under the user's retry
policy instead of failing the run on the first segfault. SIGSEGV in Node
tasks is frequently non-deterministic (native addon races, JIT/GC
interaction, near-OOM in native code, host issues), so retrying on a
fresh process often succeeds. The retry is gated by the task's existing
`retry` config + `maxAttempts` — same path `TASK_PROCESS_SIGTERM` and
uncaught exceptions already use — so tasks without a retry policy still
fail fast.
([#3552](#3552))

- Add `region` to the runs list / retrieve API: filter runs by region
(`runs.list({ region: "..." })` / `filter[region]=<masterQueue>`) and
read each run's executing region from the new `region` field on the
response.
([#3612](#3612))

- **Sessions** — a durable, run-aware stream channel keyed on a stable
`externalId`. A Session is the unit of state that owns a multi-run
conversation: messages flow through `.in`, responses through `.out`,
both survive run boundaries. Sessions back the new `chat.agent` runtime,
and you can build on them directly for any pattern that needs durable
bi-directional streaming across runs.
([#3542](#3542))

    ```ts
    import { sessions, tasks } from "@trigger.dev/sdk";

    // Trigger a task and subscribe to its session output in one call
const { runId, stream } = await tasks.triggerAndSubscribe("my-task",
payload, {
      externalId: "user-456",
    });

    for await (const chunk of stream) {
      // ...
    }

// Enumerate existing sessions (powers inbox-style UIs without a
separate index)
for await (const s of sessions.list({ type: "chat.agent", tag:
"user:user-456" })) {
      console.log(s.id, s.externalId, s.createdAt, s.closedAt);
    }
    ```

See [/docs/ai-chat/overview](https://trigger.dev/docs/ai-chat/overview)
for the full surface — Sessions powers the durable, resumable chat
runtime described there.

## @trigger.dev/plugins@4.5.0-rc.0

### Patch Changes

- The public interfaces for a plugin system. Initially consolidated
authentication and authorization interfaces.
([#3499](#3499))
-   Updated dependencies:
    -   `@trigger.dev/core@4.5.0-rc.0`

## @trigger.dev/python@4.5.0-rc.0

### Patch Changes

-   Updated dependencies:
    -   `@trigger.dev/sdk@4.5.0-rc.0`
    -   `@trigger.dev/core@4.5.0-rc.0`
    -   `@trigger.dev/build@4.5.0-rc.0`

## @trigger.dev/react-hooks@4.5.0-rc.0

### Patch Changes

-   Updated dependencies:
    -   `@trigger.dev/core@4.5.0-rc.0`

## @trigger.dev/redis-worker@4.5.0-rc.0

### Patch Changes

- Add MollifierBuffer and MollifierDrainer primitives for trigger burst
smoothing.
([#3614](#3614))

MollifierBuffer (`accept`, `pop`, `ack`, `requeue`, `fail`,
`evaluateTrip`) is a per-env FIFO over Redis with atomic Lua transitions
for status tracking. `evaluateTrip` is a sliding-window trip evaluator
the webapp gate uses to detect per-env trigger bursts.

MollifierDrainer pops entries through a polling loop with a
user-supplied handler. The loop survives transient Redis errors via
capped exponential backoff (up to 5s), and per-env pop failures don't
poison the rest of the batch — one env's blip is logged and counted as
failed for that tick. Rotation is two-level: orgs at the top, envs
within each org. The buffer maintains `mollifier:orgs` and
`mollifier:org-envs:${orgId}` atomically with per-env queues, so the
drainer walks orgs → envs directly without an in-memory cache. The
`maxOrgsPerTick` option (default 500) caps how many orgs are scheduled
per tick; for each picked org, one env is popped (rotating round-robin
within the org). An org with N envs gets the same per-tick scheduling
slot as an org with 1 env, so tenant-level drainage throughput is
determined by org count rather than env count.

-   Updated dependencies:
    -   `@trigger.dev/core@4.5.0-rc.0`

## @trigger.dev/rsc@4.5.0-rc.0

### Patch Changes

-   Updated dependencies:
    -   `@trigger.dev/core@4.5.0-rc.0`

## @trigger.dev/schema-to-json@4.5.0-rc.0

### Patch Changes

-   Updated dependencies:
    -   `@trigger.dev/core@4.5.0-rc.0`

</details>

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants