From c267ed0338539977d543464de27860ec23ed47a6 Mon Sep 17 00:00:00 2001 From: hemanthsavasere Date: Fri, 6 Mar 2026 12:30:46 +0000 Subject: [PATCH 1/2] Fixing the Flaky Test for org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScannerITCase.testKvSnapshotLease --- .../testutils/FlussClusterExtension.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 305a4269aa..65721b8d6a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -745,33 +745,37 @@ public CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket) { } private Long triggerSnapshot(TableBucket tableBucket) { - Long snapshotId = null; - Long nextSnapshotId = null; for (TabletServer ts : tabletServers.values()) { ReplicaManager.HostedReplica replica = ts.getReplicaManager().getReplica(tableBucket); if (replica instanceof ReplicaManager.OnlineReplica) { Replica r = ((ReplicaManager.OnlineReplica) replica).getReplica(); PeriodicSnapshotManager kvSnapshotManager = r.getKvSnapshotManager(); if (r.isLeader() && kvSnapshotManager != null) { - snapshotId = kvSnapshotManager.currentSnapshotId(); + long snapshotId = kvSnapshotManager.currentSnapshotId(); kvSnapshotManager.triggerSnapshot(); - nextSnapshotId = kvSnapshotManager.currentSnapshotId(); - break; + // Poll until the snapshot ID increments, confirming the async trigger was + // processed. triggerSnapshot() submits work to a guardedExecutor + // asynchronously, so the counter may not have incremented yet on return. 
+ long deadline = System.currentTimeMillis() + 30_000; + while (kvSnapshotManager.currentSnapshotId() <= snapshotId) { + if (System.currentTimeMillis() > deadline) { + fail( + "Timed out waiting for snapshot trigger to be processed for " + + tableBucket); + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return snapshotId; } } } - - if (snapshotId != null) { - if (nextSnapshotId > snapshotId) { - // only there is a new snapshot triggered, we return the snapshot id - return snapshotId; - } else { - return null; - } - } else { - fail("No KV snapshot manager found for table bucket " + tableBucket); - return null; - } + fail("No KV snapshot manager found for table bucket " + tableBucket); + return null; } private CompletedSnapshot waitUntilSnapshotFinished(TableBucket tableBucket, long snapshotId) { From e671f0194c022c1c049ff00d869f9e94f8adc7f0 Mon Sep 17 00:00:00 2001 From: hemanthsavasere Date: Fri, 6 Mar 2026 12:59:35 +0000 Subject: [PATCH 2/2] Fixing the Flaky Test for org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScannerITCase.testKvSnapshotLease timeout fix --- .../fluss/server/testutils/FlussClusterExtension.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 65721b8d6a..a342d3c07a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -756,12 +756,12 @@ private Long triggerSnapshot(TableBucket tableBucket) { // Poll until the snapshot ID increments, confirming the async trigger was // processed. 
triggerSnapshot() submits work to a guardedExecutor // asynchronously, so the counter may not have incremented yet on return. - long deadline = System.currentTimeMillis() + 30_000; + // If the ID does not increment within the timeout, the snapshot was + // legitimately skipped (e.g., no new data since last snapshot). + long deadline = System.currentTimeMillis() + 1_000; while (kvSnapshotManager.currentSnapshotId() <= snapshotId) { if (System.currentTimeMillis() > deadline) { - fail( - "Timed out waiting for snapshot trigger to be processed for " - + tableBucket); + return null; } try { Thread.sleep(10);