When an external ML worker pulls tasks from a NATS-backed async job but fails to post results back, the tasks silently die in NATS after exhausting max_deliver retries. Django is never notified, no error is logged on the job record, and the job remains in STARTED status indefinitely.
Reproduction
Create an async_api job with N images
Have a worker pull tasks via GET /jobs/{id}/tasks/
Worker fails to process or crashes — never calls POST /jobs/{id}/result/
Wait for NATS ack_wait (30s default) × max_deliver (5) = ~2.5 minutes per message
Job status: STARTED (never transitions to FAILURE)
Observed Behavior
NATS consumer state after exhaustion: num_pending=0, num_ack_pending=0, num_redelivered=756
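The timing in the reproduction steps can be sketched as a one-liner; the constants mirror the defaults mentioned above:

```python
# Worst-case time for NATS to drop one pulled-but-never-acked message,
# using the defaults from this issue (ack_wait=30s, max_deliver=5).
ACK_WAIT_SECONDS = 30  # NATS_TASK_TTR default
MAX_DELIVER = 5

def time_to_exhaust(ack_wait: int = ACK_WAIT_SECONDS,
                    max_deliver: int = MAX_DELIVER) -> int:
    """Seconds until NATS permanently drops an unacked message."""
    return ack_wait * max_deliver

# time_to_exhaust() -> 150 seconds, i.e. ~2.5 minutes per message
```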
Root Cause
The result-processing path (process_nats_pipeline_result) is the only place that updates job progress and logs errors. When a worker pulls a task but never posts results, this code path is never invoked. NATS handles retries internally and eventually drops the message — but there is no callback, webhook, or polling mechanism for Django to detect that messages have been permanently dropped.
The _fail_job() function added in #1162 only triggers when Redis state is missing during result processing. It does not cover the case where result processing never happens at all.
Proposed Solution
Add a stale-consumer detection mechanism. Possible approaches:
Option A: Check inside the /tasks/ endpoint
When reserve_tasks() returns an empty list, check the NATS consumer state. If num_pending == 0 and num_ack_pending == 0 but the job still has remaining images (from Redis or the Job progress), mark the job as FAILURE with a descriptive error message.
Pros: Runs naturally as workers poll, no extra infrastructure. Cons: Requires a worker to keep polling; if all workers stop, the check never runs.
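A minimal sketch of the Option A check. The decision itself is kept as a pure function; `maybe_fail_stale_job` and the `status`/`error` field names are hypothetical stand-ins for however the Job model actually records failure:

```python
def consumer_exhausted(num_pending: int, num_ack_pending: int,
                       remaining_images: int) -> bool:
    """True when NATS has dropped every message but the job is unfinished."""
    return num_pending == 0 and num_ack_pending == 0 and remaining_images > 0

def maybe_fail_stale_job(job, consumer_info: dict, remaining_images: int) -> None:
    # consumer_info mirrors the NATS ConsumerInfo fields; called from the
    # /tasks/ endpoint when reserve_tasks() returns an empty list.
    if consumer_exhausted(consumer_info["num_pending"],
                          consumer_info["num_ack_pending"],
                          remaining_images):
        job.status = "FAILURE"  # field names are assumptions
        job.error = ("NATS dropped all pending tasks after max_deliver "
                     "retries; no results were ever posted")
```

Keeping the predicate pure makes the dead-message scenario trivial to unit-test without a live NATS connection.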
Option B: Periodic Celery beat task
Add a beat task that runs every few minutes, queries all STARTED async_api jobs, checks their NATS consumer state, and fails any job where the consumer is exhausted but progress is incomplete.
Pros: Catches stalled jobs even if no workers are polling. Can also detect jobs where the NATS stream was deleted (e.g., container restart with ephemeral storage). Cons: Adds a periodic task and requires NATS connectivity from the beat worker.
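A sketch of the Option B sweep. The classification logic is pure and testable; the Celery beat wiring and the ORM/NATS lookups in the comment are hypothetical names, not the project's real helpers:

```python
from typing import Optional

def classify_started_job(consumer_info: Optional[dict],
                         remaining_images: int) -> str:
    """Decide what the beat sweep should do with one STARTED async_api job."""
    if consumer_info is None:
        # Stream/consumer gone, e.g. NATS restarted with ephemeral storage.
        return "fail_missing_stream"
    if (consumer_info["num_pending"] == 0
            and consumer_info["num_ack_pending"] == 0
            and remaining_images > 0):
        return "fail_exhausted"
    return "ok"

# Sketch of the beat task itself (assumes @shared_task, the Job ORM model,
# and the existing _fail_job helper from #1162):
#
# @shared_task
# def detect_dead_nats_jobs():
#     for job in Job.objects.filter(status="STARTED", dispatch_mode="async_api"):
#         verdict = classify_started_job(get_consumer_info(job), remaining(job))
#         if verdict != "ok":
#             _fail_job(job, verdict)
```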
Option C: Both
Use Option A for fast detection during active polling, and Option B as a safety net.
Additional Context
NATS consumer config: max_deliver=5, ack_wait=30s (configurable via NATS_TASK_TTR)
The job's dispatch_mode is async_api and pipeline is set
JetStream storage is ephemeral (/tmp/nats/jetstream), so a NATS container restart also causes silent data loss — the beat task (Option B) would catch this too
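For reference, the consumer settings above map onto a nats-py JetStream consumer config roughly like this; the durable name is made up and this is an illustrative fragment, not the project's actual setup code:

```python
# Illustrative nats-py consumer config matching the values in this issue.
from nats.js.api import AckPolicy, ConsumerConfig

config = ConsumerConfig(
    durable_name="job-tasks",        # hypothetical name
    ack_policy=AckPolicy.EXPLICIT,   # worker must ack via result posting
    ack_wait=30,                     # seconds; the NATS_TASK_TTR default
    max_deliver=5,                   # message is dropped after 5 attempts
)
```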
Acceptance Criteria
A job whose NATS tasks have all been exhausted (dead) is detected and marked as FAILURE
An error message is logged on the job record explaining that tasks were dropped
The detection works even if no external workers are actively polling
Existing tests pass; new test covers the dead-message scenario