Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0289e6d
alpha
Vladsz83 Sep 8, 2025
9c00048
remove external op id, refactoring
Vladsz83 Sep 8, 2025
bd9b1ca
+ incremental metrics, refactorings, renamings
Vladsz83 Sep 8, 2025
5d74475
+ test
Vladsz83 Sep 9, 2025
af5f83e
+ incr. test
Vladsz83 Sep 9, 2025
8683f81
self reviewed
Vladsz83 Sep 9, 2025
997bb90
+ thread pool sz param
Vladsz83 Sep 9, 2025
66cbd24
+ thread pool sz param
Vladsz83 Sep 9, 2025
b8c9b23
Merge remote-tracking branch 'my/IGNITE-19972-Add-metrics-of-snapshot…
Vladsz83 Sep 9, 2025
f1dcbee
fix
Vladsz83 Sep 9, 2025
c2f078c
Merge branch 'master' into IGNITE-19972-Add-metrics-of-snapshot-check
Vladsz83 Feb 16, 2026
423a809
+ master
Vladsz83 Feb 16, 2026
b5efb85
Merge branch 'master' into IGNITE-19972-Add-metrics-of-snapshot-check
Vladsz83 Feb 19, 2026
0670c13
Merge branch 'master' into IGNITE-19972-Add-metrics-of-snapshot-check
Vladsz83 Feb 20, 2026
114b1dd
test fix
Vladsz83 Feb 20, 2026
42e4be5
Merge branch 'master' into IGNITE-19972-Add-metrics-of-snapshot-check
Vladsz83 Feb 21, 2026
5e7d9d3
minority
Vladsz83 Feb 21, 2026
e4e5dab
logs
Vladsz83 Feb 22, 2026
145175e
+ logs
Vladsz83 Feb 22, 2026
5da3e9a
minority
Vladsz83 Feb 22, 2026
040726c
test logs
Vladsz83 Feb 22, 2026
2adbd0d
research log fixes
Vladsz83 Feb 22, 2026
0ea777c
test fix. revert researching logs
Vladsz83 Feb 23, 2026
5117c40
Merge branch 'master' into IGNITE-19972-Add-metrics-of-snapshot-check
Vladsz83 Mar 10, 2026
d036f4a
Merge branch 'master' into IGNITE-19972-Add-metrics-of-snapshot-check
Vladsz83 Mar 17, 2026
1c52f85
review metrics
Vladsz83 Mar 17, 2026
be1c81c
Merge branch 'master' into IGNITE-19972-Add-metrics-of-snapshot-check
Vladsz83 Mar 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalFullSnapshot(
);

SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, req.groups(), cctx.localNode(), snpOp.snapshotFileTree(),
snpOp.streamerWarning(), true);
snpOp.streamerWarning(), true, null, null);

snpOp.meta(meta);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -50,6 +51,7 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;

Expand All @@ -71,11 +73,26 @@ public class IncrementalSnapshotVerify implements Supplier<IncrementalSnapshotVe
private LongAdder procEntriesCnt;

