
[lake/tiering] Add CoordinatorServer Monitoring Metrics for Lake Tiering#2454

Draft
beryllw wants to merge 3 commits into apache:main from beryllw:lake-metrics-poc

Conversation

@beryllw
Contributor

@beryllw beryllw commented Jan 23, 2026

Purpose

Linked issue: close #2440

Brief change log

Tests

API and Format

Documentation

@beryllw beryllw marked this pull request as draft January 23, 2026 07:50
Member

@zuston zuston left a comment


Thanks for pushing this forward, @beryllw!

From my perspective, the key observation for lake tiering is to expose the tiering progress—whether it is catching up with the latest data written to lake storage. This would help us decide whether to scale the tiering service Flink job up or down.
cc @luoyuxia @wuchong Please let me know if I’m missing anything.

Maybe I can build the above metric on top of your current framework.

@beryllw beryllw force-pushed the lake-metrics-poc branch 2 times, most recently from 510e1e5 to e4eef43 Compare March 6, 2026 10:11
@luoyuxia luoyuxia requested review from Copilot and luoyuxia March 6, 2026 11:56
Contributor

Copilot AI left a comment


Pull request overview

Adds coordinator-side monitoring for the Lake Tiering subsystem by introducing new metric groups/metrics and wiring end-to-end propagation of per-table lake stats (file size / record count) from lake committers → Flink tiering job → coordinator heartbeat handling.

Changes:

  • Introduce LakeTieringMetricGroup and register coordinator-level + table-level lake tiering gauges in LakeTableTieringManager.
  • Extend tiering heartbeat payloads to include PbLakeTieringStats and propagate stats from lake committers through the Flink tiering pipeline.
  • Update documentation and unit tests to reflect the new metrics and the updated tiering manager APIs.

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 11 comments.

Summary per file:
website/docs/maintenance/observability/monitor-metrics.md Documents new coordinator lake tiering metrics (global + per-table).
fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java Adds a test LakeTieringMetricGroup for coordinator tests.
fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java Updates tiering manager construction to pass metric group.
fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java Updates tiering manager construction to pass metric group.
fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java Adapts to new APIs; adds assertions for new metrics behavior.
fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java Updates tiering manager construction to pass metric group.
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java New metric group for lake tiering + per-table subgroups.
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java Registers/updates lake tiering metrics; extends finish API to accept lake stats.
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java Reads optional tiering stats from finished-table heartbeats and forwards to manager.
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java Constructs tiering manager with a real LakeTieringMetricGroup.
fluss-rpc/src/main/proto/FlussApi.proto Adds PbLakeTieringStats and optional inclusion in heartbeat table requests.
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java Computes and returns cumulative table stats for a committed snapshot (best-effort).
fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java Notes stats aren’t available yet; leaves values unknown.
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java Extracts cumulative table stats from snapshot summary and returns them.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java Sends per-finished-table stats in heartbeat finished table entries.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringStats.java New immutable stats container with -1 unknown sentinels.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java Extends finished event to include tiering stats.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java Collects LakeCommitResult stats and emits them via FinishedTieringEvent.
fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java Adds metric name constants for lake tiering gauges.
fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java Extends commit result to carry cumulative lake file size / record count.


Comment on lines +592 to +604
finishedTableReqs.add(
new PbHeartbeatReqForTable()
.setTableId(tableId)
.setCoordinatorEpoch(coordinatorEpoch)
.setTieringEpoch(tieringFinishInfo.tieringEpoch)
.setLakeTieringStats(
new PbLakeTieringStats()
.setFileSize(
tieringFinishInfo.stats
.getFileSize())
.setRecordCount(
tieringFinishInfo.stats
.getRecordCount())));

Copilot AI Mar 6, 2026


TieringFinishInfo.stats can be null (e.g., if a caller constructs FinishedTieringEvent with a null stats value), but this code unconditionally dereferences it when building PbLakeTieringStats, which can crash the enumerator during heartbeat building. Default null stats to TieringStats.UNKNOWN (either when constructing TieringFinishInfo / FinishedTieringEvent, or guard here before calling getters).
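A minimal sketch of the suggested null-normalization (class and field names are simplified stand-ins for the PR's types; `UNKNOWN` uses the PR's -1 sentinels):

```java
public class TieringStatsDemo {
    /** Simplified stand-in for the PR's TieringStats; -1 means "unknown". */
    static final class TieringStats {
        static final TieringStats UNKNOWN = new TieringStats(-1L, -1L);
        final long fileSize;
        final long recordCount;

        TieringStats(long fileSize, long recordCount) {
            this.fileSize = fileSize;
            this.recordCount = recordCount;
        }
    }

    /** Normalize a possibly-null stats reference so downstream protobuf building never NPEs. */
    static TieringStats orUnknown(TieringStats stats) {
        return stats != null ? stats : TieringStats.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(orUnknown(null).fileSize);                      // -1
        System.out.println(orUnknown(new TieringStats(10L, 2L)).fileSize); // 10
    }
}
```

Normalizing at the constructor boundary means every later consumer (heartbeat building included) can rely on a non-null invariant instead of repeating null checks.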

Comment on lines +168 to +171
totalFileSize = Long.parseLong(sizeStr);
}
if (countStr != null) {
totalRecordCount = Long.parseLong(countStr);

Copilot AI Mar 6, 2026


Parsing Iceberg snapshot summary values with Long.parseLong can throw NumberFormatException if the summary is missing, corrupted, or uses a non-numeric format, which would fail the whole commit. Treat stats collection as best-effort here as well: catch parse errors (and optionally log a warning) and keep the values as -1 instead of failing the commit.

Suggested change

Before:

    totalFileSize = Long.parseLong(sizeStr);
    }
    if (countStr != null) {
        totalRecordCount = Long.parseLong(countStr);

After:

    try {
        totalFileSize = Long.parseLong(sizeStr);
    } catch (NumberFormatException nfe) {
        LOG.warn(
                "Failed to parse snapshot total file size '{}' for snapshot {}. Using default -1.",
                sizeStr,
                snapshotId,
                nfe);
    }
    }
    if (countStr != null) {
        try {
            totalRecordCount = Long.parseLong(countStr);
        } catch (NumberFormatException nfe) {
            LOG.warn(
                    "Failed to parse snapshot total record count '{}' for snapshot {}. Using default -1.",
                    countStr,
                    snapshotId,
                    nfe);
        }

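The repeated try/catch could also be factored into a small best-effort parsing helper; a minimal sketch (the class and the `parseLongOrDefault` name are hypothetical, not part of the PR):

```java
public class BestEffortParse {
    /** Best-effort parse: returns defaultValue when value is null or malformed. */
    static long parseLongOrDefault(String value, long defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseLongOrDefault("42", -1L));   // 42
        System.out.println(parseLongOrDefault("oops", -1L)); // -1
        System.out.println(parseLongOrDefault(null, -1L));   // -1
    }
}
```

With such a helper, each summary field reduces to a single line (e.g. `totalFileSize = parseLongOrDefault(sizeStr, -1L);`), keeping the commit path free of parse exceptions.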
Comment on lines +186 to +217
/**
 * Computes cumulative table stats from the snapshot's base manifest list.
 *
 * <p>The base manifest list covers all live data files in the snapshot (main branch only,
 * expired snapshots excluded). ADD entries increase the total; DELETE entries (produced by
 * compaction) decrease it, yielding the net live file size and physical row count.
 *
 * <p>For primary-key tables the physical row count may include un-compacted delete rows at L0
 * before a full compaction is completed.
 *
 * @return {@code long[]{totalFileSize, totalRowCount}}
 */
@VisibleForTesting
static long[] computeTableStats(FileStore<?> store, Snapshot snapshot) {
    ManifestList manifestList = store.manifestListFactory().create();
    ManifestFile manifestFile = store.manifestFileFactory().create();
    List<ManifestFileMeta> manifestFileMetas = manifestList.readDataManifests(snapshot);
    long totalFileSize = 0L;
    long totalRowCount = 0L;
    for (ManifestFileMeta manifestFileMeta : manifestFileMetas) {
        List<ManifestEntry> manifestEntries = manifestFile.read(manifestFileMeta.fileName());
        for (ManifestEntry entry : manifestEntries) {
            if (entry.kind() == FileKind.ADD) {
                totalFileSize += entry.file().fileSize();
                totalRowCount += entry.file().rowCount();
            } else {
                totalFileSize -= entry.file().fileSize();
                totalRowCount -= entry.file().rowCount();
            }
        }
    }
    return new long[] {totalFileSize, totalRowCount};
}

Copilot AI Mar 6, 2026


computeTableStats reads every data manifest and every manifest entry for the committed snapshot on each commit. For large/active tables this can add significant overhead to the tiering commit path and increase end-to-end latency. Consider guarding stats collection behind a config flag, sampling, caching, or using a cheaper built-in snapshot/statistics API if available, while keeping the existing best-effort behavior.
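One way to follow this advice is to wrap stats collection in a guard that is both switchable and exception-safe; a sketch under the assumption of a boolean config flag (the flag and helper names are hypothetical, not defined by the PR):

```java
import java.util.function.Supplier;

public class StatsGuardDemo {
    // PR's -1 sentinels: {totalFileSize, totalRecordCount} unknown.
    static long[] unknownStats() {
        return new long[] {-1L, -1L};
    }

    /**
     * Best-effort stats collection: skip entirely when disabled, and never
     * let a stats failure propagate into the tiering commit path.
     */
    static long[] collectStatsOrUnknown(boolean statsEnabled, Supplier<long[]> compute) {
        if (!statsEnabled) {
            return unknownStats();
        }
        try {
            return compute.get();
        } catch (RuntimeException e) {
            // Real code would log a warning; stats are advisory only.
            return unknownStats();
        }
    }

    public static void main(String[] args) {
        long[] stats = collectStatsOrUnknown(true, () -> new long[] {1024L, 42L});
        System.out.println(stats[0]); // 1024
        long[] skipped = collectStatsOrUnknown(false, () -> { throw new IllegalStateException(); });
        System.out.println(skipped[0]); // -1
    }
}
```

The supplier is only invoked when the flag is on, so a disabled deployment pays no manifest-scanning cost at all.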

Comment on lines +388 to +407
<td rowspan="3">lakeTiering</td>
<td>pendingTablesCount</td>
<td>The number of tables waiting to be tiered.</td>
<td>Gauge</td>
</tr>
<tr>
<td>runningTablesCount</td>
<td>The number of tables currently being tiered.</td>
<td>Gauge</td>
</tr>
<tr>
<td>failuresTotal</td>
<td>The total number of tiering failures across all tables.</td>
<td>Gauge</td>
</tr>
<tr>
<td rowspan="5">lakeTiering_table</td>
<td>tierLag</td>
<td>Time in milliseconds since the last successful tiering operation for this table. Returns -1 for newly created tables that have never been tiered.</td>
<td>Gauge</td>

Copilot AI Mar 6, 2026


This doc section lists lakeTiering.failuresTotal and states tierLag returns -1 for newly created tables. In the current implementation, a global failuresTotal metric is not registered, and tierLag is computed from tableLastTieredTime which is initialized on table registration (so new tables will report ~0 rather than -1). Either implement the documented metrics/semantics or adjust the documentation to match the actual exported values.

tieringMetricGroup.gauge(
MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, pendingTieringTables::size);
tieringMetricGroup.gauge(
MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);

Copilot AI Mar 6, 2026


Docs list a coordinator lakeTiering.failuresTotal metric, and the manager maintains globalTieringFailureCount, but no metric is registered for it (and there is no corresponding MetricNames constant). Register a gauge/counter for the global failure total here and add the metric name constant so the documented metric is actually exposed.

Suggested change

Before:

    MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);

After:

    MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);
    tieringMetricGroup.gauge(
            MetricNames.LAKE_TIERING_FAILURES_TOTAL, () -> globalTieringFailureCount);

private TieringFinishInfo(long tieringEpoch, boolean isForceFinished, TieringStats stats) {
this.tieringEpoch = tieringEpoch;
this.isForceFinished = isForceFinished;
this.stats = stats;

Copilot AI Mar 6, 2026


The TieringFinishInfo factory/constructor accepts a potentially null TieringStats and stores it directly. Since downstream code assumes stats is non-null (to build protobuf messages), enforce non-null here (e.g., replace null with TieringStats.UNKNOWN or validate with a null-check).

Suggested change

Before:

    this.stats = stats;

After:

    this.stats = (stats == null) ? TieringStats.UNKNOWN : stats;


public FinishedTieringEvent(long tableId, TieringStats stats) {
this.tableId = tableId;
this.stats = stats;

Copilot AI Mar 6, 2026


FinishedTieringEvent stores the provided stats reference as-is. If any caller passes null, it will propagate and can cause NPEs when building heartbeats. Consider normalizing null to TieringStats.UNKNOWN (or rejecting null) in this constructor to keep the event contract safe.

Suggested change

Before:

    this.stats = stats;

After:

    this.stats = stats != null ? stats : TieringStats.UNKNOWN;

Comment on lines +201 to +203
MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, pendingTieringTables::size);
tieringMetricGroup.gauge(
MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);

Copilot AI Mar 6, 2026


The global lakeTiering gauges read pendingTieringTables::size (ArrayDeque) and liveTieringTableIds::size (HashMap) without holding the manager lock. Metric reporters may call these suppliers concurrently with state transitions, which can lead to racy/inconsistent values and potential concurrency issues. Wrap the suppliers in inReadLock(lock, ...) (or switch these structures to concurrent + maintain atomic counts) so metrics collection is thread-safe.

Suggested change

Before:

    MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, pendingTieringTables::size);
    tieringMetricGroup.gauge(
            MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);

After:

    MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT,
    () -> inReadLock(lock, () -> pendingTieringTables.size()));
    tieringMetricGroup.gauge(
            MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT,
            () -> inReadLock(lock, () -> liveTieringTableIds.size()));

Comment on lines +275 to +286
// tierLag: milliseconds since last successful tiering
tableMetricGroup.gauge(
MetricNames.LAKE_TIERING_TABLE_TIER_LAG,
() ->
inReadLock(
lock,
() -> {
Long lastTiered = tableLastTieredTime.get(tableId);
return lastTiered != null
? clock.milliseconds() - lastTiered
: -1L;
}));

Copilot AI Mar 6, 2026


The tierLag gauge is described as "since last successful tiering" and the docs say it returns -1 for tables that have never been tiered, but the implementation uses tableLastTieredTime which is initialized when the table is registered. That means newly created tables will report a small/zero lag instead of -1. Consider tracking "last successful tiering time" separately (initialized to -1) or adjust the metric/doc semantics to match what is stored.
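The suggested separation could look like this minimal sketch (locking and the real manager structure omitted; `register`/`onTieringFinished` are illustrative names, not the PR's API):

```java
import java.util.HashMap;
import java.util.Map;

public class TierLagDemo {
    // Last successful tiering completion time per table; -1 = never tiered.
    private final Map<Long, Long> lastSuccessfulTieredTime = new HashMap<>();

    /** Register a table without marking it as ever having been tiered. */
    void register(long tableId) {
        lastSuccessfulTieredTime.put(tableId, -1L);
    }

    /** Record a successful tiering completion. */
    void onTieringFinished(long tableId, long nowMillis) {
        lastSuccessfulTieredTime.put(tableId, nowMillis);
    }

    /** -1 for tables never tiered; otherwise milliseconds since the last success. */
    long tierLag(long tableId, long nowMillis) {
        long last = lastSuccessfulTieredTime.getOrDefault(tableId, -1L);
        return last < 0 ? -1L : nowMillis - last;
    }

    public static void main(String[] args) {
        TierLagDemo demo = new TierLagDemo();
        demo.register(1L);
        System.out.println(demo.tierLag(1L, 1_000L)); // -1 (never tiered)
        demo.onTieringFinished(1L, 1_000L);
        System.out.println(demo.tierLag(1L, 2_500L)); // 1500
    }
}
```

Keeping the "never tiered" sentinel separate from the registration timestamp makes the gauge match the documented -1 semantics.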

// tierDuration: duration of last tiering job
tableMetricGroup.gauge(
MetricNames.LAKE_TIERING_TABLE_TIER_DURATION,
() -> inReadLock(lock, () -> tableLastTieringDuration.getOrDefault(tableId, 0L)));

Copilot AI Mar 6, 2026


tierDuration is initialized to -1L for unknown, but the gauge uses getOrDefault(tableId, 0L), which can surface an incorrect 0 during races (e.g., if the table is removed before the metric group is fully deregistered). Use -1L as the default here to keep the "unknown" sentinel consistent.

Suggested change

Before:

    () -> inReadLock(lock, () -> tableLastTieringDuration.getOrDefault(tableId, 0L)));

After:

    () -> inReadLock(lock, () -> tableLastTieringDuration.getOrDefault(tableId, -1L)));



Development

Successfully merging this pull request may close these issues.

[lake/tiering] Add Server-Level Monitoring Metrics for Lake Tiering

3 participants