From 5d8de783ece055806ee33474fe7c0da51b839f1c Mon Sep 17 00:00:00 2001
From: hemanthsavasere
Date: Fri, 6 Mar 2026 06:09:45 +0000
Subject: [PATCH 1/2] Add Streaming metrics for Spark

---
 .../read/FlussAppendPartitionReader.scala     |  23 +-
 .../spark/read/FlussMicroBatchStream.scala    | 106 ++++++-
 .../read/FlussPartitionReaderFactory.scala    |  27 +-
 .../read/FlussUpsertPartitionReader.scala     |  37 ++-
 .../fluss/spark/read/MaxLongAccumulator.scala |  49 +++
 .../fluss/spark/SparkStreamingMetricsIT.scala | 278 ++++++++++++++++++
 6 files changed, 505 insertions(+), 15 deletions(-)
 create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/MaxLongAccumulator.scala
 create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingMetricsIT.scala

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
index 520d6ae718..2844f04f06 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
@@ -21,12 +21,18 @@ import org.apache.fluss.client.table.scanner.ScanRecord
 import org.apache.fluss.config.Configuration
 import org.apache.fluss.metadata.{TableBucket, TablePath}
 
+import org.apache.spark.util.LongAccumulator
+
 /** Partition reader that reads log data from a single Fluss table bucket. */
 class FlussAppendPartitionReader(
     tablePath: TablePath,
     projection: Array[Int],
     flussPartition: FlussAppendInputPartition,
-    flussConfig: Configuration)
+    flussConfig: Configuration,
+    fetchRequestsAccum: LongAccumulator = null,
+    fetchTimeMsAccum: LongAccumulator = null,
+    maxFetchTimeMsAccum: MaxLongAccumulator = null,
+    fetchErrorsAccum: LongAccumulator = null)
   extends FlussPartitionReader(tablePath, flussConfig) {
 
   private val tableBucket: TableBucket = flussPartition.tableBucket
@@ -44,7 +50,20 @@ class FlussAppendPartitionReader(
   initialize()
 
   private def pollMoreRecords(): Unit = {
-    val scanRecords = logScanner.poll(POLL_TIMEOUT)
+    val pollStartNs = System.nanoTime()
+    val scanRecords =
+      try {
+        logScanner.poll(POLL_TIMEOUT)
+      } catch {
+        case e: Exception =>
+          if (fetchErrorsAccum != null) fetchErrorsAccum.add(1L)
+          throw e
+      } finally {
+        val pollTimeMs = (System.nanoTime() - pollStartNs) / 1000000L
+        if (fetchRequestsAccum != null) fetchRequestsAccum.add(1L)
+        if (fetchTimeMsAccum != null) fetchTimeMsAccum.add(pollTimeMs)
+        if (maxFetchTimeMsAccum != null) maxFetchTimeMsAccum.add(pollTimeMs)
+      }
     if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < flussPartition.stopOffset) {
       throw new IllegalStateException(s"No more data from fluss server," +
         s" but current offset $currentOffset not reach the stop offset ${flussPartition.stopOffset}")
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
index e3d77a9d79..22c8f4bc8b 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
@@ -25,10 +25,12 @@ import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePa
 import org.apache.fluss.utils.json.TableBucketOffsets
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.connector.read.streaming._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.LongAccumulator
 
 import java.util
 import java.util.Optional
@@ -69,6 +71,28 @@ abstract class FlussMicroBatchStream(
   val stoppingOffsetsInitializer: OffsetsInitializer =
     FlussOffsetInitializers.stoppingOffsetsInitializer(false, options, flussConfig)
 
+  // Cached batch metadata for metrics reporting
+  @volatile protected var lastBatchPlannedInputRows: Long = 0L
+  @volatile protected var lastBatchNumBucketsRead: Int = 0
+
+  // Accumulators for executor-side scanner metrics
+  lazy val fetchRequestsAccum: LongAccumulator =
+    SparkSession.active.sparkContext.longAccumulator("flussFetchRequests")
+  lazy val fetchTimeMsAccum: LongAccumulator =
+    SparkSession.active.sparkContext.longAccumulator("flussFetchTimeMs")
+  lazy val maxFetchTimeMsAccum: MaxLongAccumulator = {
+    val acc = new MaxLongAccumulator
+    SparkSession.active.sparkContext.register(acc, "flussMaxFetchTimeMs")
+    acc
+  }
+  lazy val fetchErrorsAccum: LongAccumulator =
+    SparkSession.active.sparkContext.longAccumulator("flussFetchErrors")
+
+  // Snapshots for computing per-batch deltas
+  private var prevFetchRequests: Long = 0L
+  private var prevFetchTimeMs: Long = 0L
+  private var prevFetchErrors: Long = 0L
+
   protected def projection: Array[Int] = {
     val columnNameToIndex = tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
     readSchema.fields.map {
@@ -107,6 +131,13 @@ abstract class FlussMicroBatchStream(
       throw new UnsupportedOperationException(s"Only ReadAllAvailable is supported, but $readLimit")
     }
 
+    // Snapshot accumulator values at batch start for delta computation
+    prevFetchRequests = fetchRequestsAccum.value
+    prevFetchTimeMs = fetchTimeMsAccum.value
+    prevFetchErrors = fetchErrorsAccum.value
+    // Reset the max accumulator per batch (max is not delta-computable)
+    maxFetchTimeMsAccum.reset()
+
     val latestTableBucketOffsets = if (allDataForTriggerAvailableNow.isDefined) {
       allDataForTriggerAvailableNow.get
     } else {
@@ -152,8 +183,52 @@ abstract class FlussMicroBatchStream(
   }
 
   override def metrics(latestConsumedOffset: Optional[Offset]): util.Map[String, String] = {
-    // TODO add metrics
-    Map.empty[String, String].asJava
+    val metricsMap = new java.util.HashMap[String, String]()
+    // Locale.ROOT pins the decimal separator to '.' regardless of the JVM default locale.
+    def oneDecimal(v: Double): String = "%.1f".formatLocal(java.util.Locale.ROOT, v)
+
+    // Driver-side batch metrics
+    metricsMap.put("plannedInputRows", lastBatchPlannedInputRows.toString)
+    metricsMap.put("numBucketsRead", lastBatchNumBucketsRead.toString)
+    metricsMap.put("numBucketsTotal", tableInfo.getNumBuckets.toString)
+
+    // Offset lag metrics (fetch fresh server offsets)
+    if (latestConsumedOffset.isPresent) {
+      try {
+        val consumed = latestConsumedOffset.get.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+        val latestOpt = fetchLatestOffsets()
+        if (latestOpt.isDefined) {
+          val consumedMap = consumed.getOffsets.asScala
+          val latestMap = latestOpt.get.getOffsets.asScala
+          val lags = consumedMap.map {
+            case (bucket, consumedOff) =>
+              val latestOff =
+                latestMap.get(bucket).map(Long.unbox).getOrElse(Long.unbox(consumedOff))
+              math.max(0L, latestOff - Long.unbox(consumedOff))
+          }
+          if (lags.nonEmpty) {
+            metricsMap.put("maxOffsetsBehindLatest", lags.max.toString)
+            metricsMap.put("avgOffsetsBehindLatest", oneDecimal(lags.sum.toDouble / lags.size))
+          }
+        }
+      } catch {
+        case scala.util.control.NonFatal(e) =>
+          logWarning("Failed to compute offset lag metrics", e)
+      }
+    }
+
+    // Scanner metrics (from executor-side accumulators)
+    val batchFetchRequests = fetchRequestsAccum.value - prevFetchRequests
+    val batchFetchTimeMs = fetchTimeMsAccum.value - prevFetchTimeMs
+    val batchFetchErrors = fetchErrorsAccum.value - prevFetchErrors
+    metricsMap.put("batchFetchRequests", batchFetchRequests.toString)
+    metricsMap.put("maxFetchLatencyMs", maxFetchTimeMsAccum.value.toString)
+    metricsMap.put("totalFetchErrors", batchFetchErrors.toString)
+    val avgFetchLatencyMs =
+      if (batchFetchRequests > 0) batchFetchTimeMs.toDouble / batchFetchRequests else 0.0
+    metricsMap.put("avgFetchLatencyMs", oneDecimal(avgFetchLatencyMs))
+
+    metricsMap
   }
 
   private def getOrCreateInitialPartitionOffsets(): TableBucketOffsets = {
@@ -260,7 +335,15 @@ class FlussAppendMicroBatchStream(
     checkpointLocation) {
 
   override def createReaderFactory(): PartitionReaderFactory = {
-    new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
+    new FlussAppendPartitionReaderFactory(
+      tablePath,
+      projection,
+      options,
+      flussConfig,
+      fetchRequestsAccum,
+      fetchTimeMsAccum,
+      maxFetchTimeMsAccum,
+      fetchErrorsAccum)
   }
 
   override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
@@ -290,6 +373,9 @@ class FlussAppendMicroBatchStream(
       }
       .filter(e => e.startOffset < e.stopOffset)
       .toArray
+    // Cache batch metadata for metrics reporting
+    lastBatchPlannedInputRows = inputPartitions.map(p => p.stopOffset - p.startOffset).sum
+    lastBatchNumBucketsRead = inputPartitions.length
     inputPartitions.map(_.asInstanceOf[InputPartition])
   }
 }
@@ -337,10 +423,22 @@ class FlussUpsertMicroBatchStream(
       }
       .filter(e => e.logStartingOffset < e.logStoppingOffset)
       .toArray
+    // Cache batch metadata for metrics reporting
+    lastBatchPlannedInputRows =
+      inputPartitions.map(p => p.logStoppingOffset - p.logStartingOffset).sum
+    lastBatchNumBucketsRead = inputPartitions.length
     inputPartitions.map(_.asInstanceOf[InputPartition])
   }
 
   override def createReaderFactory(): PartitionReaderFactory = {
-    new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
+    new FlussUpsertPartitionReaderFactory(
+      tablePath,
+      projection,
+      options,
+      flussConfig,
+      fetchRequestsAccum,
+      fetchTimeMsAccum,
+      maxFetchTimeMsAccum,
+      fetchErrorsAccum)
   }
 }
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala
index 13d374d384..64c4dc53db 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala
@@ -23,13 +23,18 @@ import org.apache.fluss.metadata.TablePath
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.LongAccumulator
 
 /** Factory for creating partition readers to read data from Fluss. */
 class FlussAppendPartitionReaderFactory(
     tablePath: TablePath,
     projection: Array[Int],
     options: CaseInsensitiveStringMap,
-    flussConfig: Configuration)
+    flussConfig: Configuration,
+    fetchRequestsAccum: LongAccumulator = null,
+    fetchTimeMsAccum: LongAccumulator = null,
+    maxFetchTimeMsAccum: MaxLongAccumulator = null,
+    fetchErrorsAccum: LongAccumulator = null)
   extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -38,8 +43,11 @@ class FlussAppendPartitionReaderFactory(
       tablePath,
       projection,
       flussPartition,
-      flussConfig
-    )
+      flussConfig,
+      fetchRequestsAccum,
+      fetchTimeMsAccum,
+      maxFetchTimeMsAccum,
+      fetchErrorsAccum)
   }
 }
 
@@ -48,7 +56,11 @@ class FlussUpsertPartitionReaderFactory(
     tablePath: TablePath,
     projection: Array[Int],
     options: CaseInsensitiveStringMap,
-    flussConfig: Configuration)
+    flussConfig: Configuration,
+    fetchRequestsAccum: LongAccumulator = null,
+    fetchTimeMsAccum: LongAccumulator = null,
+    maxFetchTimeMsAccum: MaxLongAccumulator = null,
+    fetchErrorsAccum: LongAccumulator = null)
   extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -57,7 +69,10 @@ class FlussUpsertPartitionReaderFactory(
       tablePath,
       projection,
       upsertPartition,
-      flussConfig
-    )
+      flussConfig,
+      fetchRequestsAccum,
+      fetchTimeMsAccum,
+      maxFetchTimeMsAccum,
+      fetchErrorsAccum)
   }
 }
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
index e8e56a2a5a..f643d981ad 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
@@ -30,6 +30,7 @@ import org.apache.fluss.spark.utils.LogChangesIterator
 import org.apache.fluss.utils.CloseableIterator
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.LongAccumulator
 
 import java.util.Comparator
 