/** */
public IncrementalSnapshotVerify(IgniteEx ignite, IgniteLogger log, SnapshotFileTree sft, int incrementalIdx) {
@Nullable private final Consumer<Integer> totalCnsmr;

/** */
@Nullable private final Consumer<Integer> checkedCnsmr;

/** */
public IncrementalSnapshotVerify(
IgniteEx ignite,
IgniteLogger log,
SnapshotFileTree sft,
int incrementalIdx,
@Nullable Consumer<Integer> totalCnsmr,
@Nullable Consumer<Integer> checkedCnsmr
) {
this.ignite = ignite;
this.log = log;
this.sft = sft;
this.incIdx = incrementalIdx;
incIdx = incrementalIdx;
this.totalCnsmr = totalCnsmr;
this.checkedCnsmr = checkedCnsmr;
}

/**
Expand Down Expand Up @@ -106,11 +123,15 @@ public IncrementalSnapshotVerify(IgniteEx ignite, IgniteLogger log, SnapshotFile
ignite.context().cache().context(), sft, incIdx, txCaches.keySet()
) {
@Override void totalWalSegments(int segCnt) {
// No-op.
if (totalCnsmr != null)
totalCnsmr.accept(segCnt);
}

@Override void processedWalSegments(int segCnt) {
procSegCnt.set(segCnt);

if (checkedCnsmr != null)
checkedCnsmr.accept(segCnt);
}

@Override void initWalEntries(LongAdder entriesCnt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@
import org.apache.ignite.internal.management.cache.PartitionKey;
import org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
Expand All @@ -60,6 +63,9 @@

/** Distributed process of snapshot checking. */
public class SnapshotCheckProcess {
/** */
public static final String SNAPSHOT_CHECK_METRIC = "snapshot-check";

/** */
private final IgniteLogger log;

Expand Down Expand Up @@ -157,6 +163,8 @@ else if (ctx.req.allRestoreHandlers())
return new GridFinishedFuture<>();
}
finally {
unregisterMetrics(ctx.req.snapshotName());

if (log.isInfoEnabled())
log.info("Finished snapshot validation [req=" + ctx.req + ']');
}
Expand Down Expand Up @@ -314,6 +322,8 @@ private IgniteInternalFuture<SnapshotCheckResponse> validateParts(SnapshotCheckP
if (F.isEmpty(ctx.metas))
return new GridFinishedFuture<>();

ctx.totalCounter.set(0);

GridFutureAdapter<SnapshotCheckResponse> phaseFut = new GridFutureAdapter<>();

CompletableFuture<SnapshotCheckResponse> workingFut;
Expand Down Expand Up @@ -346,7 +356,11 @@ private CompletableFuture<SnapshotCheckResponse> incrementalFuture(SnapshotCheck
CompletableFuture<SnapshotCheckResponse> resFut = new CompletableFuture<>();

CompletableFuture<IncrementalSnapshotVerifyResult> workingFut = snpChecker.checkIncrementalSnapshot(
ctx.locFileTree.get(meta.consistentId()), ctx.req.incrementalIndex());
ctx.locFileTree.get(meta.consistentId()),
ctx.req.incrementalIndex(),
ctx.totalCounter::addAndGet,
ctx.checkedCounter::addAndGet
);

workingFut.whenComplete((res, err) -> {
if (err != null)
Expand All @@ -368,22 +382,29 @@ private CompletableFuture<SnapshotCheckResponse> partitionsHashesFuture(Snapshot
AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());

for (SnapshotMetadata meta : ctx.metas) {
CompletableFuture<Map<PartitionKey, PartitionHashRecord>> metaFut = snpChecker.checkPartitions(
meta,
ctx.locFileTree.get(meta.consistentId()),
ctx.req.groups(),
false,
ctx.req.fullCheck()
);

metaFut.whenComplete((res, err) -> {
if (err != null)
exceptions.put(meta.consistentId(), err);
else if (!F.isEmpty(res))
perMetaResults.put(meta.consistentId(), res);

if (metasProcessed.decrementAndGet() == 0)
composedFut.complete(new SnapshotCheckResponse(perMetaResults, exceptions));
// Run asynchronously to calculate the metric 'total partitions' faster.
kctx.pools().getSnapshotExecutorService().submit(() -> {
CompletableFuture<Map<PartitionKey, PartitionHashRecord>> metaFut = snpChecker.checkPartitions(
meta,
ctx.locFileTree.get(meta.consistentId()),
ctx.req.groups(),
false,
ctx.req.fullCheck(),
ctx.totalCounter::addAndGet,
checkedPartId -> ctx.checkedCounter.incrementAndGet()
);

metaFut.whenComplete((res, err) -> {
ctx.checkedSnapshotParts.incrementAndGet();

if (err != null)
exceptions.put(meta.consistentId(), err);
else if (!F.isEmpty(res))
perMetaResults.put(meta.consistentId(), res);

if (metasProcessed.decrementAndGet() == 0)
composedFut.complete(new SnapshotCheckResponse(perMetaResults, exceptions));
});
});
}

Expand All @@ -403,17 +424,28 @@ private CompletableFuture<SnapshotCheckResponse> allHandlersFuture(SnapshotCheck
AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());

for (SnapshotMetadata meta : ctx.metas) {
CompletableFuture<Map<String, SnapshotHandlerResult<Object>>> metaFut = snpChecker.invokeCustomHandlers(meta,
ctx.locFileTree.get(meta.consistentId()), ctx.req.groups(), ctx.req.fullCheck());

metaFut.whenComplete((res, err) -> {
if (err != null)
exceptions.put(meta.consistentId(), err);
else if (!F.isEmpty(res))
perMetaResults.put(meta.consistentId(), res);

if (metasProcessed.decrementAndGet() == 0)
composedFut.complete(new SnapshotCheckResponse(perMetaResults, exceptions));
// Run asynchronously to calculate the metric 'total partitions' faster.
kctx.pools().getSnapshotExecutorService().submit(() -> {
CompletableFuture<Map<String, SnapshotHandlerResult<Object>>> metaFut = snpChecker.invokeCustomHandlers(
meta,
ctx.locFileTree.get(meta.consistentId()),
ctx.req.groups(),
ctx.req.fullCheck(),
ctx.totalCounter::addAndGet,
processedPart -> ctx.checkedCounter.incrementAndGet()
);

metaFut.whenComplete((res, err) -> {
ctx.checkedSnapshotParts.incrementAndGet();

if (err != null)
exceptions.put(meta.consistentId(), err);
else if (!F.isEmpty(res))
perMetaResults.put(meta.consistentId(), res);

if (metasProcessed.decrementAndGet() == 0)
composedFut.complete(new SnapshotCheckResponse(perMetaResults, exceptions));
});
});
}

Expand Down Expand Up @@ -465,6 +497,8 @@ private IgniteInternalFuture<SnapshotCheckResponse> prepareAndCheckMetas(Snapsho
if (!baseline(kctx.localNodeId()))
return new GridFinishedFuture<>();

registerMetrics(ctx);

Collection<Integer> grpIds = F.isEmpty(req.groups()) ? null : F.viewReadOnly(req.groups(), CU::cacheId);

GridFutureAdapter<SnapshotCheckResponse> phaseFut = new GridFutureAdapter<>();
Expand Down Expand Up @@ -548,8 +582,11 @@ private void reducePreparationAndMetasCheck(
phase2PartsHashes.start(reqId, ctx.req);
}
catch (Throwable th) {
if (ctx != null)
if (ctx != null) {
unregisterMetrics(ctx.req.snapshotName());

contexts.remove(ctx.req.snapshotName());
}

if (clusterOpFut != null)
clusterOpFut.onDone(th);
Expand All @@ -562,12 +599,13 @@ private void reducePreparationAndMetasCheck(
*
* @return Metadatas to process on current node.
*/
private @Nullable List<SnapshotMetadata> assingMetas(Map<ClusterNode, List<SnapshotMetadata>> clusterMetas) {
private List<SnapshotMetadata> assingMetas(Map<ClusterNode, List<SnapshotMetadata>> clusterMetas) {
ClusterNode locNode = kctx.cluster().get().localNode();
List<SnapshotMetadata> locMetas = clusterMetas.get(locNode);

// Might be empty due to a cache's node filter.
if (F.isEmpty(locMetas))
return null;
return Collections.emptyList();

Set<String> onlineNodesConstIdsStr = new HashSet<>(clusterMetas.size());
// The nodes are sorted with lesser order.
Expand Down Expand Up @@ -681,6 +719,42 @@ private boolean baseline(UUID nodeId) {
return null;
}

/** */
private void registerMetrics(SnapshotCheckContext ctx) {
MetricRegistryImpl mreg = kctx.metric().registry(MetricUtils.metricName(SNAPSHOT_CHECK_METRIC, ctx.req.snapshotName()));

assert !mreg.iterator().hasNext();
assert ctx.req.requestId() != null;

mreg.register("startTime", U::currentTimeMillis,
"The system time of the start of the cluster snapshot check operation on current node.");

if (ctx.req.incrementalIndex() > 0) {
mreg.register("incrementIndex", ctx.req::incrementalIndex,
"The index of incremental snapshot of the snapshot check operation.");
mreg.register("totalWalSegments", ctx.totalCounter::get,
"The total number of WAL segments in the incremental snapshot to check on current node.");
mreg.register("processedWalSegments", ctx.checkedCounter::get,
"The number of checked WAL segments in the incremental snapshot on current node.");
}
else {
mreg.register("checkPartitions", ctx.req::fullCheck, "Shows whether full validation of snapshot partitions is enabled.");
mreg.register("totalPartitions", ctx.totalCounter::get, "The total number of partitions to check on current node.");
mreg.register("processedPartitions", ctx.checkedCounter::get, "The number of checked partitions on current node.");

// Node can hold and process another nodes' snapshot data.
mreg.register("snapshotPartsToProcess", () -> ctx.metas == null ? -1 : ctx.metas.size(),
"Number of parts (nodes data) of snapshot to check on current node.");
mreg.register("processedSnapshotParts", ctx.checkedSnapshotParts::get,
"Number of checked snapshot parts (nodes data) on current node.");
}
}

/** */
private void unregisterMetrics(String snpName) {
kctx.metric().remove(MetricUtils.metricName(SNAPSHOT_CHECK_METRIC, snpName));
}

/** Operation context. */
private static final class SnapshotCheckContext {
/** Request. */
Expand All @@ -691,7 +765,7 @@ private static final class SnapshotCheckContext {
* Metadatas to process on this node. Also indicates the snapshot parts to check on this node.
* @see #partitionsHashesFuture(SnapshotCheckContext)
*/
@Nullable private List<SnapshotMetadata> metas;
@Nullable private volatile List<SnapshotMetadata> metas;

/** Map of snapshot pathes per consistent id for {@link #metas}. */
@GridToStringInclude
Expand All @@ -700,9 +774,23 @@ private static final class SnapshotCheckContext {
/** All the snapshot metadatas. */
@Nullable private Map<ClusterNode, List<SnapshotMetadata>> clusterMetas;

/** Common counter of total work units to process on current node. */
@GridToStringExclude
private final AtomicInteger totalCounter = new AtomicInteger(-1);

/** Common counter of checked work units on current node. */
@GridToStringExclude
private final AtomicInteger checkedCounter = new AtomicInteger(0);

/** Number of checked snapshot parts (nodes data/consistent ids) on current node. {@code Null} for incremental snapshots. */
@GridToStringExclude
@Nullable private final AtomicInteger checkedSnapshotParts;

/** Creates operation context. */
private SnapshotCheckContext(SnapshotCheckProcessRequest req) {
this.req = req;

checkedSnapshotParts = req.incrementalIndex() > 0 ? null : new AtomicInteger(0);
}

/** {@inheritDoc} */
Expand Down
Loading