
Add Streaming metrics for Spark #2801

Open
hemanthsavasere wants to merge 2 commits into apache:main from
hemanthsavasere:2552-add-streaming-metrics-spark

Conversation

Contributor

hemanthsavasere commented Mar 6, 2026

Purpose

Linked issue: close #2552

Implement ReportsSourceMetrics.metrics() in FlussMicroBatchStream to surface streaming read health data in StreamingQueryProgress.sources[i].metrics. Previously this method returned an empty map.

Brief change log

  • Implement metrics() in FlussMicroBatchStream with 9 metrics:
    • Driver-side: plannedInputRows, numBucketsRead, numBucketsTotal, maxOffsetsBehindLatest, avgOffsetsBehindLatest
    • Executor-side (via Spark accumulators): batchFetchRequests, avgFetchLatencyMs, maxFetchLatencyMs, totalFetchErrors
  • Cache batch metadata (lastBatchPlannedInputRows, lastBatchNumBucketsRead) in planInputPartitions() on both FlussAppendMicroBatchStream and FlussUpsertMicroBatchStream
  • Add MaxLongAccumulator, a custom AccumulatorV2 for tracking maximum fetch latency across executors (Spark's built-in LongAccumulator only supports sum)
  • Instrument logScanner.poll() in FlussAppendPartitionReader and both logScanner.poll() / snapshotScanner.pollBatch() in FlussUpsertPartitionReader with timing and error tracking
  • Pass accumulators through FlussAppendPartitionReaderFactory and FlussUpsertPartitionReaderFactory to the partition readers
  • Snapshot accumulator values at each latestOffset() call to compute per-batch deltas; reset the max accumulator per batch
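The max-merge semantics of the MaxLongAccumulator mentioned above can be sketched as follows. This is a dependency-free illustration mirroring the AccumulatorV2 contract (isZero/add/merge/reset/value); the actual class in the PR would extend org.apache.spark.util.AccumulatorV2<Long, Long>, which is not reproduced here.

```java
// Sketch of max-merge accumulator semantics (illustrative, not the PR's code).
// Spark's built-in LongAccumulator only sums, so a max variant keeps the
// largest observed value and merges partial maxima from different executors.
public class MaxLongAccumulator {
    private long max = Long.MIN_VALUE;

    // True while no value has been recorded (AccumulatorV2 calls this isZero()).
    public boolean isZero() {
        return max == Long.MIN_VALUE;
    }

    // Record one observation, e.g. a single fetch latency in milliseconds.
    public void add(long v) {
        max = Math.max(max, v);
    }

    // Combine a partial maximum produced on another executor.
    public void merge(MaxLongAccumulator other) {
        max = Math.max(max, other.max);
    }

    // Reset between micro-batches so each batch reports its own maximum.
    public void reset() {
        max = Long.MIN_VALUE;
    }

    // Current maximum; 0 when nothing has been recorded yet.
    public long value() {
        return isZero() ? 0L : max;
    }
}
```

Resetting per batch (rather than keeping a running maximum) matches the per-batch delta reporting described in the last bullet above.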
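The timing and error instrumentation around the scanner calls could look roughly like the sketch below. All names here (FetchInstrumentation, timedFetch, the AtomicLong counters) are hypothetical; the PR routes the equivalent values into Spark accumulators rather than plain counters.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Illustrative sketch: wrap a fetch such as logScanner.poll(timeout) or
// snapshotScanner.pollBatch() with latency timing and error counting.
public class FetchInstrumentation {
    final AtomicLong fetchCount = new AtomicLong();
    final AtomicLong totalLatencyMs = new AtomicLong();
    final AtomicLong maxLatencyMs = new AtomicLong();
    final AtomicLong errorCount = new AtomicLong();

    public <T> T timedFetch(Supplier<T> fetch) {
        long start = System.nanoTime();
        try {
            return fetch.get();
        } catch (RuntimeException e) {
            // Count the failure, then rethrow so the reader still sees it.
            errorCount.incrementAndGet();
            throw e;
        } finally {
            // Record latency whether the fetch succeeded or failed.
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            fetchCount.incrementAndGet();
            totalLatencyMs.addAndGet(elapsedMs);
            maxLatencyMs.accumulateAndGet(elapsedMs, Math::max);
        }
    }

    // Average latency over all fetches seen so far; 0 before any fetch.
    public long avgLatencyMs() {
        long n = fetchCount.get();
        return n == 0 ? 0L : totalLatencyMs.get() / n;
    }
}
```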
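The per-batch delta step in the last bullet can be sketched as a tiny snapshot tracker: at each latestOffset() call the driver reads the running accumulator totals, reports the difference since the previous snapshot, and advances the snapshot. Class and method names here are hypothetical.

```java
// Illustrative sketch of per-batch delta computation from cumulative totals.
public class BatchDeltaTracker {
    private long lastFetchCount = 0L;
    private long lastErrorCount = 0L;

    // Returns {fetchDelta, errorDelta} for the batch that just completed,
    // then advances the snapshot so the next batch starts from zero.
    public long[] snapshot(long currentFetchCount, long currentErrorCount) {
        long fetchDelta = currentFetchCount - lastFetchCount;
        long errorDelta = currentErrorCount - lastErrorCount;
        lastFetchCount = currentFetchCount;
        lastErrorCount = currentErrorCount;
        return new long[] {fetchDelta, errorDelta};
    }
}
```

Cumulative-sum accumulators can be diffed this way, but a max accumulator cannot (the max of a window is not recoverable from two cumulative maxima), which is why the PR resets the max accumulator per batch instead.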

Tests

  • Add SparkStreamingMetricsIT with two integration tests:
    • read: streaming metrics for log table — verifies all 9 metrics for an append-only table
    • read: streaming metrics for primary key table — verifies metrics for a primary-key (upsert) table
  • All 134 existing unit tests pass

API and Format

No API changes. New metrics keys are additive to StreamingQueryProgress.sources[i].metrics (a Map[String, String]).

Documentation

No documentation changes required; the change surfaces metrics through Spark's standard streaming metrics infrastructure (StreamingQueryProgress).


