Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import org.apache.fluss.client.table.scanner.ScanRecord
import org.apache.fluss.config.Configuration
import org.apache.fluss.metadata.{TableBucket, TablePath}

import org.apache.spark.util.LongAccumulator

/** Partition reader that reads log data from a single Fluss table bucket. */
class FlussAppendPartitionReader(
tablePath: TablePath,
projection: Array[Int],
flussPartition: FlussAppendInputPartition,
flussConfig: Configuration)
flussConfig: Configuration,
fetchRequestsAccum: LongAccumulator = null,
fetchTimeMsAccum: LongAccumulator = null,
maxFetchTimeMsAccum: MaxLongAccumulator = null,
fetchErrorsAccum: LongAccumulator = null)
extends FlussPartitionReader(tablePath, flussConfig) {

private val tableBucket: TableBucket = flussPartition.tableBucket
Expand All @@ -44,7 +50,20 @@ class FlussAppendPartitionReader(
initialize()

private def pollMoreRecords(): Unit = {
val scanRecords = logScanner.poll(POLL_TIMEOUT)
val pollStartMs = System.currentTimeMillis()
val scanRecords =
try {
logScanner.poll(POLL_TIMEOUT)
} catch {
case e: Exception =>
if (fetchErrorsAccum != null) fetchErrorsAccum.add(1L)
throw e
} finally {
val pollTimeMs = System.currentTimeMillis() - pollStartMs
if (fetchRequestsAccum != null) fetchRequestsAccum.add(1L)
if (fetchTimeMsAccum != null) fetchTimeMsAccum.add(pollTimeMs)
if (maxFetchTimeMsAccum != null) maxFetchTimeMsAccum.add(pollTimeMs)
}
if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < flussPartition.stopOffset) {
throw new IllegalStateException(s"No more data from fluss server," +
s" but current offset $currentOffset not reach the stop offset ${flussPartition.stopOffset}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePa
import org.apache.fluss.utils.json.TableBucketOffsets

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.LongAccumulator

import java.util
import java.util.Optional
Expand Down Expand Up @@ -69,6 +71,28 @@ abstract class FlussMicroBatchStream(
val stoppingOffsetsInitializer: OffsetsInitializer =
FlussOffsetInitializers.stoppingOffsetsInitializer(false, options, flussConfig)

// Cached batch metadata for metrics reporting
@volatile protected var lastBatchPlannedInputRows: Long = 0L
@volatile protected var lastBatchNumBucketsRead: Int = 0

// Accumulators for executor-side scanner metrics
lazy val fetchRequestsAccum: LongAccumulator =
SparkSession.active.sparkContext.longAccumulator("flussFetchRequests")
lazy val fetchTimeMsAccum: LongAccumulator =
SparkSession.active.sparkContext.longAccumulator("flussFetchTimeMs")
lazy val maxFetchTimeMsAccum: MaxLongAccumulator = {
val acc = new MaxLongAccumulator
SparkSession.active.sparkContext.register(acc, "flussMaxFetchTimeMs")
acc
}
lazy val fetchErrorsAccum: LongAccumulator =
SparkSession.active.sparkContext.longAccumulator("flussFetchErrors")

// Snapshots for computing per-batch deltas
private var prevFetchRequests: Long = 0L
private var prevFetchTimeMs: Long = 0L
private var prevFetchErrors: Long = 0L

protected def projection: Array[Int] = {
val columnNameToIndex = tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
readSchema.fields.map {
Expand Down Expand Up @@ -107,6 +131,13 @@ abstract class FlussMicroBatchStream(
throw new UnsupportedOperationException(s"Only ReadAllAvailable is supported, but $readLimit")
}

// Snapshot accumulator values at batch start for delta computation
prevFetchRequests = fetchRequestsAccum.value
prevFetchTimeMs = fetchTimeMsAccum.value
prevFetchErrors = fetchErrorsAccum.value
// Reset the max accumulator per batch (max is not delta-computable)
maxFetchTimeMsAccum.reset()

val latestTableBucketOffsets = if (allDataForTriggerAvailableNow.isDefined) {
allDataForTriggerAvailableNow.get
} else {
Expand Down Expand Up @@ -152,8 +183,52 @@ abstract class FlussMicroBatchStream(
}

override def metrics(latestConsumedOffset: Optional[Offset]): util.Map[String, String] = {
// TODO add metrics
Map.empty[String, String].asJava
val metricsMap = new java.util.HashMap[String, String]()

// Driver-side batch metrics
metricsMap.put("plannedInputRows", lastBatchPlannedInputRows.toString)
metricsMap.put("numBucketsRead", lastBatchNumBucketsRead.toString)
metricsMap.put("numBucketsTotal", tableInfo.getNumBuckets.toString)

// Offset lag metrics (fetch fresh server offsets)
if (latestConsumedOffset.isPresent) {
try {
val consumed = latestConsumedOffset.get.asInstanceOf[FlussSourceOffset].tableBucketOffsets
val latestOpt = fetchLatestOffsets()
if (latestOpt.isDefined) {
val consumedMap = consumed.getOffsets.asScala
val latestMap = latestOpt.get.getOffsets.asScala
val lags = consumedMap.map {
case (bucket, consumedOff) =>
val latestOff =
latestMap.get(bucket).map(Long.unbox).getOrElse(Long.unbox(consumedOff))
math.max(0L, latestOff - Long.unbox(consumedOff))
}
if (lags.nonEmpty) {
metricsMap.put("maxOffsetsBehindLatest", lags.max.toString)
metricsMap.put("avgOffsetsBehindLatest", f"${lags.sum.toDouble / lags.size}%.1f")
}
}
} catch {
case e: Exception =>
logWarning("Failed to compute offset lag metrics", e)
}
}

// Scanner metrics (from executor-side accumulators)
val batchFetchRequests = fetchRequestsAccum.value - prevFetchRequests
val batchFetchTimeMs = fetchTimeMsAccum.value - prevFetchTimeMs
val batchFetchErrors = fetchErrorsAccum.value - prevFetchErrors
metricsMap.put("batchFetchRequests", batchFetchRequests.toString)
metricsMap.put("maxFetchLatencyMs", maxFetchTimeMsAccum.value.toString)
metricsMap.put("totalFetchErrors", batchFetchErrors.toString)
if (batchFetchRequests > 0) {
metricsMap.put("avgFetchLatencyMs", f"${batchFetchTimeMs.toDouble / batchFetchRequests}%.1f")
} else {
metricsMap.put("avgFetchLatencyMs", "0.0")
}

metricsMap
}

private def getOrCreateInitialPartitionOffsets(): TableBucketOffsets = {
Expand Down Expand Up @@ -260,7 +335,15 @@ class FlussAppendMicroBatchStream(
checkpointLocation) {

override def createReaderFactory(): PartitionReaderFactory = {
new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
new FlussAppendPartitionReaderFactory(
tablePath,
projection,
options,
flussConfig,
fetchRequestsAccum,
fetchTimeMsAccum,
maxFetchTimeMsAccum,
fetchErrorsAccum)
}

override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
Expand Down Expand Up @@ -290,6 +373,9 @@ class FlussAppendMicroBatchStream(
}
.filter(e => e.startOffset < e.stopOffset)
.toArray
// Cache batch metadata for metrics reporting
lastBatchPlannedInputRows = inputPartitions.map(p => p.stopOffset - p.startOffset).sum
lastBatchNumBucketsRead = inputPartitions.length
inputPartitions.map(_.asInstanceOf[InputPartition])
}
}
Expand Down Expand Up @@ -337,10 +423,22 @@ class FlussUpsertMicroBatchStream(
}
.filter(e => e.logStartingOffset < e.logStoppingOffset)
.toArray
// Cache batch metadata for metrics reporting
lastBatchPlannedInputRows =
inputPartitions.map(p => p.logStoppingOffset - p.logStartingOffset).sum
lastBatchNumBucketsRead = inputPartitions.length
inputPartitions.map(_.asInstanceOf[InputPartition])
}

override def createReaderFactory(): PartitionReaderFactory = {
new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
new FlussUpsertPartitionReaderFactory(
tablePath,
projection,
options,
flussConfig,
fetchRequestsAccum,
fetchTimeMsAccum,
maxFetchTimeMsAccum,
fetchErrorsAccum)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@ import org.apache.fluss.metadata.TablePath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.LongAccumulator

/** Factory for creating partition readers to read data from Fluss. */
class FlussAppendPartitionReaderFactory(
tablePath: TablePath,
projection: Array[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
flussConfig: Configuration,
fetchRequestsAccum: LongAccumulator = null,
fetchTimeMsAccum: LongAccumulator = null,
maxFetchTimeMsAccum: MaxLongAccumulator = null,
fetchErrorsAccum: LongAccumulator = null)
extends PartitionReaderFactory {

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
Expand All @@ -38,8 +43,11 @@ class FlussAppendPartitionReaderFactory(
tablePath,
projection,
flussPartition,
flussConfig
)
flussConfig,
fetchRequestsAccum,
fetchTimeMsAccum,
maxFetchTimeMsAccum,
fetchErrorsAccum)
}
}

Expand All @@ -48,7 +56,11 @@ class FlussUpsertPartitionReaderFactory(
tablePath: TablePath,
projection: Array[Int],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
flussConfig: Configuration,
fetchRequestsAccum: LongAccumulator = null,
fetchTimeMsAccum: LongAccumulator = null,
maxFetchTimeMsAccum: MaxLongAccumulator = null,
fetchErrorsAccum: LongAccumulator = null)
extends PartitionReaderFactory {

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
Expand All @@ -57,7 +69,10 @@ class FlussUpsertPartitionReaderFactory(
tablePath,
projection,
upsertPartition,
flussConfig
)
flussConfig,
fetchRequestsAccum,
fetchTimeMsAccum,
maxFetchTimeMsAccum,
fetchErrorsAccum)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.fluss.spark.utils.LogChangesIterator
import org.apache.fluss.utils.CloseableIterator

import org.apache.spark.internal.Logging
import org.apache.spark.util.LongAccumulator

import java.util.Comparator

Expand All @@ -46,7 +47,11 @@ class FlussUpsertPartitionReader(
tablePath: TablePath,
projection: Array[Int],
flussPartition: FlussUpsertInputPartition,
flussConfig: Configuration)
flussConfig: Configuration,
fetchRequestsAccum: LongAccumulator = null,
fetchTimeMsAccum: LongAccumulator = null,
maxFetchTimeMsAccum: MaxLongAccumulator = null,
fetchErrorsAccum: LongAccumulator = null)
extends FlussPartitionReader(tablePath, flussConfig)
with Logging {

Expand Down Expand Up @@ -129,7 +134,20 @@ class FlussUpsertPartitionReader(
var continue = true

while (continue) {
val records = logScanner.poll(POLL_TIMEOUT)
val pollStartMs = System.currentTimeMillis()
val records =
try {
logScanner.poll(POLL_TIMEOUT)
} catch {
case e: Exception =>
if (fetchErrorsAccum != null) fetchErrorsAccum.add(1L)
throw e
} finally {
val pollTimeMs = System.currentTimeMillis() - pollStartMs
if (fetchRequestsAccum != null) fetchRequestsAccum.add(1L)
if (fetchTimeMsAccum != null) fetchTimeMsAccum.add(pollTimeMs)
if (maxFetchTimeMsAccum != null) maxFetchTimeMsAccum.add(pollTimeMs)
}
if (!records.isEmpty) {
val flatRecords = records.asScala
for (scanRecord <- flatRecords) {
Expand Down Expand Up @@ -161,7 +179,20 @@ class FlussUpsertPartitionReader(

override def hasNext: Boolean = {
while ((currentBatch == null || !currentBatch.hasNext) && hasMoreBatches) {
val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
val pollStartMs = System.currentTimeMillis()
val batch =
try {
snapshotScanner.pollBatch(POLL_TIMEOUT)
} catch {
case e: Exception =>
if (fetchErrorsAccum != null) fetchErrorsAccum.add(1L)
throw e
} finally {
val pollTimeMs = System.currentTimeMillis() - pollStartMs
if (fetchRequestsAccum != null) fetchRequestsAccum.add(1L)
if (fetchTimeMsAccum != null) fetchTimeMsAccum.add(pollTimeMs)
if (maxFetchTimeMsAccum != null) maxFetchTimeMsAccum.add(pollTimeMs)
}
if (batch == null) {
hasMoreBatches = false
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.spark.read

import org.apache.spark.util.AccumulatorV2

/** Accumulator that tracks the maximum Long value across all tasks. */
/**
 * An [[AccumulatorV2]] that tracks the maximum `Long` value observed across all tasks.
 *
 * Each task works on its own copy (created via [[copy]]), and the driver folds the
 * per-task maxima together through [[merge]]. The zero element is `0L`, so only
 * non-negative measurements (e.g. latencies in milliseconds) are meaningful.
 */
class MaxLongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  // Largest value seen so far; @volatile so driver-side reads observe merges promptly.
  @volatile private var maxValue: Long = 0L

  /** True while no value greater than the zero element (0L) has been recorded. */
  override def isZero: Boolean = maxValue == 0L

  /** Returns a new accumulator carrying the current maximum. */
  override def copy(): MaxLongAccumulator = {
    val cloned = new MaxLongAccumulator
    cloned.maxValue = maxValue
    cloned
  }

  /** Restores the zero state. */
  override def reset(): Unit = maxValue = 0L

  /** Records `v`, keeping the running maximum. */
  override def add(v: java.lang.Long): Unit = maxValue = math.max(maxValue, v)

  /**
   * Folds another [[MaxLongAccumulator]]'s maximum into this one.
   *
   * @throws UnsupportedOperationException if `other` is not a [[MaxLongAccumulator]].
   */
  override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit =
    other match {
      case that: MaxLongAccumulator => maxValue = math.max(maxValue, that.maxValue)
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${other.getClass.getName} into MaxLongAccumulator")
    }

  /** The maximum recorded so far (boxed for the AccumulatorV2 interface). */
  override def value: java.lang.Long = maxValue
}
Loading
Loading