@@ -46,7 +47,11 @@ class FlussUpsertPartitionReader(
     tablePath: TablePath,
     projection: Array[Int],
     flussPartition: FlussUpsertInputPartition,
-    flussConfig: Configuration)
+    flussConfig: Configuration,
+    fetchRequestsAccum: LongAccumulator = null,
+    fetchTimeMsAccum: LongAccumulator = null,
+    maxFetchTimeMsAccum: MaxLongAccumulator = null,
+    fetchErrorsAccum: LongAccumulator = null)
   extends FlussPartitionReader(tablePath, flussConfig)
   with Logging {
 
@@ -129,7 +134,20 @@ class FlussUpsertPartitionReader(
 
       var continue = true
       while (continue) {
-        val records = logScanner.poll(POLL_TIMEOUT)
+        val pollStartNs = System.nanoTime()
+        val records =
+          try {
+            logScanner.poll(POLL_TIMEOUT)
+          } catch {
+            case e: Exception =>
+              if (fetchErrorsAccum != null) fetchErrorsAccum.add(1L)
+              throw e
+          } finally {
+            val pollTimeMs = (System.nanoTime() - pollStartNs) / 1000000L
+            if (fetchRequestsAccum != null) fetchRequestsAccum.add(1L)
+            if (fetchTimeMsAccum != null) fetchTimeMsAccum.add(pollTimeMs)
+            if (maxFetchTimeMsAccum != null) maxFetchTimeMsAccum.add(pollTimeMs)
+          }
         if (!records.isEmpty) {
           val flatRecords = records.asScala
           for (scanRecord <- flatRecords) {
@@ -161,7 +179,20 @@ class FlussUpsertPartitionReader(
 
       override def hasNext: Boolean = {
         while ((currentBatch == null || !currentBatch.hasNext) && hasMoreBatches) {
-          val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
+          val pollStartNs = System.nanoTime()
+          val batch =
+            try {
+              snapshotScanner.pollBatch(POLL_TIMEOUT)
+            } catch {
+              case e: Exception =>
+                if (fetchErrorsAccum != null) fetchErrorsAccum.add(1L)
+                throw e
+            } finally {
+              val pollTimeMs = (System.nanoTime() - pollStartNs) / 1000000L
+              if (fetchRequestsAccum != null) fetchRequestsAccum.add(1L)
+              if (fetchTimeMsAccum != null) fetchTimeMsAccum.add(pollTimeMs)
+              if (maxFetchTimeMsAccum != null) maxFetchTimeMsAccum.add(pollTimeMs)
+            }
           if (batch == null) {
             hasMoreBatches = false
           } else {
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/MaxLongAccumulator.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/MaxLongAccumulator.scala
new file mode 100644
index 0000000000..5d9b09f163
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/MaxLongAccumulator.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.spark.util.AccumulatorV2
+
+/** Tracks the largest value added across tasks; starts at 0, so negatives are ignored. */
+class MaxLongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
+  @volatile private var _max: Long = 0L
+
+  override def isZero: Boolean = _max == 0L
+
+  override def copy(): MaxLongAccumulator = {
+    val newAcc = new MaxLongAccumulator
+    newAcc._max = this._max
+    newAcc
+  }
+
+  override def reset(): Unit = { _max = 0L }
+
+  override def add(v: java.lang.Long): Unit = {
+    if (v > _max) _max = v
+  }
+
+  override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit =
+    other match {
+      case o: MaxLongAccumulator => if (o._max > _max) _max = o._max
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Cannot merge ${other.getClass.getName} into MaxLongAccumulator")
+    }
+
+  override def value: java.lang.Long = _max
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingMetricsIT.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingMetricsIT.scala
new file mode 100644
index 0000000000..6d3023b07b
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingMetricsIT.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
+import org.apache.fluss.spark.read.{FlussMicroBatchStream, FlussSourceOffset}
+import org.apache.fluss.spark.write.{FlussAppendDataWriter, FlussUpsertDataWriter}
+import org.apache.fluss.utils.json.TableBucketOffsets
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+class SparkStreamingMetricsIT extends FlussSparkTestBase with StreamTest {
+  import testImplicits._
+
+  test("read: streaming source metrics") {
+    val tableName = "t_streaming_metrics"
+    val numBuckets = 3
+    val numRecords = 5
+    withTable(tableName) {
+      sql(
+        s"CREATE TABLE $tableName (id int, data string) TBLPROPERTIES('bucket.num' = '$numBuckets')")
+
+      val schema = StructType(Seq(StructField("id", IntegerType), StructField("data", StringType)))
+      val clock = new StreamManualClock
+
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+        StartStream(trigger = Trigger.ProcessingTime(500), clock),
+        AdvanceManualClock(500),
+        CheckNewAnswer(),
+        AddFlussData(
+          tableName,
+          schema,
+          Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(4, "d"), Row(5, "e"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(4, "d"), Row(5, "e")),
+        AssertOnQuery {
+          q =>
+            val progress = q.lastProgress
+            assert(progress != null, "lastProgress should not be null")
+            val metrics = progress.sources(0).metrics
+
+            // Every metric key listed below must be present in the reported source metrics
+            Seq(
+              "plannedInputRows",
+              "numBucketsRead",
+              "numBucketsTotal",
+              "maxOffsetsBehindLatest",
+              "avgOffsetsBehindLatest",
+              "batchFetchRequests",
+              "avgFetchLatencyMs",
+              "maxFetchLatencyMs",
+              "totalFetchErrors"
+            ).foreach(key => assert(metrics.containsKey(key), s"metric '$key' missing"))
+
+            // plannedInputRows must match the number of records written
+            assert(
+              metrics.get("plannedInputRows").toLong == numRecords,
+              s"plannedInputRows should be $numRecords but was ${metrics.get("plannedInputRows")}")
+
+            // numBucketsTotal must equal the configured bucket count
+            assert(
+              metrics.get("numBucketsTotal").toInt == numBuckets,
+              s"numBucketsTotal should be $numBuckets but was ${metrics.get("numBucketsTotal")}")
+
+            // maxOffsetsBehindLatest must be >= 0
+            assert(
+              metrics.get("maxOffsetsBehindLatest").toLong >= 0,
+              s"maxOffsetsBehindLatest should be >= 0 but was ${metrics
+                  .get("maxOffsetsBehindLatest")}")
+
+            // Scanner metrics: at least one fetch, non-negative latencies, max >= avg, no errors
+            assert(
+              metrics.get("batchFetchRequests").toLong > 0,
+              s"batchFetchRequests should be > 0 but was ${metrics.get("batchFetchRequests")}")
+
+            val avgLatency = metrics.get("avgFetchLatencyMs").toDouble
+            assert(avgLatency >= 0.0, s"avgFetchLatencyMs should be >= 0 but was $avgLatency")
+
+            val maxLatency = metrics.get("maxFetchLatencyMs").toLong
+            assert(maxLatency >= 0, s"maxFetchLatencyMs should be >= 0 but was $maxLatency")
+            assert(
+              maxLatency.toDouble >= avgLatency,
+              s"maxFetchLatencyMs ($maxLatency) should be >= avgFetchLatencyMs ($avgLatency)")
+
+            assert(
+              metrics.get("totalFetchErrors").toLong == 0,
+              s"totalFetchErrors should be 0 but was ${metrics.get("totalFetchErrors")}")
+
+            logInfo(
+              s"[metrics] batchFetchRequests=${metrics.get("batchFetchRequests")}" +
+                s" avgFetchLatencyMs=${metrics.get("avgFetchLatencyMs")}" +
+                s" maxFetchLatencyMs=${metrics.get("maxFetchLatencyMs")}" +
+                s" totalFetchErrors=${metrics.get("totalFetchErrors")}" +
+                s" plannedInputRows=${metrics.get("plannedInputRows")}" +
+                s" numBucketsRead=${metrics.get("numBucketsRead")}" +
+                s" numBucketsTotal=${metrics.get("numBucketsTotal")}" +
+                s" maxOffsetsBehindLatest=${metrics.get("maxOffsetsBehindLatest")}" +
+                s" avgOffsetsBehindLatest=${metrics.get("avgOffsetsBehindLatest")}")
+
+            true
+        },
+        // Second batch: verify per-batch delta is independent (accumulator reset logic)
+        AddFlussData(tableName, schema, Seq(Row(6, "f"), Row(7, "g"), Row(8, "h"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(6, "f"), Row(7, "g"), Row(8, "h")),
+        AssertOnQuery {
+          q =>
+            val metrics = q.lastProgress.sources(0).metrics
+            // NOTE(review): "> 0" also holds for a cumulative count; this cannot prove a delta
+            assert(
+              metrics.get("batchFetchRequests").toLong > 0,
+              s"batch 2 batchFetchRequests should be > 0 but was ${metrics.get("batchFetchRequests")}")
+            assert(
+              metrics.get("totalFetchErrors").toLong == 0,
+              s"batch 2 totalFetchErrors should be 0 but was ${metrics.get("totalFetchErrors")}")
+            logInfo(
+              s"[metrics batch2] batchFetchRequests=${metrics.get("batchFetchRequests")}" +
+                s" avgFetchLatencyMs=${metrics.get("avgFetchLatencyMs")}")
+            true
+        }
+      )
+    }
+  }
+
+  test("read: streaming source metrics for primary key table") {
+    val tableName = "t_streaming_metrics_pk"
+    val numBuckets = 1
+    withTable(tableName) {
+      sql(
+        s"CREATE TABLE $tableName (id int, data string) " +
+          s"TBLPROPERTIES('primary.key' = 'id', 'bucket.num' = '$numBuckets')")
+
+      val schema = StructType(Seq(StructField("id", IntegerType), StructField("data", StringType)))
+      val clock = new StreamManualClock
+
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+        StartStream(trigger = Trigger.ProcessingTime(500), clock),
+        AdvanceManualClock(500),
+        CheckNewAnswer(),
+        AddFlussData(tableName, schema, Seq(Row(1, "a"), Row(2, "b"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(1, "a"), Row(2, "b")),
+        AssertOnQuery {
+          q =>
+            val progress = q.lastProgress
+            assert(progress != null, "lastProgress should not be null")
+            val metrics = progress.sources(0).metrics
+
+            // All 9 metric keys must be present
+            Seq(
+              "plannedInputRows",
+              "numBucketsRead",
+              "numBucketsTotal",
+              "maxOffsetsBehindLatest",
+              "avgOffsetsBehindLatest",
+              "batchFetchRequests",
+              "avgFetchLatencyMs",
+              "maxFetchLatencyMs",
+              "totalFetchErrors"
+            ).foreach(key => assert(metrics.containsKey(key), s"metric '$key' missing"))
+
+            assert(
+              metrics.get("numBucketsTotal").toInt == numBuckets,
+              s"numBucketsTotal should be $numBuckets but was ${metrics.get("numBucketsTotal")}")
+
+            assert(
+              metrics.get("batchFetchRequests").toLong > 0,
+              s"batchFetchRequests should be > 0 but was ${metrics.get("batchFetchRequests")}")
+
+            assert(
+              metrics.get("totalFetchErrors").toLong == 0,
+              s"totalFetchErrors should be 0 but was ${metrics.get("totalFetchErrors")}")
+
+            logInfo(
+              s"[metrics] batchFetchRequests=${metrics.get("batchFetchRequests")}" +
+                s" avgFetchLatencyMs=${metrics.get("avgFetchLatencyMs")}" +
+                s" maxFetchLatencyMs=${metrics.get("maxFetchLatencyMs")}" +
+                s" totalFetchErrors=${metrics.get("totalFetchErrors")}" +
+                s" plannedInputRows=${metrics.get("plannedInputRows")}" +
+                s" numBucketsRead=${metrics.get("numBucketsRead")}" +
+                s" numBucketsTotal=${metrics.get("numBucketsTotal")}" +
+                s" maxOffsetsBehindLatest=${metrics.get("maxOffsetsBehindLatest")}" +
+                s" avgOffsetsBehindLatest=${metrics.get("avgOffsetsBehindLatest")}")
+
+            true
+        }
+      )
+    }
+  }
+
+  case class AddFlussData(tableName: String, schema: StructType, dataArr: Seq[Row])
+    extends AddData {
+    override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
+      require(query.nonEmpty, "Cannot add data when there is no active query")
+      val sources = query.get.logicalPlan.collect {
+        case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[FlussMicroBatchStream] =>
+          r.stream.asInstanceOf[FlussMicroBatchStream]
+      }.distinct
+      val tablePath = createTablePath(tableName)
+      if (!sources.exists(_.tablePath.equals(tablePath))) {
+        throw new IllegalArgumentException(
+          s"Could not find fluss stream source for table $tableName")
+      }
+
+      val flussTable = loadFlussTable(tablePath)
+      val writer = if (flussTable.getTableInfo.hasPrimaryKey) {
+        FlussUpsertDataWriter(flussTable.getTableInfo.getTablePath, schema, conn.getConfiguration)
+      } else {
+        FlussAppendDataWriter(flussTable.getTableInfo.getTablePath, schema, conn.getConfiguration)
+      }
+      dataArr
+        .map {
+          row =>
+            InternalRow.fromSeq(row.toSeq.map {
+              case v: String => UTF8String.fromString(v)
+              case v => v
+            })
+        }
+        .foreach(writer.write)
+      writer.commit()
+
+      val buckets = (0 until flussTable.getTableInfo.getNumBuckets).toSeq
+      val offsetsInitializer = OffsetsInitializer.latest()
+      val retriever =
+        new BucketOffsetsRetrieverImpl(admin, flussTable.getTableInfo.getTablePath)
+      val tableBucketOffsets = if (flussTable.getTableInfo.isPartitioned) {
+        val partitionInfos =
+          admin.listPartitionInfos(flussTable.getTableInfo.getTablePath).get()
+        val partitionOffsets = partitionInfos.asScala.map(
+          pi =>
+            FlussMicroBatchStream.getLatestOffsets(
+              flussTable.getTableInfo,
+              offsetsInitializer,
+              retriever,
+              buckets,
+              Some(pi)))
+        val mergedOffsets = partitionOffsets
+          .map(_.getOffsets)
+          .reduce((l, r) => (l.asScala ++ r.asScala).asJava)
+        new TableBucketOffsets(flussTable.getTableInfo.getTableId, mergedOffsets)
+      } else {
+        FlussMicroBatchStream.getLatestOffsets(
+          flussTable.getTableInfo,
+          offsetsInitializer,
+          retriever,
+          buckets,
+          None)
+      }
+
+      (sources.find(_.tablePath.equals(tablePath)).get, FlussSourceOffset(tableBucketOffsets))
+    }
+  }
+}

From 9920f13233f0b9b36a28e7c478c7a4128fe79775 Mon Sep 17 00:00:00 2001
From: hemanthsavasere
Date: Fri, 6 Mar 2026 09:09:05 +0000
Subject: [PATCH 2/2] Triggering GitHub build