[lake/tiering] Add CoordinatorServer Monitoring Metrics for Lake Tiering #2454
beryllw wants to merge 3 commits into apache:main
Conversation
Force-pushed 621573c to 85b4cca
Thanks for pushing this forward! @beryllw .
From my perspective, the key thing to expose for lake tiering is the tiering progress: whether tiering is catching up with the latest data written to lake storage. This would help us decide whether to scale the tiering service Flink job up or down.
cc @luoyuxia @wuchong Please let me know if I’m missing anything.
Maybe I can build the above metric on top of your current framework.
Force-pushed 85b4cca to 6dda873
Force-pushed 510e1e5 to e4eef43
Pull request overview
Adds coordinator-side monitoring for the Lake Tiering subsystem by introducing new metric groups/metrics and wiring end-to-end propagation of per-table lake stats (file size / record count) from lake committers → Flink tiering job → coordinator heartbeat handling.
Changes:
- Introduce LakeTieringMetricGroup and register coordinator-level + table-level lake tiering gauges in LakeTableTieringManager.
- Extend tiering heartbeat payloads to include PbLakeTieringStats and propagate stats from lake committers through the Flink tiering pipeline.
- Update documentation and unit tests to reflect the new metrics and the updated tiering manager APIs.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| website/docs/maintenance/observability/monitor-metrics.md | Documents new coordinator lake tiering metrics (global + per-table). |
| fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java | Adds a test LakeTieringMetricGroup for coordinator tests. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java | Updates tiering manager construction to pass metric group. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java | Updates tiering manager construction to pass metric group. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java | Adapts to new APIs; adds assertions for new metrics behavior. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java | Updates tiering manager construction to pass metric group. |
| fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java | New metric group for lake tiering + per-table subgroups. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java | Registers/updates lake tiering metrics; extends finish API to accept lake stats. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java | Reads optional tiering stats from finished-table heartbeats and forwards to manager. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java | Constructs tiering manager with a real LakeTieringMetricGroup. |
| fluss-rpc/src/main/proto/FlussApi.proto | Adds PbLakeTieringStats and optional inclusion in heartbeat table requests. |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java | Computes and returns cumulative table stats for a committed snapshot (best-effort). |
| fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java | Notes stats aren’t available yet; leaves values unknown. |
| fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java | Extracts cumulative table stats from snapshot summary and returns them. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java | Sends per-finished-table stats in heartbeat finished table entries. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringStats.java | New immutable stats container with -1 unknown sentinels. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java | Extends finished event to include tiering stats. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java | Collects LakeCommitResult stats and emits them via FinishedTieringEvent. |
| fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java | Adds metric name constants for lake tiering gauges. |
| fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java | Extends commit result to carry cumulative lake file size / record count. |
```java
finishedTableReqs.add(
        new PbHeartbeatReqForTable()
                .setTableId(tableId)
                .setCoordinatorEpoch(coordinatorEpoch)
                .setTieringEpoch(tieringFinishInfo.tieringEpoch)
                .setLakeTieringStats(
                        new PbLakeTieringStats()
                                .setFileSize(tieringFinishInfo.stats.getFileSize())
                                .setRecordCount(tieringFinishInfo.stats.getRecordCount())));
```
TieringFinishInfo.stats can be null (e.g., if a caller constructs FinishedTieringEvent with a null stats value), but this code unconditionally dereferences it when building PbLakeTieringStats, which can crash the enumerator during heartbeat building. Default null stats to TieringStats.UNKNOWN (either when constructing TieringFinishInfo / FinishedTieringEvent, or guard here before calling getters).
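The null-default proposed here can be sketched as a small standalone example. The `TieringStats` class below is a simplified stand-in for the PR's type, not its actual definition; the `-1` sentinels mirror the "unknown" convention described elsewhere in this review:

```java
// Simplified stand-in for the PR's TieringStats class (hypothetical shape).
final class TieringStats {
    // -1 sentinels follow the "unknown" convention described in this review.
    static final TieringStats UNKNOWN = new TieringStats(-1L, -1L);

    private final long fileSize;
    private final long recordCount;

    TieringStats(long fileSize, long recordCount) {
        this.fileSize = fileSize;
        this.recordCount = recordCount;
    }

    long getFileSize() {
        return fileSize;
    }

    long getRecordCount() {
        return recordCount;
    }

    // Normalize a possibly-null reference so downstream protobuf building
    // never dereferences null.
    static TieringStats orUnknown(TieringStats stats) {
        return stats == null ? UNKNOWN : stats;
    }
}
```

Applying `orUnknown` once at construction time (in `TieringFinishInfo` or `FinishedTieringEvent`) keeps the guard in a single place instead of at every heartbeat-building call site.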
```java
    totalFileSize = Long.parseLong(sizeStr);
}
if (countStr != null) {
    totalRecordCount = Long.parseLong(countStr);
```
Parsing Iceberg snapshot summary values with Long.parseLong can throw NumberFormatException if the summary is missing, corrupted, or uses a non-numeric format, which would fail the whole commit. Treat stats collection as best-effort here as well: catch parse errors (and optionally log a warning) and keep the values as -1 instead of failing the commit.
Suggested change:

```java
    try {
        totalFileSize = Long.parseLong(sizeStr);
    } catch (NumberFormatException nfe) {
        LOG.warn(
                "Failed to parse snapshot total file size '{}' for snapshot {}. Using default -1.",
                sizeStr,
                snapshotId,
                nfe);
    }
}
if (countStr != null) {
    try {
        totalRecordCount = Long.parseLong(countStr);
    } catch (NumberFormatException nfe) {
        LOG.warn(
                "Failed to parse snapshot total record count '{}' for snapshot {}. Using default -1.",
                countStr,
                snapshotId,
                nfe);
    }
```
```java
/**
 * Computes cumulative table stats from the snapshot's base manifest list.
 *
 * <p>The base manifest list covers all live data files in the snapshot (main branch only,
 * expired snapshots excluded). ADD entries increase the total; DELETE entries (produced by
 * compaction) decrease it, yielding the net live file size and physical row count.
 *
 * <p>For primary-key tables the physical row count may include un-compacted delete rows at L0
 * before a full compaction is completed.
 *
 * @return {@code long[]{totalFileSize, totalRowCount}}
 */
@VisibleForTesting
static long[] computeTableStats(FileStore<?> store, Snapshot snapshot) {
    ManifestList manifestList = store.manifestListFactory().create();
    ManifestFile manifestFile = store.manifestFileFactory().create();
    List<ManifestFileMeta> manifestFileMetas = manifestList.readDataManifests(snapshot);
    long totalFileSize = 0L;
    long totalRowCount = 0L;
    for (ManifestFileMeta manifestFileMeta : manifestFileMetas) {
        List<ManifestEntry> manifestEntries = manifestFile.read(manifestFileMeta.fileName());
        for (ManifestEntry entry : manifestEntries) {
            if (entry.kind() == FileKind.ADD) {
                totalFileSize += entry.file().fileSize();
                totalRowCount += entry.file().rowCount();
            } else {
                totalFileSize -= entry.file().fileSize();
                totalRowCount -= entry.file().rowCount();
            }
        }
    }
    return new long[] {totalFileSize, totalRowCount};
}
```
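The ADD/DELETE netting used by `computeTableStats` can be illustrated with a standalone sketch; the `Entry` record below is a hypothetical stand-in for Paimon's `ManifestEntry`, kept only to show the arithmetic:

```java
import java.util.List;

// Standalone illustration of the ADD/DELETE netting: ADD entries contribute
// positively, DELETE entries (from compaction) negatively, yielding the net
// live file size and row count. Entry is a hypothetical stand-in type.
final class NetStats {
    record Entry(boolean isAdd, long fileSize, long rowCount) {}

    static long[] net(List<Entry> entries) {
        long totalFileSize = 0L;
        long totalRowCount = 0L;
        for (Entry e : entries) {
            if (e.isAdd()) {
                totalFileSize += e.fileSize();
                totalRowCount += e.rowCount();
            } else {
                totalFileSize -= e.fileSize();
                totalRowCount -= e.rowCount();
            }
        }
        return new long[] {totalFileSize, totalRowCount};
    }
}
```

For example, two ADD entries of 100 and 50 bytes followed by a DELETE of 30 bytes net out to 120 live bytes.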
computeTableStats reads every data manifest and every manifest entry for the committed snapshot on each commit. For large/active tables this can add significant overhead to the tiering commit path and increase end-to-end latency. Consider guarding stats collection behind a config flag, sampling, caching, or using a cheaper built-in snapshot/statistics API if available, while keeping the existing best-effort behavior.
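One of the mitigations mentioned, caching, could keep the best-effort behavior while avoiding repeated manifest scans by memoizing the result per snapshot id. A hedged sketch (`SnapshotStatsCache` and its shape are assumptions for illustration, not code from this PR):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

// Hypothetical sketch: memoize per-snapshot stats so repeated commits of the
// same snapshot do not re-read every manifest. computeFn stands in for the
// expensive computeTableStats call.
final class SnapshotStatsCache {
    private final Map<Long, long[]> cache = new ConcurrentHashMap<>();

    long[] statsFor(long snapshotId, LongFunction<long[]> computeFn) {
        // computeIfAbsent runs computeFn at most once per snapshot id.
        return cache.computeIfAbsent(snapshotId, computeFn::apply);
    }
}
```

A real implementation would also bound the cache (e.g. keep only the latest few snapshots per table), since snapshot ids grow without limit.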
```html
<td rowspan="3">lakeTiering</td>
<td>pendingTablesCount</td>
<td>The number of tables waiting to be tiered.</td>
<td>Gauge</td>
</tr>
<tr>
<td>runningTablesCount</td>
<td>The number of tables currently being tiered.</td>
<td>Gauge</td>
</tr>
<tr>
<td>failuresTotal</td>
<td>The total number of tiering failures across all tables.</td>
<td>Gauge</td>
</tr>
<tr>
<td rowspan="5">lakeTiering_table</td>
<td>tierLag</td>
<td>Time in milliseconds since the last successful tiering operation for this table. Returns -1 for newly created tables that have never been tiered.</td>
<td>Gauge</td>
```
This doc section lists lakeTiering.failuresTotal and states tierLag returns -1 for newly created tables. In the current implementation, a global failuresTotal metric is not registered, and tierLag is computed from tableLastTieredTime which is initialized on table registration (so new tables will report ~0 rather than -1). Either implement the documented metrics/semantics or adjust the documentation to match the actual exported values.
```java
tieringMetricGroup.gauge(
        MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, pendingTieringTables::size);
tieringMetricGroup.gauge(
        MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);
```
Docs list a coordinator lakeTiering.failuresTotal metric, and the manager maintains globalTieringFailureCount, but no metric is registered for it (and there is no corresponding MetricNames constant). Register a gauge/counter for the global failure total here and add the metric name constant so the documented metric is actually exposed.
Suggested change:

```java
        MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);
tieringMetricGroup.gauge(
        MetricNames.LAKE_TIERING_FAILURES_TOTAL, () -> globalTieringFailureCount);
```
```java
private TieringFinishInfo(long tieringEpoch, boolean isForceFinished, TieringStats stats) {
    this.tieringEpoch = tieringEpoch;
    this.isForceFinished = isForceFinished;
    this.stats = stats;
```
The TieringFinishInfo factory/constructor accepts a potentially null TieringStats and stores it directly. Since downstream code assumes stats is non-null (to build protobuf messages), enforce non-null here (e.g., replace null with TieringStats.UNKNOWN or validate with a null-check).
Suggested change:

```java
this.stats = (stats == null) ? TieringStats.UNKNOWN : stats;
```
```java
public FinishedTieringEvent(long tableId, TieringStats stats) {
    this.tableId = tableId;
    this.stats = stats;
```
FinishedTieringEvent stores the provided stats reference as-is. If any caller passes null, it will propagate and can cause NPEs when building heartbeats. Consider normalizing null to TieringStats.UNKNOWN (or rejecting null) in this constructor to keep the event contract safe.
Suggested change:

```java
this.stats = stats != null ? stats : TieringStats.UNKNOWN;
```
```java
        MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, pendingTieringTables::size);
tieringMetricGroup.gauge(
        MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);
```
The global lakeTiering gauges read pendingTieringTables::size (ArrayDeque) and liveTieringTableIds::size (HashMap) without holding the manager lock. Metric reporters may call these suppliers concurrently with state transitions, which can lead to racy/inconsistent values and potential concurrency issues. Wrap the suppliers in inReadLock(lock, ...) (or switch these structures to concurrent + maintain atomic counts) so metrics collection is thread-safe.
Suggested change:

```java
tieringMetricGroup.gauge(
        MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT,
        () -> inReadLock(lock, () -> pendingTieringTables.size()));
tieringMetricGroup.gauge(
        MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT,
        () -> inReadLock(lock, () -> liveTieringTableIds.size()));
```
```java
// tierLag: milliseconds since last successful tiering
tableMetricGroup.gauge(
        MetricNames.LAKE_TIERING_TABLE_TIER_LAG,
        () ->
                inReadLock(
                        lock,
                        () -> {
                            Long lastTiered = tableLastTieredTime.get(tableId);
                            return lastTiered != null
                                    ? clock.milliseconds() - lastTiered
                                    : -1L;
                        }));
```
The tierLag gauge is described as "since last successful tiering" and the docs say it returns -1 for tables that have never been tiered, but the implementation uses tableLastTieredTime which is initialized when the table is registered. That means newly created tables will report a small/zero lag instead of -1. Consider tracking "last successful tiering time" separately (initialized to -1) or adjust the metric/doc semantics to match what is stored.
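Tracking "last successful tiering time" separately, initialized to the -1 "never tiered" sentinel, could look like the following sketch (class and method names are hypothetical, not from this PR):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: keep a dedicated "last successful tiering" timestamp,
// initialized to -1 on registration, so tierLag reports -1 for tables that
// have never been tiered, matching the documented semantics.
final class TierLagTracker {
    private final Map<Long, Long> lastSuccessfulTierTime = new ConcurrentHashMap<>();

    void register(long tableId) {
        lastSuccessfulTierTime.put(tableId, -1L); // never tiered yet
    }

    void onTieringSucceeded(long tableId, long nowMillis) {
        lastSuccessfulTierTime.put(tableId, nowMillis);
    }

    long tierLag(long tableId, long nowMillis) {
        long last = lastSuccessfulTierTime.getOrDefault(tableId, -1L);
        return last < 0 ? -1L : nowMillis - last;
    }
}
```

With this split, the registration timestamp can still drive other metrics while tierLag only reflects completed tiering runs.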
```java
// tierDuration: duration of last tiering job
tableMetricGroup.gauge(
        MetricNames.LAKE_TIERING_TABLE_TIER_DURATION,
        () -> inReadLock(lock, () -> tableLastTieringDuration.getOrDefault(tableId, 0L)));
```
tierDuration is initialized to -1L for unknown, but the gauge uses getOrDefault(tableId, 0L), which can surface an incorrect 0 during races (e.g., if the table is removed before the metric group is fully deregistered). Use -1L as the default here to keep the "unknown" sentinel consistent.
Suggested change:

```java
() -> inReadLock(lock, () -> tableLastTieringDuration.getOrDefault(tableId, -1L)));
```
Purpose
Linked issue: close #2440