
UN-3322 [FIX] Optimize dashboard metrics aggregation task#1837

Open
athul-rs wants to merge 11 commits into main from UN-1798/optimize-metrics-aggregation

Conversation


@athul-rs athul-rs commented Mar 8, 2026

What

  • Optimize the periodic aggregate_metrics_from_sources Celery task to reduce DB queries by ~55%
  • Add self-healing Redis distributed lock to prevent stuck-lock incidents
  • Tighten active org filter window from 2 months to 7 days

Why

  • Production monitoring showed CPU spikes every 15 minutes during aggregation runs (~42 queries per active org)
  • Stuck Redis lock incidents required manual intervention (redis-cli DEL) when workers were OOM-killed
  • Dormant orgs (active 2-8 weeks ago) were being re-queried unnecessarily, overwriting identical monthly totals

How

  • Combined LLM query: 4 separate LLM metric queries merged into 1 using Django conditional aggregation (Count(filter=Q(...)), Sum) — saves 9 queries/org
  • Cached org identifier: Pre-resolve Organization.organization_id string once per org for PageUsage queries instead of per-metric DB lookup — saves 5 lookups/org
  • Daily+monthly query merge: Reuse single DAY-granularity query (from 2-month window) for both daily and monthly aggregation tiers, splitting results in Python — saves ~9 queries/org. Same pattern proven in backfill management command
  • Tightened active org filter: WorkflowExecution.created_at__gte=daily_start (7 days) instead of monthly_start (2 months). Dormant orgs re-enter the window when activity resumes; monthly totals were already persisted by prior runs
  • Self-healing lock: Store time.time() as lock value. On contention, check age > timeout → reclaim stale lock. Handles migration from old "running" string value via TypeError/ValueError branch
  • Cleanup: Removed unused org_identifier parameter from get_failed_pages (joins through workflow FK, not PageUsage)
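The combine-then-split pattern described above can be sketched as follows. This is an illustrative sketch, not the code in services.py: the metric field names and the `llm_usage_reason` filter values are assumptions, and the Django query is shown only as a comment for context.

```python
# Sketch of the combined-LLM-query-plus-Python-split pattern. Field names
# and filter values are illustrative; the real query lives in services.py.
#
# The single conditional-aggregation query would look roughly like
# (Django ORM, shown for context only):
#   Usage.objects.filter(organization_id=org_id, created_at__range=(start, end))
#       .annotate(period=TruncDay("created_at"))
#       .values("period")
#       .annotate(
#           llm_calls=Count("id", filter=Q(llm_usage_reason="extraction")),
#           challenges=Count("id", filter=Q(llm_usage_reason="challenge")),
#           summarization_calls=Count("id", filter=Q(llm_usage_reason="summarize")),
#           llm_usage_cost=Sum("cost_in_dollars"),
#       )

LLM_METRIC_FIELDS = ("llm_calls", "challenges", "summarization_calls", "llm_usage_cost")

def split_llm_combined(rows):
    """Split combined per-period rows into one series per metric, in Python."""
    series = {name: [] for name in LLM_METRIC_FIELDS}
    for row in rows:
        for name in LLM_METRIC_FIELDS:
            series[name].append({"period": row["period"], "value": row.get(name) or 0})
    return series
```

One combined query replaces four per-metric queries; the split is a cheap in-memory pass over rows that were already fetched.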

Trade-off: Performance vs Stability

This PR trades marginal stability for significant performance gains. Specifically:

  • The combined LLM query means if the usage table has a schema issue (e.g., corrupted llm_usage_reason column), all 4 LLM metrics fail together instead of independently. Mitigation: the outer try/except per org catches this and logs the error — other orgs and non-LLM metrics are unaffected.
  • The daily+monthly merge means slightly more Python memory per org (holding ~2 months of daily rows vs 7 days). In practice this is negligible (~hundreds of rows vs tens).
  • The 7-day active org window means an org that goes dormant for >7 days stops being re-aggregated. This is correct behavior (monthly totals were already written), but if source data is retroactively corrected beyond 7 days, the monthly total won't self-heal until the org becomes active again. The backfill command covers this case.

Can this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins, do not merge the PR without this section filled)

  • No. All changes are internal to the aggregation task logic:
    • Output is identical (same metric names, values, upsert keys)
    • Thin wrapper methods (get_llm_calls, get_challenges, etc.) preserve backward compatibility for views.py and backfill command
    • Self-healing lock is backward-compatible with existing "running" string in Redis
    • Safe to deploy while task is running: current run finishes with old code, next run picks up new code

Database Migrations

  • None required

Env Config

  • No new env vars

Relevant Docs

  • N/A

Related Issues or PRs

  • UN-1798

Dependencies Versions

  • No new dependencies

Notes on Testing

  • Verify aggregation task completes successfully after deploy
  • Check logs for "Aggregation: X active orgs out of Y total" — X will be lower than before (7-day vs 2-month window); this is expected
  • Compare metric values in EventMetricsHourly/Daily/Monthly before and after — should be identical
  • Test lock recovery: redis-cli SET "dashboard_metrics:aggregation_lock" "running", verify next run reclaims it and logs a warning
  • Verify backfill command: python manage.py backfill_metrics --days=1 --dry-run

Screenshots

N/A (backend-only change)

Checklist

I have read and understood the Contribution Guidelines.

Reduce DB queries per org from ~42 to ~19 and add self-healing lock:

- Combine 4 LLM metric queries into 1 using conditional aggregation
- Cache org identifier lookup for PageUsage-based metrics
- Merge daily+monthly queries into single source query (backfill pattern)
- Tighten active org filter from 2 months to 7 days
- Add self-healing Redis lock to prevent stuck-lock incidents
- Remove unused org_identifier param from get_failed_pages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

coderabbitai bot commented Mar 8, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Resolves org identifiers once, consolidates LLM metric queries into a combined/split flow, adds a self-healing timestamp-based aggregation lock, and introduces helper primitives to aggregate hourly, daily, and monthly metrics with fewer queries.

Changes

Cohort / File(s) Summary
Backfill Command
backend/dashboard_metrics/management/commands/backfill_metrics.py
Expanded _collect_metrics signature to accept org_identifier; extracted _ingest_results and _ingest_daily_results; queries LLM metrics in bulk via MetricsQueryService.get_llm_metrics_split; passes org_identifier into relevant non-LLM queries.
Metrics Query Service
backend/dashboard_metrics/services.py
Added _resolve_org_identifier and LLM_METRIC_KEYS; added get_llm_metrics_combined and get_llm_metrics_split; updated get_pages_processed to accept org_identifier; removed standalone get_llm_usage_cost; get_all_metrics_summary now resolves identifier once and uses combined LLM metrics.
Aggregation Tasks
backend/dashboard_metrics/tasks.py
Added _upsert_agg, _acquire_aggregation_lock (self-healing lock), _aggregate_single_metric, and _aggregate_llm_combined; refactored aggregate_metrics_from_sources to use new lock, pre-resolve org_identifier, aggregate hourly/daily/monthly via helpers, and route LLM metrics through a combined path.
Views (live-series)
backend/dashboard_metrics/views.py
Added _build_series_entry, _build_error_entry, and _fetch_live_series; live-series now batches LLM metrics via get_llm_metrics_split, supports single-metric filtering, and returns consolidated series with partial_errors for failed fetches.

Sequence Diagram(s)

sequenceDiagram
    participant Task as AggregationTask
    participant Lock as LockStore
    participant Service as MetricsQueryService
    participant DB as Database

    Task->>Lock: _acquire_aggregation_lock()
    Lock->>DB: read/update lock record (timestamp)
    alt lock acquired
        Task->>Service: _resolve_org_identifier(org_id)
        Service->>DB: fetch Organization.organization_id (if needed)
        Task->>Service: get_llm_metrics_split(org_id, start/end, granularity)
        Service->>DB: execute combined LLM query (single query per granularity)
        DB-->>Service: combined LLM rows
        Service-->>Task: split into per-metric series (llm_calls, challenges, ...)
        Task->>Service: _aggregate_single_metric(...) for each non-LLM metric
        Service->>DB: metric-specific queries (hourly + daily/monthly)
        DB-->>Service: metric rows
        Service-->>Task: aggregated buckets (hourly/daily/monthly)
        Task->>DB: bulk upsert aggregated metrics
        Task->>Lock: release lock
    else lock held or stale reclaimed
        Task-->>Task: skip or reclaim then proceed
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description check ✅ Passed The PR description comprehensively covers all template sections with detailed explanations of what changed, why, how it was done, impact analysis, and testing guidance.
Docstring Coverage ✅ Passed Docstring coverage is 93.10% which is sufficient. The required threshold is 80.00%.
Title check ✅ Passed The title "UN-3322 [FIX] Optimize dashboard metrics aggregation task" clearly summarizes the main change: optimization of the dashboard metrics aggregation task with performance improvements and a self-healing lock mechanism.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/dashboard_metrics/tasks.py (1)

507-519: ⚠️ Potential issue | 🟠 Major

WorkflowExecution is too narrow for the new active-org prefilter.

In cloud deployments, hitl_completions is aggregated from HITLQueue.approved_at, so an organization can have fresh completions inside the 7-day window even when its latest WorkflowExecution.created_at is older. This filter skips that org entirely and leaves those daily/monthly buckets stale. Please include the other source tables that can emit fresh metrics in the window, or exempt those metrics from the prefilter.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/tasks.py` around lines 507 - 519, The
active_org_ids prefilter currently gathers orgs only from
WorkflowExecution.created_at (see variable active_org_ids and daily_start),
which misses orgs with recent HITL completions recorded via
HITLQueue.approved_at and therefore causes hitl_completions buckets to be
skipped; modify the prefilter to also include organization IDs from HITLQueue
(use approved_at >= daily_start) and any other recent-metric source tables that
can emit fresh metrics in the 7-day window, or alternatively exclude
hitl_completions (and other affected metric calculations) from using
active_org_ids so those metrics always scan their source tables without the
prefilter. Ensure you update the set construction that builds active_org_ids to
union in these additional org IDs (or branch the metric path to bypass the
prefilter) so daily/monthly buckets for HITL and similar sources are not left
stale.
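The fix the reviewer describes amounts to a set union over every table that can emit fresh metrics. A minimal sketch, where each argument stands in for a `values_list("organization_id", flat=True)` queryset filtered to the same `daily_start` window (`created_at` or `approved_at` as appropriate); the function name is hypothetical:

```python
# Hypothetical sketch of the suggested prefilter fix: union org IDs from
# every source table that can emit fresh metrics in the 7-day window
# (WorkflowExecution, HITLQueue, PageUsage, Usage, ...).
def build_active_org_ids(workflow_org_ids, hitl_org_ids, *other_source_ids):
    """Return the union of org IDs with recent activity in any source table."""
    active = set(workflow_org_ids) | set(hitl_org_ids)
    for ids in other_source_ids:
        active |= set(ids)
    return active
```

An org active in any one source then stays in the aggregation loop, so HITL-only activity no longer leaves daily/monthly buckets stale.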
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/dashboard_metrics/services.py`:
- Around line 246-250: The wrapper helpers get_llm_calls, get_challenges,
get_summarization_calls, and get_llm_usage_cost each call
get_llm_metrics_combined separately, causing the heavy combined aggregation to
run multiple times; update the hot callers (notably
backfill_metrics.Command._collect_metrics and views.live_series) to call
MetricsQueryService.get_llm_metrics_combined once per organization/date
range/granularity and then derive the four values from that single result
instead of invoking the single-metric wrappers repeatedly; leave the wrapper
helpers for convenience but change the callers to fetch the combined result and
map to
{"period","llm_calls","challenges","summarization_calls","llm_usage_cost"} (or
extract the specific fields needed) to avoid quadruple aggregation.

In `@backend/dashboard_metrics/tasks.py`:
- Around line 338-340: The daily cutoff comparison uses a timeful daily_start
against midnight-truncated day_ts, which excludes the boundary day; normalize
the cutoff to a day boundary by comparing dates (e.g., day_ts.date() >=
daily_start.date()) or by truncating daily_start to midnight before comparisons
so the bucket exactly 7 days ago is included; apply the same change to the other
occurrence that uses day_ts and daily_start (the block that builds the key with
(org_id, day_ts.date().isoformat(), metric_name, "default", "") and the similar
logic around the second helper).
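The boundary-day bug flagged above is easiest to see with a small example (the timestamps are illustrative): `daily_start` carries a time-of-day, while day buckets are midnight-truncated, so a naive `>=` drops the bucket exactly 7 days old.

```python
from datetime import datetime

daily_start = datetime(2026, 3, 1, 14, 30)    # timeful cutoff (now - 7 days)
boundary_bucket = datetime(2026, 3, 1, 0, 0)  # midnight-truncated day_ts

# Naive timeful comparison excludes the boundary day...
naive_included = boundary_bucket >= daily_start
# ...while comparing at day granularity keeps it, as the review suggests.
fixed_included = boundary_bucket.date() >= daily_start.date()
```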

---

Outside diff comments:
In `@backend/dashboard_metrics/tasks.py`:
- Around line 507-519: The active_org_ids prefilter currently gathers orgs only
from WorkflowExecution.created_at (see variable active_org_ids and daily_start),
which misses orgs with recent HITL completions recorded via
HITLQueue.approved_at and therefore causes hitl_completions buckets to be
skipped; modify the prefilter to also include organization IDs from HITLQueue
(use approved_at >= daily_start) and any other recent-metric source tables that
can emit fresh metrics in the 7-day window, or alternatively exclude
hitl_completions (and other affected metric calculations) from using
active_org_ids so those metrics always scan their source tables without the
prefilter. Ensure you update the set construction that builds active_org_ids to
union in these additional org IDs (or branch the metric path to bypass the
prefilter) so daily/monthly buckets for HITL and similar sources are not left
stale.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1bb907c1-14fd-4aaa-be13-db97326b34e6

📥 Commits

Reviewing files that changed from the base of the PR and between c1667cd and 27b1575.

📒 Files selected for processing (3)
  • backend/dashboard_metrics/management/commands/backfill_metrics.py
  • backend/dashboard_metrics/services.py
  • backend/dashboard_metrics/tasks.py

athul-rs and others added 2 commits March 10, 2026 22:33
- Replace 4 thin LLM wrapper methods with get_llm_metrics_split() that
  fetches combined data once and splits into per-metric series
- Update live_series view to use single combined query (4→1 DB queries)
- Update backfill_metrics command to use combined query (8→2 DB queries)
- Fix daily_start boundary day exclusion by truncating to midnight
- Rename resolved → page_usage_org_id for clarity

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (4)
backend/dashboard_metrics/views.py (1)

840-843: Dead code: metric_name == "llm_usage" check is now unreachable.

Since LLM metrics (including llm_usage) are handled in the earlier block (lines 782-822) and excluded from metric_queries via LLM_METRIC_KEYS, this condition at line 842 will never be true for non-LLM metrics processed here.

🧹 Suggested simplification
                 series.append(
                     {
                         "metric_name": metric_name,
-                        "metric_type": MetricType.HISTOGRAM
-                        if metric_name == "llm_usage"
-                        else MetricType.COUNTER,
+                        "metric_type": MetricType.COUNTER,
                         "data": [
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/views.py` around lines 840 - 843, The conditional
that sets MetricType based on metric_name == "llm_usage" is dead code because
LLM metrics (including "llm_usage") are handled earlier and excluded from
metric_queries via LLM_METRIC_KEYS; update the block that constructs the metric
dict (the code using "metric_name" and MetricType) to always assign
MetricType.COUNTER (remove the if/else branch) so the metric creation is
simplified and no unreachable check remains.
backend/dashboard_metrics/management/commands/backfill_metrics.py (1)

61-67: Class attribute type hint for mutable defaults.

Static analysis flags LLM_METRIC_TYPES as a mutable class attribute. While this is safe since it's read-only in practice, you could add ClassVar for explicit typing if desired.

🧹 Optional type hint improvement
+from typing import Any, ClassVar
...
-    LLM_METRIC_TYPES: dict[str, bool] = {
+    LLM_METRIC_TYPES: ClassVar[dict[str, bool]] = {
         "llm_calls": False,
         "challenges": False,
         "summarization_calls": False,
         "llm_usage": True,
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/management/commands/backfill_metrics.py` around
lines 61 - 67, LLM_METRIC_TYPES is a mutable class attribute flagged by static
analysis; annotate it as a class-level constant by adding a ClassVar type hint
(e.g., from typing import ClassVar) so its declaration becomes
ClassVar[dict[str, bool]] and makes the intent explicit; locate the
LLM_METRIC_TYPES attribute in the class in backfill_metrics.py and update its
type annotation accordingly while keeping the same value.
backend/dashboard_metrics/tasks.py (2)

285-359: Cognitive complexity is elevated but the structure is clear.

SonarCloud flags this function for complexity (16 vs 15 allowed). The logic is straightforward (hourly loop + daily/monthly loop with split). Consider extracting the daily/monthly bucket loop into a small helper if you want to address the warning, but it's not critical.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/tasks.py` around lines 285 - 359, The function
_aggregate_single_metric exceeds allowed cognitive complexity; extract the
daily+monthly bucketing logic (the second query loop that builds daily_agg and
monthly_buckets and the subsequent monthly_agg population) into a small helper
function (e.g., _bucket_daily_and_monthly or similar) that accepts rows/params
(org_id, daily_start, metric_name, metric_type, daily_agg, monthly_agg) and
returns/updates monthly_buckets, then call that helper from
_aggregate_single_metric and keep the hourly loop as-is to reduce complexity.

362-440: Cognitive complexity warning for _aggregate_llm_combined.

SonarCloud reports complexity of 26 (vs 15 allowed). The nested loops over llm_combined_fields inside both the daily and monthly paths contribute significantly. Extracting the field iteration into a helper could reduce nesting.

♻️ Suggested refactor to reduce complexity
def _ingest_llm_row_to_agg(
    row: dict,
    ts_str: str,
    org_id: str,
    agg_dict: dict,
    llm_combined_fields: dict,
) -> None:
    """Ingest a single LLM combined row into an aggregation dict."""
    for field, (metric_name, metric_type) in llm_combined_fields.items():
        value = row[field] or 0
        key = (org_id, ts_str, metric_name, "default", "")
        if key not in agg_dict:
            agg_dict[key] = {"metric_type": metric_type, "value": 0, "count": 0}
        agg_dict[key]["value"] += value
        agg_dict[key]["count"] += 1

Then call this helper in both hourly and daily loops.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/tasks.py` around lines 362 - 440, The
_aggregate_llm_combined function is over-complex due to duplicated nested loops
over llm_combined_fields; extract the inner per-field logic into a helper (e.g.,
_ingest_llm_row_to_agg) that accepts row, ts_str, org_id, agg_dict, and
llm_combined_fields and performs the value extraction, key construction,
metric_type init, and accumulation, then call that helper from the hourly loop
and from the daily branch to replace the duplicated code; for the monthly
bucketing extract a small helper (e.g., _ingest_llm_row_to_monthly_bucket) that
builds the (month_key_str, metric_name) bkey, initializes bucket entries and
accumulates value/count, and use it in the monthly loop—this reduces nesting and
cognitive complexity while preserving existing behavior in
_aggregate_llm_combined.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/dashboard_metrics/tasks.py`:
- Around line 203-243: _acquire_aggregation_lock has a race between
cache.delete(...) and cache.add(...) when reclaiming stale/corrupted locks;
replace the non-atomic delete+add with atomic Redis operations using the raw
client from django_redis (get_redis_connection) so you perform SET NX EX for the
initial grab and use GET/GETSET (or GETSET + check old value and set TTL) when
reclaiming stale locks to ensure you only replace the lock if it still contains
the stale timestamp you observed; update references to AGGREGATION_LOCK_KEY and
the cache.add/cache.delete usage in _acquire_aggregation_lock to use
redis.set(..., nx=True, ex=...) and redis.getset / expire logic with a
float-check of the previous value to avoid the race.

---

Nitpick comments:
In `@backend/dashboard_metrics/management/commands/backfill_metrics.py`:
- Around line 61-67: LLM_METRIC_TYPES is a mutable class attribute flagged by
static analysis; annotate it as a class-level constant by adding a ClassVar type
hint (e.g., from typing import ClassVar) so its declaration becomes
ClassVar[dict[str, bool]] and makes the intent explicit; locate the
LLM_METRIC_TYPES attribute in the class in backfill_metrics.py and update its
type annotation accordingly while keeping the same value.

In `@backend/dashboard_metrics/tasks.py`:
- Around line 285-359: The function _aggregate_single_metric exceeds allowed
cognitive complexity; extract the daily+monthly bucketing logic (the second
query loop that builds daily_agg and monthly_buckets and the subsequent
monthly_agg population) into a small helper function (e.g.,
_bucket_daily_and_monthly or similar) that accepts rows/params (org_id,
daily_start, metric_name, metric_type, daily_agg, monthly_agg) and
returns/updates monthly_buckets, then call that helper from
_aggregate_single_metric and keep the hourly loop as-is to reduce complexity.
- Around line 362-440: The _aggregate_llm_combined function is over-complex due
to duplicated nested loops over llm_combined_fields; extract the inner per-field
logic into a helper (e.g., _ingest_llm_row_to_agg) that accepts row, ts_str,
org_id, agg_dict, and llm_combined_fields and performs the value extraction, key
construction, metric_type init, and accumulation, then call that helper from the
hourly loop and from the daily branch to replace the duplicated code; for the
monthly bucketing extract a small helper (e.g.,
_ingest_llm_row_to_monthly_bucket) that builds the (month_key_str, metric_name)
bkey, initializes bucket entries and accumulates value/count, and use it in the
monthly loop—this reduces nesting and cognitive complexity while preserving
existing behavior in _aggregate_llm_combined.

In `@backend/dashboard_metrics/views.py`:
- Around line 840-843: The conditional that sets MetricType based on metric_name
== "llm_usage" is dead code because LLM metrics (including "llm_usage") are
handled earlier and excluded from metric_queries via LLM_METRIC_KEYS; update the
block that constructs the metric dict (the code using "metric_name" and
MetricType) to always assign MetricType.COUNTER (remove the if/else branch) so
the metric creation is simplified and no unreachable check remains.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 30788ac3-29a5-450c-9105-71430a26b833

📥 Commits

Reviewing files that changed from the base of the PR and between 27b1575 and 10b6b0d.

📒 Files selected for processing (4)
  • backend/dashboard_metrics/management/commands/backfill_metrics.py
  • backend/dashboard_metrics/services.py
  • backend/dashboard_metrics/tasks.py
  • backend/dashboard_metrics/views.py

athul-rs and others added 4 commits March 10, 2026 23:22
- Extract _upsert_agg helper in tasks.py to eliminate repeated
  dict-init-and-increment pattern in both aggregation functions
- Extract _build_series_entry and _build_error_entry helpers in views.py
  to deduplicate series construction in live_series endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move LLM and non-LLM metric fetching logic out of the view
method into a standalone function, bringing the view's cognitive
complexity well under the Sonar threshold of 15.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/dashboard_metrics/tasks.py (1)

450-462: ⚠️ Potential issue | 🔴 Critical

WorkflowExecution prefilter will silently skip orgs with recent PageUsage, Usage, or HITLQueue data.

The prefilter (lines 450-462) uses only WorkflowExecution with a 7-day window to select which organizations to process. However, the following metrics query their data directly and independently:

  • pages_processed → queries PageUsage directly
  • llm_calls, challenges, summarization_calls, llm_usage → query Usage directly
  • hitl_reviews, hitl_completions → query HITLQueue directly

If an org has recent activity in any of these tables but no WorkflowExecution in the past 7 days, the entire org is skipped and all its metrics are silently not aggregated. This is a data integrity issue.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/tasks.py` around lines 450 - 462, The prefilter
active_org_ids currently only pulls org IDs from WorkflowExecution using
daily_start; update it to include org IDs with recent activity from PageUsage,
Usage, and HITLQueue as well (using the same daily_start window) so that
pages_processed, llm_calls/challenges/summarization_calls/llm_usage, and
hitl_reviews/hitl_completions aren’t skipped; implement this by computing the
union of distinct organization IDs from
WorkflowExecution.values_list("workflow__organization_id"),
PageUsage.values_list("organization_id"), Usage.values_list("organization_id"),
and HITLQueue.values_list("organization_id") into the existing active_org_ids
set (or replace active_org_ids with a combined queryset/set) to ensure all orgs
with recent activity in any of those tables are processed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/dashboard_metrics/tasks.py`:
- Around line 360-383: The combined-LLM loops are upserting zeroed metrics for
fields that weren’t actually present in the bucket; modify the fan-out in the
loops that iterate over MetricsQueryService.get_llm_metrics_combined so you only
call _upsert_agg for a given (field, metric_name) when the field was truly
present in the row: first prefer a per-field presence flag if the query exposes
one (e.g. row.get(f"{field}_present") is truthy), otherwise require row[field]
is not None (and skip when row[field] is missing/None) before computing value
and calling _upsert_agg for hourly_agg, daily_agg, and monthly_agg; keep the
surrounding timestamp handling (_truncate_to_hour/_day/_month,
llm_combined_fields) unchanged.

In `@backend/dashboard_metrics/views.py`:
- Around line 49-73: Replace the implicit llm_usage-vs-everything-else logic
with an explicit metric type map (e.g., METRIC_TYPE_MAP = {"llm_usage":
MetricType.HISTOGRAM, "pages_processed": MetricType.HISTOGRAM, ...}) and use
that map inside _build_series_entry and _build_error_entry to determine
metric_type (falling back to MetricType.COUNTER if not listed); update
_build_series_entry to read metric_type = METRIC_TYPE_MAP.get(metric_name,
MetricType.COUNTER) and include it in the returned dict, and update
_build_error_entry to also look up and include the same metric_type so partial
failures preserve the correct type.
- Around line 792-829: The metric mapping in metric_queries dropped the
"failed_pages" entry so requests to GET /live-series?metric_name=failed_pages
return empty series; restore the mapping by adding "failed_pages":
MetricsQueryService.get_failed_pages to the metric_queries dict (in the same
block where documents_processed, pages_processed, etc. are defined) so
_build_series_entry and the aggregation in backend/dashboard_metrics/tasks.py
continue to work, or alternatively add explicit validation at the start of the
handler to reject unknown metrics (using MetricsQueryService.LLM_METRIC_KEYS and
the metric_queries keys) so a missing metric is surfaced instead of returning an
empty series.
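The explicit metric-type map suggested for views.py could look like the following. A sketch only: the histogram set is taken from the review comment and the later commit note ("pages_processed and failed_pages as histograms"), plain strings stand in for the real MetricType enum, and the helper name is hypothetical.

```python
# Sketch of an explicit metric-type map; string values stand in for the
# real MetricType enum members.
METRIC_TYPE_MAP = {
    "llm_usage": "HISTOGRAM",
    "pages_processed": "HISTOGRAM",
    "failed_pages": "HISTOGRAM",
}

def metric_type(metric_name):
    """Look up a metric's type, defaulting to COUNTER for unlisted metrics."""
    return METRIC_TYPE_MAP.get(metric_name, "COUNTER")
```

A lookup table makes the counter/histogram split a single source of truth instead of an implicit `if metric_name == "llm_usage"` branch scattered through the view.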

---

Outside diff comments:
In `@backend/dashboard_metrics/tasks.py`:
- Around line 450-462: The prefilter active_org_ids currently only pulls org IDs
from WorkflowExecution using daily_start; update it to include org IDs with
recent activity from PageUsage, Usage, and HITLQueue as well (using the same
daily_start window) so that pages_processed,
llm_calls/challenges/summarization_calls/llm_usage, and
hitl_reviews/hitl_completions aren’t skipped; implement this by computing the
union of distinct organization IDs from
WorkflowExecution.values_list("workflow__organization_id"),
PageUsage.values_list("organization_id"), Usage.values_list("organization_id"),
and HITLQueue.values_list("organization_id") into the existing active_org_ids
set (or replace active_org_ids with a combined queryset/set) to ensure all orgs
with recent activity in any of those tables are processed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 269487ef-bace-4842-8c97-b0e70bda04d4

📥 Commits

Reviewing files that changed from the base of the PR and between 10b6b0d and 6341fca.

📒 Files selected for processing (2)
  • backend/dashboard_metrics/tasks.py
  • backend/dashboard_metrics/views.py

Add _HISTOGRAM_METRICS set and _metric_type helper to correctly
classify pages_processed and failed_pages as histograms, consistent
with tasks.py and backfill_metrics.py. Previously only llm_usage
was marked as histogram in the views layer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
backend/dashboard_metrics/views.py (1)

76-127: Cognitive complexity exceeds threshold (22 vs 15 allowed).

SonarCloud flags this function for high cognitive complexity. Consider extracting the LLM metrics handling (lines 94-110) into a separate helper function to reduce nesting.

Suggested refactor
+def _fetch_llm_series(
+    org_id: str,
+    start_date: datetime,
+    end_date: datetime,
+    granularity: str,
+    requested_metric: str | None,
+    llm_metric_keys: dict,
+) -> tuple[list[dict], list[str]]:
+    """Fetch LLM metrics series."""
+    series: list[dict] = []
+    errors: list[str] = []
+    try:
+        llm_split = MetricsQueryService.get_llm_metrics_split(
+            org_id, start_date, end_date, granularity
+        )
+        for name, data in llm_split.items():
+            if not requested_metric or name == requested_metric:
+                series.append(_build_series_entry(name, data))
+    except Exception:
+        logger.exception("Failed to fetch LLM metrics")
+        for name in llm_metric_keys:
+            if not requested_metric or name == requested_metric:
+                errors.append(name)
+                series.append(_build_error_entry(name))
+    return series, errors
+
+
 def _fetch_live_series(
     org_id: str,
     start_date: datetime,
     end_date: datetime,
     granularity: str,
     metric_queries: dict,
     requested_metric: str | None,
 ) -> tuple[list[dict], list[str]]:
-    """Fetch all metric series (LLM combined + individual).
-
-    Returns:
-        Tuple of (series list, error names list).
-    """
-    series: list[dict] = []
-    errors: list[str] = []
+    """Fetch all metric series (LLM combined + individual)."""
     llm_metric_keys = MetricsQueryService.LLM_METRIC_KEYS
 
-    # Fetch all 4 LLM metrics in a single query
+    series: list[dict] = []
+    errors: list[str] = []
+
     if not requested_metric or requested_metric in llm_metric_keys:
-        try:
-            llm_split = MetricsQueryService.get_llm_metrics_split(
-                org_id,
-                start_date,
-                end_date,
-                granularity,
-            )
-            for name, data in llm_split.items():
-                if not requested_metric or name == requested_metric:
-                    series.append(_build_series_entry(name, data))
-        except Exception:
-            logger.exception("Failed to fetch LLM metrics")
-            for name in llm_metric_keys:
-                if not requested_metric or name == requested_metric:
-                    errors.append(name)
-                    series.append(_build_error_entry(name))
+        llm_series, llm_errors = _fetch_llm_series(
+            org_id, start_date, end_date, granularity, requested_metric, llm_metric_keys
+        )
+        series.extend(llm_series)
+        errors.extend(llm_errors)
 
     # Filter non-LLM metrics if a specific metric was requested
     if requested_metric:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/views.py` around lines 76 - 127, The function
_fetch_live_series is too complex; extract the LLM-specific block (the
try/except that calls MetricsQueryService.get_llm_metrics_split, iterates
llm_split, appends _build_series_entry or on error logs via logger.exception and
appends _build_error_entry for each MetricsQueryService.LLM_METRIC_KEYS item)
into a new helper function (e.g., _fetch_llm_series) that accepts org_id,
start_date, end_date, granularity, requested_metric and returns (list_of_series,
list_of_error_names); then replace the original LLM block in _fetch_live_series
with a single call to that helper and merge its returned series/errors with the
rest of the function, keeping existing use of _build_series_entry,
_build_error_entry and logger.exception.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@backend/dashboard_metrics/views.py`:
- Around line 76-127: The function _fetch_live_series is too complex; extract
the LLM-specific block (the try/except that calls
MetricsQueryService.get_llm_metrics_split, iterates llm_split, appends
_build_series_entry or on error logs via logger.exception and appends
_build_error_entry for each MetricsQueryService.LLM_METRIC_KEYS item) into a new
helper function (e.g., _fetch_llm_series) that accepts org_id, start_date,
end_date, granularity, requested_metric and returns (list_of_series,
list_of_error_names); then replace the original LLM block in _fetch_live_series
with a single call to that helper and merge its returned series/errors with the
rest of the function, keeping existing use of _build_series_entry,
_build_error_entry and logger.exception.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0bd16704-6ee8-43fe-a52c-ca5340d7c29e

📥 Commits

Reviewing files that changed from the base of the PR and between 6341fca and 0cece48.

📒 Files selected for processing (2)
  • backend/dashboard_metrics/tasks.py
  • backend/dashboard_metrics/views.py

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
backend/dashboard_metrics/views.py (1)

72-79: ⚠️ Potential issue | 🟡 Minor

Add metric_type to error entries for response consistency.

The error entry is missing the metric_type field. When partial failures occur, the frontend receives inconsistent response shapes—successful entries have metric_type while error entries don't. This was noted in a previous review.

Proposed fix
 def _build_error_entry(metric_name: str) -> dict:
     """Build an error series entry for a failed metric."""
     return {
         "metric_name": metric_name,
+        "metric_type": _metric_type(metric_name),
         "error": "unavailable",
         "data": [],
         "total_value": 0,
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/views.py` around lines 72 - 79, The error entry
returned by _build_error_entry is missing the metric_type field, causing
inconsistent response shapes; update _build_error_entry (function name:
_build_error_entry) to accept an optional metric_type parameter (e.g.,
metric_type: Optional[str] = None) and include "metric_type": metric_type (or a
default like "unavailable"/None) in the returned dict so error entries match the
successful entries' shape.
🧹 Nitpick comments (1)
backend/dashboard_metrics/views.py (1)

82-134: Consider extracting helpers to reduce cognitive complexity.

SonarCloud flags this function with cognitive complexity 22 (limit 15). The logic is correct, but the nested conditionals for LLM vs non-LLM paths and per-metric error handling contribute to the complexity.

Suggested refactor: extract LLM fetching
+def _fetch_llm_series(
+    org_id: str,
+    start_date: datetime,
+    end_date: datetime,
+    granularity: str,
+    requested_metric: str | None,
+) -> tuple[list[dict], list[str]]:
+    """Fetch LLM metrics series."""
+    series: list[dict] = []
+    errors: list[str] = []
+    llm_keys = MetricsQueryService.LLM_METRIC_KEYS
+
+    try:
+        llm_split = MetricsQueryService.get_llm_metrics_split(
+            org_id, start_date, end_date, granularity
+        )
+        for name, data in llm_split.items():
+            if not requested_metric or name == requested_metric:
+                series.append(_build_series_entry(name, data))
+    except Exception:
+        logger.exception("Failed to fetch LLM metrics")
+        for name in llm_keys:
+            if not requested_metric or name == requested_metric:
+                errors.append(name)
+                series.append(_build_error_entry(name))
+
+    return series, errors
+
+
 def _fetch_live_series(
     org_id: str,
     start_date: datetime,
     end_date: datetime,
     granularity: str,
     metric_queries: dict,
     requested_metric: str | None,
 ) -> tuple[list[dict], list[str]]:
-    """Fetch all metric series (LLM combined + individual).
-
-    Returns:
-        Tuple of (series list, error names list).
-    """
+    """Fetch all metric series (LLM combined + individual)."""
     series: list[dict] = []
     errors: list[str] = []
-    llm_metric_keys = MetricsQueryService.LLM_METRIC_KEYS
 
-    # Fetch all 4 LLM metrics in a single query
-    if not requested_metric or requested_metric in llm_metric_keys:
-        try:
-            llm_split = MetricsQueryService.get_llm_metrics_split(
-                org_id,
-                start_date,
-                end_date,
-                granularity,
-            )
-            for name, data in llm_split.items():
-                if not requested_metric or name == requested_metric:
-                    series.append(_build_series_entry(name, data))
-        except Exception:
-            logger.exception("Failed to fetch LLM metrics")
-            for name in llm_metric_keys:
-                if not requested_metric or name == requested_metric:
-                    errors.append(name)
-                    series.append(_build_error_entry(name))
+    # Fetch LLM metrics
+    if not requested_metric or requested_metric in MetricsQueryService.LLM_METRIC_KEYS:
+        llm_series, llm_errors = _fetch_llm_series(
+            org_id, start_date, end_date, granularity, requested_metric
+        )
+        series.extend(llm_series)
+        errors.extend(llm_errors)
 
     # Filter non-LLM metrics if a specific metric was requested
     ...
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/dashboard_metrics/views.py` around lines 82 - 134, The
_fetch_live_series function is too complex; extract the LLM-specific fetch and
error handling into a helper to reduce nesting. Create a helper (e.g.,
fetch_llm_series(org_id, start_date, end_date, granularity, requested_metric))
that calls MetricsQueryService.get_llm_metrics_split, builds series entries with
_build_series_entry, and on exception logs and returns error names and error
entries using _build_error_entry; then simplify _fetch_live_series to call that
helper when appropriate and only handle the remaining metric_queries loop
(calling each query_fn and using _build_series_entry/_build_error_entry) so the
main function becomes a simple orchestration of fetch_llm_series(...) + the
per-metric loop.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/dashboard_metrics/views.py`:
- Around line 851-860: The metric mapping in metric_queries is missing the
failed_pages entry, causing live-series queries for "failed_pages" to return
empty; add "failed_pages": MetricsQueryService.get_failed_pages to the
metric_queries dict so that the failed_pages histogram is routed to
MetricsQueryService.get_failed_pages (update the metric_queries definition where
it lists documents_processed, pages_processed, etc., and ensure the key matches
"failed_pages").

---

Duplicate comments:
In `@backend/dashboard_metrics/views.py`:
- Around line 72-79: The error entry returned by _build_error_entry is missing
the metric_type field, causing inconsistent response shapes; update
_build_error_entry (function name: _build_error_entry) to accept an optional
metric_type parameter (e.g., metric_type: Optional[str] = None) and include
"metric_type": metric_type (or a default like "unavailable"/None) in the
returned dict so error entries match the successful entries' shape.

---

Nitpick comments:
In `@backend/dashboard_metrics/views.py`:
- Around line 82-134: The _fetch_live_series function is too complex; extract
the LLM-specific fetch and error handling into a helper to reduce nesting.
Create a helper (e.g., fetch_llm_series(org_id, start_date, end_date,
granularity, requested_metric)) that calls
MetricsQueryService.get_llm_metrics_split, builds series entries with
_build_series_entry, and on exception logs and returns error names and error
entries using _build_error_entry; then simplify _fetch_live_series to call that
helper when appropriate and only handle the remaining metric_queries loop
(calling each query_fn and using _build_series_entry/_build_error_entry) so the
main function becomes a simple orchestration of fetch_llm_series(...) + the
per-metric loop.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 04a9559c-afd2-425d-9cd7-053321776b6d

📥 Commits

Reviewing files that changed from the base of the PR and between 0cece48 and 3609c9e.

📒 Files selected for processing (1)
  • backend/dashboard_metrics/views.py

@athul-rs athul-rs changed the title UN-1798 [FIX] Optimize dashboard metrics aggregation task UN-3322 [FIX] Optimize dashboard metrics aggregation task Mar 11, 2026
Contributor

@muhammad-ali-e muhammad-ali-e left a comment

Code Review Findings

Critical Issues

1. Active org prefilter is too narrow (data integrity risk)

tasks.py ~line 460

The active_org_ids prefilter queries only WorkflowExecution.created_at >= daily_start (7 days). But several metrics query independent tables:

  • pages_processed → PageUsage
  • llm_* metrics → Usage
  • hitl_reviews, hitl_completions → HITLQueue

An org with recent HITL completions or LLM usage but no WorkflowExecution in the last 7 days gets silently skipped. Monthly totals go stale. The PR description acknowledges the 7-day window is intentional for dormant orgs, but the dormancy signal (WorkflowExecution) doesn't cover all metric sources.

Suggestion: Union org IDs from all source tables, or at minimum add Usage and HITLQueue to the prefilter query.

2. Race condition in _acquire_aggregation_lock()

tasks.py ~lines 223-240

The cache.delete() + cache.add() sequence is non-atomic. Between delete and add, another worker can grab the lock, leading to two concurrent aggregation runs.

Suggestion: Use Redis GETSET or SET ... NX EX via get_redis_connection() for an atomic compare-and-swap. Given this runs every 15 minutes and the window is small, this is low-probability but worth fixing since the whole point of the lock is correctness.
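An in-memory sketch of why acquisition must be a single atomic check-and-set (the `MiniLockStore` class is purely illustrative, standing in for Redis `SET key value NX EX`; it is not the project's code):

```python
import threading
import time


class MiniLockStore:
    """Illustrative in-memory stand-in for Redis `SET key value NX EX`.

    The existence check and the write happen under one mutex, so no
    second caller can sneak in between them -- unlike a separate
    cache.delete() followed by cache.add().
    """

    def __init__(self):
        self._data = {}  # key -> (value, expires_at)
        self._mutex = threading.Lock()

    def set_nx_ex(self, key, value, ttl_seconds):
        with self._mutex:
            entry = self._data.get(key)
            now = time.time()
            if entry is not None and entry[1] > now:
                return False  # still held and not expired
            self._data[key] = (value, now + ttl_seconds)
            return True
```

With real Redis via redis-py, the equivalent atomic call is `redis_client.set(key, value, nx=True, ex=ttl)`.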


Medium Issues

3. failed_pages missing from metric_queries in views.py

views.py ~line 792-829

The metric_queries dict in live_series doesn't include "failed_pages": MetricsQueryService.get_failed_pages. This means GET /live-series?metric_name=failed_pages returns empty series. The tasks.py aggregation still handles it, but the live fallback path is broken.

4. Zeroed metrics for absent LLM data

tasks.py ~lines 360-383

The LLM combined query always returns all four fields per row. When a period has LLM calls but no challenges, challenges is 0 (not NULL), and _upsert_agg still creates a record with value=0. This writes zero-value rows that did not exist before. Not incorrect per se, but it inflates row counts and changes behavior relative to the original code, which only wrote rows when data existed.
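One way to restore the old behavior is to filter zero values out of the combined result before upserting; in this sketch the row shape and field names are assumed from the PR discussion:

```python
LLM_FIELDS = ("llm_calls", "challenges", "summarization_calls", "llm_usage")


def nonzero_llm_rows(rows, fields=LLM_FIELDS):
    """Yield (period, metric_name, value) tuples only where the combined
    query produced a non-zero value, matching the original
    write-only-when-data-exists behavior."""
    for row in rows:
        for field in fields:
            value = row.get(field) or 0
            if value:
                yield (row["period"], field, value)
```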


Minor / Nitpicks

5. _metric_type() in views.py is incomplete

The _HISTOGRAM_METRICS frozenset covers llm_usage, pages_processed, failed_pages. This is correct today but is a second source of truth alongside LLM_METRIC_TYPES in backfill and llm_combined_fields in tasks. Consider a single canonical mapping.
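One shape a single canonical mapping could take (metric names are gathered from this thread; the module layout and helper name are hypothetical):

```python
# Single source of truth, importable by tasks.py, views.py, and
# backfill_metrics.py instead of three parallel definitions.
METRIC_IS_HISTOGRAM = {
    "llm_usage": True,
    "pages_processed": True,
    "failed_pages": True,
    "llm_calls": False,
    "challenges": False,
    "summarization_calls": False,
    "documents_processed": False,
}


def metric_type(name: str) -> str:
    """Classify a metric as histogram or counter from the shared mapping."""
    return "histogram" if METRIC_IS_HISTOGRAM.get(name, False) else "counter"
```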

6. ClassVar annotation for LLM_METRIC_TYPES

backfill_metrics.py line 61: LLM_METRIC_TYPES: dict[str, bool] is a mutable class attribute. Adding ClassVar would make the intent explicit for static analyzers.

7. Cognitive complexity

_aggregate_single_metric (16) and _aggregate_llm_combined (26) exceed typical thresholds. The shared _upsert_agg helper was a good step — extracting the daily+monthly fan-out into a similar helper would reduce nesting further.


What's Good

  • Merging 4 LLM queries into 1 via Django conditional aggregation is clean and well-proven
  • The daily+monthly query merge (reusing DAY granularity for both) is a sound optimization
  • _resolve_org_identifier with pre-resolution avoids N+1 lookups on PageUsage
  • Self-healing lock concept is the right approach for handling OOM-killed workers
  • PR description is exemplary — trade-offs, testing plan, and backwards compatibility are all addressed
  • The _upsert_agg extraction reduces code duplication nicely

Severity Count Key Items
Critical 2 Narrow org prefilter skips metrics; lock race condition
Medium 2 Missing failed_pages in views; zero-value row inflation
Minor 3 Duplicate metric type sources; ClassVar; complexity

@github-actions
Contributor

Test Results

Summary
  • Runner Tests: 11 passed, 0 failed (11 total)
  • SDK1 Tests: 63 passed, 0 failed (63 total)

Runner Tests - Full Report
| filepath | function | passed | SUBTOTAL |
| --- | --- | ---: | ---: |
| runner/src/unstract/runner/clients/test_docker.py | test_logs | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup_skip | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_client_init | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_exists | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config_without_mount | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_run_container | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_for_sidecar | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_sidecar_container | 1 | 1 |
| TOTAL | | 11 | 11 |
SDK1 Tests - Full Report
| filepath | function | passed | SUBTOTAL |
| --- | --- | ---: | ---: |
| tests/test_platform.py | TestPlatformHelperRetry.test_success_on_first_attempt | 2 | 2 |
| tests/test_platform.py | TestPlatformHelperRetry.test_retry_on_connection_error | 2 | 2 |
| tests/test_platform.py | TestPlatformHelperRetry.test_non_retryable_http_error | 1 | 1 |
| tests/test_platform.py | TestPlatformHelperRetry.test_retryable_http_errors | 3 | 3 |
| tests/test_platform.py | TestPlatformHelperRetry.test_post_method_retry | 1 | 1 |
| tests/test_platform.py | TestPlatformHelperRetry.test_retry_logging | 1 | 1 |
| tests/test_prompt.py | TestPromptToolRetry.test_success_on_first_attempt | 1 | 1 |
| tests/test_prompt.py | TestPromptToolRetry.test_retry_on_errors | 2 | 2 |
| tests/test_prompt.py | TestPromptToolRetry.test_wrapper_methods_retry | 4 | 4 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_connection_error_is_retryable | 1 | 1 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_timeout_is_retryable | 1 | 1 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_http_error_retryable_status_codes | 3 | 3 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_http_error_non_retryable_status_codes | 5 | 5 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_http_error_without_response | 1 | 1 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_os_error_retryable_errno | 5 | 5 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_os_error_non_retryable_errno | 1 | 1 |
| tests/utils/test_retry_utils.py | TestIsRetryableError.test_other_exception_not_retryable | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCalculateDelay.test_exponential_backoff_without_jitter | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCalculateDelay.test_exponential_backoff_with_jitter | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCalculateDelay.test_max_delay_cap | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCalculateDelay.test_max_delay_cap_with_jitter | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_successful_call_first_attempt | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_retry_after_transient_failure | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_max_retries_exceeded | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_retry_with_custom_predicate | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_no_retry_with_predicate_false | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryWithExponentialBackoff.test_exception_not_in_tuple_not_retried | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_default_configuration | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_environment_variable_configuration | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_invalid_max_retries | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_invalid_base_delay | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_invalid_multiplier | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_jitter_values | 2 | 2 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_custom_exceptions_only | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_custom_predicate_only | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_both_exceptions_and_predicate | 1 | 1 |
| tests/utils/test_retry_utils.py | TestCreateRetryDecorator.test_exceptions_match_but_predicate_false | 1 | 1 |
| tests/utils/test_retry_utils.py | TestPreconfiguredDecorators.test_retry_platform_service_call_exists | 1 | 1 |
| tests/utils/test_retry_utils.py | TestPreconfiguredDecorators.test_retry_prompt_service_call_exists | 1 | 1 |
| tests/utils/test_retry_utils.py | TestPreconfiguredDecorators.test_platform_service_decorator_retries_on_connection_error | 1 | 1 |
| tests/utils/test_retry_utils.py | TestPreconfiguredDecorators.test_prompt_service_decorator_retries_on_timeout | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryLogging.test_warning_logged_on_retry | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryLogging.test_info_logged_on_success_after_retry | 1 | 1 |
| tests/utils/test_retry_utils.py | TestRetryLogging.test_exception_logged_on_giving_up | 1 | 1 |
| TOTAL | | 63 | 63 |

@sonarqubecloud

@greptile-apps
Contributor

greptile-apps bot commented Mar 17, 2026

Greptile Summary

This PR optimizes the periodic aggregate_metrics_from_sources Celery task by reducing DB queries per active org by ~55%, tightening the active-org filter window from 2 months to 7 days, and adding a self-healing Redis distributed lock to recover from stuck-lock incidents caused by OOM-killed workers.

Key changes:

  • Combined LLM query (get_llm_metrics_combined): Merges llm_calls, challenges, summarization_calls, and llm_usage into a single Django conditional aggregation query (Count(filter=Q(...)), Sum), saving 3 queries per org per granularity tier.
  • Daily+monthly query merge: A single DAY-granularity query spanning monthly_start replaces separate daily and monthly queries; results are split in Python — saving ~9 queries/org per task run.
  • Cached org_identifier: Organization.organization_id is resolved once per org (via org.organization_id on the pre-fetched ORM object) and passed to get_pages_processed as org_identifier, avoiding redundant DB lookups.
  • Self-healing lock: The lock now stores a Unix timestamp instead of the string "running". On contention, if the stored age exceeds AGGREGATION_LOCK_TIMEOUT, the lock is reclaimed; the old "running" string value is handled via TypeError/ValueError fallback.
  • Two issues found in the lock implementation:
    • When cache.add fails but the subsequent cache.get returns None (lock expired between the two calls), the function returns False and skips the run. Since the lock is free at that point, it should retry cache.add instead.
    • The stale-lock reclaim uses a non-atomic cache.delete + cache.add sequence combined with an application-clock age check. With clock drift across Celery workers, two workers can independently conclude the lock is stale and the delete step can remove a lock held by a still-running legitimate worker before either add succeeds — creating a window for duplicate concurrent runs. An atomic Lua CAS script on the Redis client would eliminate this race.

Confidence Score: 3/5

  • The aggregation logic and service layer changes are sound, but two logic issues in the self-healing lock implementation could cause a missed aggregation cycle or, in multi-worker deployments with clock drift, allow two aggregation runs to overlap briefly.
  • The query-reduction work (combined LLM query, daily+monthly merge, org_identifier caching) is correct and well-structured. The services.py, views.py, and backfill_metrics.py changes are safe. The score is lowered to 3 because of the two issues in _acquire_aggregation_lock: (1) returning False when the lock expires between cache.add and cache.get unnecessarily skips a run, and (2) the non-atomic cache.delete + cache.add stale-reclaim combined with a wall-clock age check creates a TOCTOU window where clock-drifted workers could both reclaim a still-valid lock, potentially running two concurrent aggregations.
  • Pay close attention to backend/dashboard_metrics/tasks.py, specifically the _acquire_aggregation_lock function (lines 211–251).

Important Files Changed

Filename Overview
backend/dashboard_metrics/tasks.py: Core aggregation task refactored with self-healing Redis lock, combined LLM query path, and daily+monthly query merge; two logic issues found in the lock implementation (missed re-acquisition on race, non-atomic stale reclaim).
backend/dashboard_metrics/services.py: Added get_llm_metrics_combined and get_llm_metrics_split using Django conditional aggregation; removed org_identifier from get_failed_pages (correct, uses FK join); changes are functionally equivalent to the original four separate queries.
backend/dashboard_metrics/management/commands/backfill_metrics.py: LLM metrics removed from METRIC_CONFIGS and handled via get_llm_metrics_split; new LLM_METRIC_TYPES dict tracks histogram/counter classification; logic is correct and consistent with the tasks.py changes.
backend/dashboard_metrics/views.py: Extracted _fetch_live_series helper and _metric_type utility; live_series now uses combined LLM query; _HISTOGRAM_METRICS frozenset correctly broadened to include pages_processed and failed_pages in addition to llm_usage.
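The combined-query change described for services.py can be illustrated without Django: a single grouped pass that computes what `Count(filter=Q(...))` and `Sum` would return per period bucket. The field and bucket names below (`usage_type`, `reason`, `period`, `usage`) are illustrative assumptions, not the repository's actual schema:

```python
from collections import defaultdict

def combine_llm_metrics(rows):
    """Single pass emulating Django conditional aggregation:
    one Count(filter=Q(...)) per metric plus Sum("usage"), grouped
    by period. Field names are illustrative, not the real schema."""
    buckets = defaultdict(lambda: {
        "llm_calls": 0, "challenges": 0,
        "summarization_calls": 0, "llm_usage": 0,
    })
    for row in rows:
        agg = buckets[row["period"]]
        if row["usage_type"] == "llm":
            agg["llm_calls"] += 1           # Count(filter=Q(usage_type="llm"))
        if row["reason"] == "challenge":
            agg["challenges"] += 1          # Count(filter=Q(reason="challenge"))
        if row["reason"] == "summarize":
            agg["summarization_calls"] += 1
        agg["llm_usage"] += row["usage"]    # Sum("usage")
    return dict(buckets)
```

One grouped scan replaces four separately filtered queries, which is the shape of the per-org query saving the PR reports.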

Sequence Diagram

sequenceDiagram
    participant Celery as Celery Scheduler
    participant Task as aggregate_metrics_from_sources
    participant Lock as _acquire_aggregation_lock
    participant Redis as Redis Cache
    participant DB as PostgreSQL
    participant Agg as Aggregation Tables

    Celery->>Task: trigger (every 15 min)
    Task->>Lock: _acquire_aggregation_lock()
    Lock->>Redis: cache.add(key, timestamp, 900s)
    alt Lock free
        Redis-->>Lock: True (acquired)
        Lock-->>Task: True
    else Lock held
        Redis-->>Lock: False
        Lock->>Redis: cache.get(key)
        alt Value is None (expired in race)
            Redis-->>Lock: None
            Lock-->>Task: False (skip — missed re-acquire opportunity)
        else Value is old "running" string
            Lock->>Redis: cache.delete + cache.add
            Lock-->>Task: True (reclaimed)
        else Timestamp value, age > 900s
            Lock->>Redis: cache.delete + cache.add (non-atomic)
            Lock-->>Task: True (stale reclaim)
        else Lock is legitimate
            Lock-->>Task: False (skip)
        end
    end

    Task->>DB: WorkflowExecution filter (last 7 days) — active org IDs
    DB-->>Task: active_org_ids set

    loop For each active org
        Task->>DB: get_llm_metrics_combined (HOUR granularity)
        DB-->>Task: {period, llm_calls, challenges, summarization_calls, llm_usage}
        Task->>DB: get_llm_metrics_combined (DAY granularity, from monthly_start)
        DB-->>Task: rows split in Python → daily_agg + monthly_agg

        loop For each non-LLM metric
            Task->>DB: query_method (HOUR granularity)
            DB-->>Task: hourly rows
            Task->>DB: query_method (DAY granularity, from monthly_start)
            DB-->>Task: rows split in Python → daily_agg + monthly_agg
        end

        Task->>Agg: bulk_create (EventMetricsHourly, ON CONFLICT UPDATE)
        Task->>Agg: bulk_create (EventMetricsDaily, ON CONFLICT UPDATE)
        Task->>Agg: bulk_create (EventMetricsMonthly, ON CONFLICT UPDATE)
    end

    Task->>Redis: cache.delete(lock_key)
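The "rows split in Python" step in the diagram can be sketched as a plain function: one DAY-granularity result set covering the 2-month window feeds both the daily and monthly tiers. The row keys (`period`, `value`) and the window-boundary argument are assumptions for illustration:

```python
from datetime import datetime

def split_day_rows(rows, daily_start):
    """Split a single DAY-granularity query result into the daily tier
    (rows on or after daily_start) and per-month totals (all rows).
    Row keys are illustrative, not the project's actual shape."""
    daily_agg = []
    monthly_agg = {}
    for row in rows:
        period = row["period"]  # a datetime truncated to the day
        if period >= daily_start:
            daily_agg.append(row)
        month_key = period.replace(day=1)
        monthly_agg[month_key] = monthly_agg.get(month_key, 0) + row["value"]
    return daily_agg, monthly_agg
```

This split is why the monthly tier stays correct even though the active-org filter only looks back 7 days: the DAY query itself still spans the full 2-month window from monthly_start.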

Comments Outside Diff (1)

  1. backend/dashboard_metrics/tasks.py, line 562-563 (link)

    P2 Inconsistent logging style

    All other logger calls in this file use the %s lazy-interpolation style (e.g., logger.exception("Error querying %s for org %s", metric_name, org_id)). Using an f-string here eagerly evaluates the string even when the log level is suppressed and prevents log aggregation tools from grouping identical messages by their template. Suggested fix: `logger.exception("Error processing org %s: %s", org_id, e)`.
Last reviewed commit: fdc791f

Comment on lines +228 to +231
lock_value = cache.get(AGGREGATION_LOCK_KEY)
if lock_value is None:
    # Expired between our check and get — race is fine, next run will pick up
    return False
Contributor

P1 Missed re-acquisition on expired lock

When cache.add fails (lock held) and the subsequent cache.get returns None, the lock expired naturally between those two Redis calls — meaning the lock is now free. Returning False here causes the current invocation to skip aggregation entirely, even though there is no competing run. The next scheduled invocation 15 minutes later is the first that will successfully acquire.

Since the lock is demonstrably free at this point, the function should retry cache.add rather than aborting:

Suggested change, from:

lock_value = cache.get(AGGREGATION_LOCK_KEY)
if lock_value is None:
    # Expired between our check and get — race is fine, next run will pick up
    return False

to:

lock_value = cache.get(AGGREGATION_LOCK_KEY)
if lock_value is None:
    # Expired between our check and get — lock is now free, try to acquire it
    return cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT)
Contributor (Author)

The code already has an inline comment acknowledging this: "race is fine, next run will pick up". The task runs every 15 minutes with idempotent upserts — skipping one cycle has zero impact.

Comment on lines +241 to +249
age = now - lock_time
if age > AGGREGATION_LOCK_TIMEOUT:
    logger.warning(
        "Reclaiming stale aggregation lock (age=%.0fs, timeout=%ds)",
        age,
        AGGREGATION_LOCK_TIMEOUT,
    )
    cache.delete(AGGREGATION_LOCK_KEY)
    return cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT)
Contributor

P1 TOCTOU race in stale lock reclaim may allow duplicate concurrent runs

The stale-lock reclaim uses a non-atomic cache.delete + cache.add pattern combined with a wall-clock age check. This creates a real race window:

  1. The stale detection compares now - lock_time (both from application-server clocks) against AGGREGATION_LOCK_TIMEOUT.
  2. If two Celery workers run on hosts whose clocks differ by even a few seconds, both workers can independently compute age > AGGREGATION_LOCK_TIMEOUT while the legitimate holder is still running.
  3. Worker A deletes the lock and wins the cache.add. Worker B's add fails and skips (good). But the original holder (Worker C, whose task is still running) no longer has a valid lock — Worker A is now running in parallel.

The deleteadd two-step is not atomic in Redis. A correct approach would use a Lua CAS script:

```python
# Atomic CAS: only delete and re-set if the current value matches what we read
LUA_RECLAIM = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('set', KEYS[1], ARGV[2], 'EX', ARGV[3])
else
    return nil
end
"""
```

If using Django's cache abstraction (which wraps Redis), calling cache.client.eval(LUA_RECLAIM, ...) or exposing the raw Redis client for this operation would make the reclaim atomic and eliminate the TOCTOU window.
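The compare-and-swap semantics the Lua script provides can be imitated in-process for illustration. This toy store is not a substitute for server-side atomicity (against a real Redis, the script would run via the client's script-evaluation support); it only shows why a reclaim must be conditional on the exact value the caller observed:

```python
import threading

class CASLockStore:
    """Toy key-value store with an atomic conditional reclaim,
    mirroring what the Lua script does server-side in Redis."""
    def __init__(self):
        self._store = {}
        self._mutex = threading.Lock()

    def get(self, key):
        return self._store.get(key)

    def reclaim(self, key, observed, new_value):
        # Atomic check-and-set: succeed only if the value is still the
        # one this caller saw, so two workers cannot both reclaim it.
        with self._mutex:
            if self._store.get(key) != observed:
                return False  # someone else refreshed or reclaimed first
            self._store[key] = new_value
            return True

store = CASLockStore()
store._store["agg_lock"] = "100.0"                   # a stale timestamp value
won = store.reclaim("agg_lock", "100.0", "200.0")    # first worker wins
lost = store.reclaim("agg_lock", "100.0", "300.0")   # second worker loses
```

With delete-then-add, both workers would have succeeded in sequence; with CAS, the second worker's now-stale observation fails the swap.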

Contributor

Can you elaborate on what a TOCTOU race condition is?

Contributor

TOCTOU = Time-of-Check, Time-of-Use race condition.

It happens when you check a condition, then act on it later, but the condition can change between the check and the action.

In your code:

Time-of-Check (lines 234-240):

lock_time = float(lock_value)
age = now - lock_time
if age > AGGREGATION_LOCK_TIMEOUT:  # ← CHECK: "is this stale?"

Time-of-Use (lines 241-249):

    cache.delete(AGGREGATION_LOCK_KEY)  # ← USE: delete the "stale" lock
    return cache.add(AGGREGATION_LOCK_KEY, str(now), AGGREGATION_LOCK_TIMEOUT)

The race window:

Two workers (A and B) both read the same stale lock_value at roughly the same time. Both compute age > AGGREGATION_LOCK_TIMEOUT and decide "this is stale, I should reclaim it." Then:

  1. Worker A deletes the lock
  2. Worker A adds its own lock (succeeds)
  3. Worker B deletes the lock (deletes A's new lock!)
  4. Worker B adds its own lock (succeeds)

Now both A and B think they own the lock and run the aggregation in parallel, which can corrupt your metrics.

The fix is to make the check-and-swap atomic using a Lua script (like a database compare-and-swap), so only the worker that sees the exact stale value can reclaim it. If the value changed (because another worker already reclaimed it), the operation fails.

Contributor (Author)

Even if two workers run concurrently, all writes are INSERT...ON CONFLICT upserts, so results are correct (just redundant work). The suggested fix (Lua CAS script) is over-engineered for a 15-minute batch job with idempotent operations.
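The idempotency claim rests on last-writer-wins upserts. A minimal sqlite3 sketch of the INSERT ... ON CONFLICT pattern (table and column names here are hypothetical, not the project's Postgres schema):

```python
import sqlite3

# Re-running an aggregation for the same (org, period) overwrites the
# existing row instead of duplicating it: last writer wins.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE metrics_daily ("
    "org_id INTEGER, period TEXT, value INTEGER, "
    "PRIMARY KEY (org_id, period))"
)
UPSERT = (
    "INSERT INTO metrics_daily (org_id, period, value) VALUES (?, ?, ?) "
    "ON CONFLICT (org_id, period) DO UPDATE SET value = excluded.value"
)
conn.execute(UPSERT, (1, "2026-03-08", 10))   # first run
conn.execute(UPSERT, (1, "2026-03-08", 12))   # overlapping duplicate run
value, n_rows = conn.execute(
    "SELECT value, COUNT(*) FROM metrics_daily"
).fetchone()
```

Two overlapping runs leave a single row holding the later value, which is why concurrent execution here is wasteful but not corrupting.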

Contributor

@muhammad-ali-e left a comment

Code Review: UN-3322 [FIX] Optimize dashboard metrics aggregation task

Summary

Well-structured optimization PR — consolidates 4 LLM queries into 1 via Django conditional aggregation, merges daily+monthly queries, adds self-healing Redis lock, and tightens the active-org window from 2 months to 7 days. PR description is exemplary with explicit trade-off documentation. Findings below went through adversarial debate + fact-checking to filter false positives.

Automated Checks

  • ruff check: FAIL (services.py:11: unsorted imports, I001)

Findings

Medium

1. tasks.py (~line 463): Active-org prefilter narrowed to 7 days may skip HITL-only orgs. The WorkflowExecution-based filter (same pattern as main) misses orgs with recent HITLQueue activity but no WorkflowExecution in the past 7 days, leaving stale hitl_reviews/hitl_completions metrics for those orgs. Suggestion: union HITLQueue org IDs into active_org_ids for completeness. Low urgency — same base pattern exists on main.
2. tasks.py (~line 240): Lock reclaim uses non-atomic cache.delete() + cache.add(), a narrow race window where two workers both detect staleness. Worst case is redundant idempotent upserts (last-writer-wins), not data corruption. Strictly better than main (which has zero stale-lock recovery). Suggestion: call cache.set() directly after the staleness check instead of delete+add, or use the django-redis lock primitive. Follow-up, not urgent.

Minor (pre-existing patterns, not introduced by this PR)

3. services.py:219, 227-228: Hardcoded strings "llm", "challenge", "summarize" where UsageType and LLMUsageReason enums exist in usage_v2.models. Pre-existing from old code, carried into the combined query. Suggestion: use the existing enums when convenient.
4. services.py:337, 625: Hardcoded "ETL" string where the Pipeline.PipelineType.ETL enum exists. Pre-existing. Suggestion: use the enum.
5. tasks.py + backfill_metrics.py: Three _bulk_upsert_* functions (~90 lines) duplicated between tasks.py and the backfill command; _truncate_to_hour is also duplicated. Suggestion: extract to a shared dashboard_metrics/aggregation.py module.
6. tasks.py:563: logger.exception(f"...{e}") anti-pattern; including e is redundant since logger.exception already captures the traceback, and the f-string bypasses lazy formatting. Suggestion: logger.exception("Error processing org %s", org_id).
7. views.py:698: Stale docstring says "all 9 metrics"; there are now 12. Suggestion: update the count or drop the number.
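The lazy-interpolation point in finding 6 is directly observable: with %s style, the logging module defers argument formatting until a handler actually emits the record, while an f-string formats unconditionally at the call site. A small self-contained check (the logger name is arbitrary):

```python
import logging

class CountingStr:
    """Counts how many times it gets formatted to a string."""
    def __init__(self):
        self.calls = 0
    def __str__(self):
        self.calls += 1
        return "org-42"

logger = logging.getLogger("demo.lazy")
logger.setLevel(logging.INFO)            # DEBUG records are suppressed
logger.addHandler(logging.NullHandler())

arg = CountingStr()
logger.debug("Error processing org %s", arg)  # lazy: arg never formatted
lazy_calls = arg.calls

logger.debug(f"Error processing org {arg}")   # eager: formatted anyway
eager_calls = arg.calls
```

The suppressed %s call never touches the argument; the f-string pays the formatting cost even though the record is dropped.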

What's Good

  • Merging 4 LLM queries into 1 via Count(filter=Q(...)) conditional aggregation is clean
  • Daily+monthly query merge (reusing DAY granularity for both tiers) is a sound optimization
  • _resolve_org_identifier with pre-resolution avoids N+1 lookups on PageUsage
  • Self-healing lock is the right approach for OOM-killed workers — strictly better than main
  • _upsert_agg extraction reduces code duplication nicely
  • PR description documents trade-offs, testing plan, and backward compatibility thoroughly

Debated & Dismissed (false positives)

  • UUID vs string mismatch in backfill org_identifiers — Organization.id is BigAutoField (integer), not UUID. The isdigit() + int() conversion handles it correctly.
  • failed_pages missing from live_series — was never in live_series on main either. Not a regression.
