From 2b6cbb64c67930933561a30615870d958cc76fe2 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 31 Mar 2026 22:47:41 +0000 Subject: [PATCH 1/2] Removes in memory set from dead compaction detector Removed an in memory set of table ids in the dead compaction detector that contained table ids that may have compaction tmp files that needed cleanup. This set would be hard to maintain in multiple managers. Also the set could lose track of tables if the process died. Replaced the in memory set with a set in the metadata table. This set is directly populated by the split and merge fate operations, so there is no chance of losing track of things when a process dies. Also this set is more narrow and allows looking for tmp files to cleanup in single tablet dirs rather than scanning an entire table's dir. Also made a change to the order in which tmp files are deleted for failed compactions. They used to be deleted after the metadata for the compaction was cleaned up; this could lead to losing track of the cleanup if the process died after deleting the metadata but before deleting the tmp file. Now the tmp files are deleted before the metadata entry, so should no longer lose track in process death. 
This change is needed by #6217 --- .../accumulo/core/metadata/schema/Ample.java | 21 ++++ .../core/metadata/schema/MetadataSchema.java | 32 ++++++ .../compaction/ExternalCompactionUtil.java | 3 +- .../metadata/RemovedCompactionStoreImpl.java | 108 ++++++++++++++++++ .../server/metadata/ServerAmpleImpl.java | 5 + .../server/util/FindCompactionTmpFiles.java | 42 ++++++- .../coordinator/CompactionCoordinator.java | 80 +++---------- .../coordinator/DeadCompactionDetector.java | 75 +++++------- .../manager/tableOps/merge/MergeTablets.java | 17 +++ .../manager/tableOps/split/UpdateTablets.java | 10 ++ .../tableOps/merge/MergeTabletsTest.java | 39 ++++++- .../tableOps/split/UpdateTabletsTest.java | 26 +++++ .../compaction/ExternalCompaction_3_IT.java | 10 +- 13 files changed, 350 insertions(+), 118 deletions(-) create mode 100644 server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 4d06f3b1aab..1d874dccf1b 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -25,6 +25,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Stream; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.admin.TabletAvailability; @@ -669,4 +670,24 @@ default void removeBulkLoadInProgressFlag(String path) { default ScanServerRefStore scanServerRefs() { throw new UnsupportedOperationException(); } + + record RemovedCompaction(ExternalCompactionId id, TableId table, String dir) { + }; + + /** + * Tracks compactions that were removed from the metadata table but may still be running on + * compactors. 
The tmp files associated with these compactions can eventually be removed when the + * compaction is no longer running. + */ + interface RemovedCompactionStore { + Stream list(); + + void add(Collection removedCompactions); + + void delete(Collection removedCompactions); + } + + default RemovedCompactionStore removedCompactions() { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 39e1c35e3b0..bb1309583e0 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -547,4 +547,36 @@ public static String getRowPrefix() { } + /** + * Holds information about compactions that were deleted from tablets metadata by split or merge + * operations. These may have tmp files that need to be cleaned up. 
+ */ + public static class RemovedCompactionSection { + private static final Section section = + new Section(RESERVED_PREFIX + "rcomp", true, RESERVED_PREFIX + "rcomq", false); + + public static Range getRange() { + return section.getRange(); + } + + public static String getRowPrefix() { + return section.getRowPrefix(); + } + + public static Ample.RemovedCompaction decodeRow(String row) { + String[] fields = row.split("#"); + Preconditions.checkArgument(fields.length == 4); + Preconditions.checkArgument(getRowPrefix().equals(fields[0])); + return new Ample.RemovedCompaction(ExternalCompactionId.from(fields[1]), + TableId.of(fields[2]), fields[3]); + } + + public static String encodeRow(Ample.RemovedCompaction rc) { + // put the compaction id first in the row because its uuid will spread out nicely and avoid + // hot spotting + return getRowPrefix() + "#" + rc.id().canonical() + "#" + rc.table().canonical() + "#" + + rc.dir(); + + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 149c12e12be..4e842f3681a 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -24,7 +24,6 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -262,7 +261,7 @@ public static List getCompactionsRunningOnCompactors(ClientContext con } } - public static Collection + public static Set getCompactionIdsRunningOnCompactors(ClientContext context) { final ExecutorService executor = ThreadPools.getServerThreadPools() .getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build(); diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java new file mode 100644 index 00000000000..ceb405e13ea --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.metadata; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.metadata.SystemTables; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.RemovedCompactionSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.ServerContext; + +import com.google.common.base.Preconditions; + +public class RemovedCompactionStoreImpl implements Ample.RemovedCompactionStore { + private final ServerContext context; + + public RemovedCompactionStoreImpl(ServerContext context) { + this.context = context; + } + + private Stream createStream(String tableName) { + Scanner scanner = null; + try { + scanner = context.createScanner(tableName, Authorizations.EMPTY); + } catch (TableNotFoundException e) { + throw new IllegalStateException(e); + } + scanner.setRange(RemovedCompactionSection.getRange()); + return scanner.stream().map(e -> e.getKey().getRowData().toString()) + .map(RemovedCompactionSection::decodeRow).onClose(scanner::close); + } + + @Override + public Stream list() { + return Stream.concat(createStream(SystemTables.ROOT.tableName()), + createStream(SystemTables.METADATA.tableName())); + } + + private void write(Collection removedCompactions, + Function converter) { + Map> byLevel = removedCompactions.stream() + .collect(Collectors.groupingBy(rc -> Ample.DataLevel.of(rc.table()))); + // Do not expect the root to split or merge so it should never have this data + Preconditions.checkArgument(!byLevel.containsKey(Ample.DataLevel.ROOT)); + 
byLevel.forEach((dl, removed) -> { + try (var writer = context.createBatchWriter(dl.metaTable())) { + for (var rc : removed) { + writer.addMutation(converter.apply(rc)); + } + } catch (TableNotFoundException | MutationsRejectedException e) { + throw new IllegalStateException(e); + } + }); + } + + @Override + public void add(Collection removedCompactions) { + if (removedCompactions.isEmpty()) { + return; + } + + write(removedCompactions, rc -> { + Mutation m = new Mutation(RemovedCompactionSection.encodeRow(rc)); + m.put("", "", ""); + return m; + }); + + } + + @Override + public void delete(Collection removedCompactions) { + if (removedCompactions.isEmpty()) { + return; + } + + write(removedCompactions, rc -> { + Mutation m = new Mutation(RemovedCompactionSection.encodeRow(rc)); + m.putDelete("", ""); + return m; + }); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index a4eb470c664..35fc63fe3ff 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -281,6 +281,11 @@ public ScanServerRefStore scanServerRefs() { return scanServerRefStore; } + @Override + public RemovedCompactionStore removedCompactions() { + return new RemovedCompactionStoreImpl(getContext()); + } + @VisibleForTesting protected ServerContext getContext() { return context; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java index 06755d1807b..c27a65639a4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java @@ -18,11 +18,13 @@ */ package 
org.apache.accumulo.server.util; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -32,7 +34,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ServerOpts; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -46,6 +50,7 @@ import org.apache.accumulo.start.spi.CommandGroups; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,6 +132,7 @@ public static Set findTempFiles(ServerContext context, String tableId) }); } } + LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches); return matches; } @@ -141,7 +147,8 @@ public static DeleteStats deleteTempFiles(ServerContext context, Set files throws InterruptedException { final ExecutorService delSvc = Executors.newFixedThreadPool(8); - final List> futures = new ArrayList<>(filesToDelete.size()); + // use a linked list to make removal from the middle of the list quick + final List> futures = new LinkedList<>(); final DeleteStats stats = new DeleteStats(); filesToDelete.forEach(p -> { @@ -190,6 +197,39 @@ public static DeleteStats deleteTempFiles(ServerContext context, Set files return stats; } + // Finds any tmp files matching the given compaction ids in table dir and deletes them. 
+ public static void deleteTmpFiles(ServerContext ctx, TableId tableId, String dirName, + Set ecidsForTablet) { + final Collection vols = ctx.getVolumeManager().getVolumes(); + for (Volume vol : vols) { + try { + final String volPath = vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + tableId.canonical() + Path.SEPARATOR + dirName; + final FileSystem fs = vol.getFileSystem(); + for (ExternalCompactionId ecid : ecidsForTablet) { + final String fileSuffix = "_tmp_" + ecid.canonical(); + FileStatus[] files = null; + try { + files = fs.listStatus(new Path(volPath), (path) -> path.getName().endsWith(fileSuffix)); + } catch (FileNotFoundException e) { + LOG.trace("Failed to list tablet dir {}", volPath, e); + } + if (files != null) { + for (FileStatus file : files) { + if (!fs.delete(file.getPath(), false)) { + LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath()); + } else { + LOG.debug("Deleted ecid tmp file: {}", file.getPath()); + } + } + } + } + } catch (IOException e) { + LOG.error("Exception deleting compaction tmp files for table: {}", tableId, e); + } + } + } + public FindCompactionTmpFiles() { super(new FindOpts()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 51685c166cd..2e4f8b52b6e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -93,8 +93,6 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import 
org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.compaction.CompactionJob; @@ -108,7 +106,6 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData; @@ -122,8 +119,8 @@ import org.apache.accumulo.server.compaction.CompactionPluginUtils; import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.accumulo.server.util.FindCompactionTmpFiles; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -765,6 +762,21 @@ void compactionsFailed(Map compactions) { void compactionFailedForLevel(Map> compactions) { + // CompactionFailed is called from the Compactor when either a compaction fails or is cancelled + // and it's called from the DeadCompactionDetector. Remove compaction tmp files from the tablet + // directory that have a corresponding ecid in the name. Must delete any tmp files before + // removing compaction entry from metadata table. This ensures that in the event of process + // death that the dead compaction will be detected in the future and the files removed then. 
+ try (var tablets = ctx.getAmple().readTablets() + .forTablets(compactions.keySet(), Optional.empty()).fetch(ColumnType.DIR).build()) { + for (TabletMetadata tm : tablets) { + var extent = tm.getExtent(); + var ecidsForTablet = compactions.get(extent); + FindCompactionTmpFiles.deleteTmpFiles(ctx, extent.tableId(), tm.getDirName(), + ecidsForTablet); + } + } + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { compactions.forEach((extent, ecids) -> { try { @@ -791,78 +803,18 @@ public boolean test(TabletMetadata tabletMetadata) { } }); - final List ecidsForTablet = new ArrayList<>(); tabletsMutator.process().forEach((extent, result) -> { if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { - // this should try again later when the dead compaction detector runs, lets log it in case // its a persistent problem if (LOG.isDebugEnabled()) { LOG.debug("Unable to remove failed compaction {} {}", extent, compactions.get(extent)); } - } else { - // compactionFailed is called from the Compactor when either a compaction fails or - // is cancelled and it's called from the DeadCompactionDetector. This block is - // entered when the conditional mutator above successfully deletes an ecid from - // the tablet metadata. Remove compaction tmp files from the tablet directory - // that have a corresponding ecid in the name. 
- - ecidsForTablet.clear(); - ecidsForTablet.addAll(compactions.get(extent)); - - if (!ecidsForTablet.isEmpty()) { - final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR); - if (tm != null) { - final Collection vols = ctx.getVolumeManager().getVolumes(); - for (Volume vol : vols) { - try { - final String volPath = - vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR - + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName(); - final FileSystem fs = vol.getFileSystem(); - for (ExternalCompactionId ecid : ecidsForTablet) { - final String fileSuffix = "_tmp_" + ecid.canonical(); - FileStatus[] files = null; - try { - files = fs.listStatus(new Path(volPath), - (path) -> path.getName().endsWith(fileSuffix)); - } catch (FileNotFoundException e) { - LOG.trace("Failed to list tablet dir {}", volPath, e); - } - if (files != null) { - for (FileStatus file : files) { - if (!fs.delete(file.getPath(), false)) { - LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath()); - } else { - LOG.debug("Deleted ecid tmp file: {}", file.getPath()); - } - } - } - } - } catch (IOException e) { - LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e); - } - } - } else { - // TabletMetadata does not exist for the extent. This could be due to a merge or - // split operation. 
Use the utility to find tmp files at the table level - deadCompactionDetector.addTableId(extent.tableId()); - } - } } }); } } - protected Set readExternalCompactionIds() { - try (TabletsMetadata tabletsMetadata = - this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER) - .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) { - return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) - .collect(Collectors.toSet()); - } - } - /* Method exists to be called from test */ public CompactionJobQueues getJobQueues() { return jobQueues; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index ce04296a615..89610550971 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -18,11 +18,12 @@ */ package org.apache.accumulo.manager.compaction.coordinator; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -33,11 +34,11 @@ import java.util.stream.Stream; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -49,8 +50,6 @@ import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.FindCompactionTmpFiles; -import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +61,6 @@ public class DeadCompactionDetector { private final CompactionCoordinator coordinator; private final ScheduledThreadPoolExecutor schedExecutor; private final ConcurrentHashMap deadCompactions; - private final Set tablesWithUnreferencedTmpFiles = new HashSet<>(); private final Function> fateClients; public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator, @@ -75,12 +73,6 @@ public DeadCompactionDetector(ServerContext context, CompactionCoordinator coord this.fateClients = fateClients; } - public void addTableId(TableId tableWithUnreferencedTmpFiles) { - synchronized (tablesWithUnreferencedTmpFiles) { - tablesWithUnreferencedTmpFiles.add(tableWithUnreferencedTmpFiles); - } - } - private void detectDeadCompactions() { /* @@ -162,6 +154,31 @@ private void detectDeadCompactions() { }); } + // Get the list of compaction entires that were removed from the metadata table by a split or + // merge operation. Must get this data before getting the running set of compactions. 
+ List removedCompactions; + try (Stream listing = context.getAmple().removedCompactions().list()) { + removedCompactions = listing.collect(Collectors.toCollection(ArrayList::new)); + } + + // Must get the set of running compactions after reading compaction ids from the metadata table + Set running = null; + if (!removedCompactions.isEmpty() || !tabletCompactions.isEmpty()) { + running = ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context); + } + + // Delete any tmp files related to compaction metadata entries that were removed by split or + // merge and are no longer running. + if (!removedCompactions.isEmpty()) { + var runningSet = Objects.requireNonNull(running); + removedCompactions.removeIf(rc -> runningSet.contains(rc.id())); + removedCompactions.forEach(rc -> { + log.trace("attempting to delete tmp files for removed compaction {}", rc); + FindCompactionTmpFiles.deleteTmpFiles(context, rc.table(), rc.dir(), Set.of(rc.id())); + }); + context.getAmple().removedCompactions().delete(removedCompactions); + } + if (tabletCompactions.isEmpty()) { // Clear out dead compactions, tservers don't think anything is running log.trace("Clearing the dead compaction map, no tablets have compactions running"); @@ -183,9 +200,6 @@ private void detectDeadCompactions() { // In order for this overall algorithm to be correct and avoid race conditions, the compactor // must return ids covering the time period from before reservation until after commit. If the // ids do not cover this time period then legitimate running compactions could be canceled. 
- Collection running = - ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context); - running.forEach(ecid -> { if (tabletCompactions.remove(ecid) != null) { log.debug("Ignoring compaction {} that is running on a compactor", ecid); @@ -230,37 +244,6 @@ private void detectDeadCompactions() { coordinator.compactionsFailed(tabletCompactions); this.deadCompactions.keySet().removeAll(toFail); } - - // Find and delete compaction tmp files that are unreferenced - if (!tablesWithUnreferencedTmpFiles.isEmpty()) { - - Set copy = new HashSet<>(); - synchronized (tablesWithUnreferencedTmpFiles) { - copy.addAll(tablesWithUnreferencedTmpFiles); - tablesWithUnreferencedTmpFiles.clear(); - } - - log.debug("Tables that may have unreferenced compaction tmp files: {}", copy); - for (TableId tid : copy) { - try { - final Set matches = FindCompactionTmpFiles.findTempFiles(context, tid.canonical()); - log.debug("Found the following compaction tmp files for table {}:", tid); - matches.forEach(p -> log.debug("{}", p)); - - if (!matches.isEmpty()) { - log.debug("Deleting compaction tmp files for table {}...", tid); - DeleteStats stats = FindCompactionTmpFiles.deleteTempFiles(context, matches); - log.debug( - "Deletion of compaction tmp files for table {} complete. 
Success:{}, Failure:{}, Error:{}", - tid, stats.success, stats.failure, stats.error); - } - } catch (InterruptedException e) { - log.error("Interrupted while finding compaction tmp files for table: {}", tid.canonical(), - e); - } - } - } - } public void start() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index 8e8241fb92d..5ec0954e146 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -75,6 +75,7 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { Map newFiles = new HashMap<>(); TabletMetadata firstTabletMeta = null; TabletMetadata lastTabletMeta = null; + List removedCompactions = new ArrayList<>(); try (var tabletsMetadata = env.getContext().getAmple().readTablets().forTable(range.tableId()) .overlapping(range.prevEndRow(), range.endRow()).build()) { @@ -141,6 +142,19 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { dirs.clear(); } } + + // These compaction metadata entries will be deleted, queue up removal of the tmp file once + // the compaction is no longer running + tabletMeta.getExternalCompactions().keySet().stream() + .map(ecid -> new Ample.RemovedCompaction(ecid, tabletMeta.getExtent().tableId(), + tabletMeta.getDirName())) + .forEach(removedCompactions::add); + if (removedCompactions.size() > 1000 && tabletsSeen > 1) { + removedCompactions + .forEach(rc -> log.trace("{} adding removed compaction {}", fateId, rc)); + env.getContext().getAmple().removedCompactions().add(removedCompactions); + removedCompactions.clear(); + } } if (tabletsSeen == 1) { @@ -154,6 +168,9 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { lastTabletMeta); } + removedCompactions.forEach(rc -> log.trace("{} adding removed compaction 
{}", fateId, rc)); + env.getContext().getAmple().removedCompactions().add(removedCompactions); + log.info("{} merge low tablet {}", fateId, firstTabletMeta.getExtent()); log.info("{} merge high tablet {}", fateId, lastTabletMeta.getExtent()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index c12386a3721..7f387f64de8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -266,6 +266,16 @@ private void addNewTablets(FateId fateId, FateEnv env, TabletMetadata tabletMeta private void updateExistingTablet(FateId fateId, ServerContext ctx, TabletMetadata tabletMetadata, TabletOperationId opid, NavigableMap newTablets, Map> newTabletsFiles) { + + // queue up the tmp files related to these compaction metadata entries to be eventually deleted + // once the compaction is no longer running + var removedCompactions = tabletMetadata.getExternalCompactions().keySet().stream() + .map(ecid -> new Ample.RemovedCompaction(ecid, tabletMetadata.getExtent().tableId(), + tabletMetadata.getDirName())) + .toList(); + removedCompactions.forEach(rc -> log.trace("{} adding removed compaction {}", fateId, rc)); + ctx.getAmple().removedCompactions().add(removedCompactions); + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { var newExtent = newTablets.navigableKeySet().last(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java index 9e3037dbe01..416a76defed 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java +++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java @@ -44,11 +44,15 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.UNSPLITTABLE; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import static org.apache.accumulo.manager.tableOps.split.UpdateTabletsTest.newSTF; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -56,6 +60,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Stream; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.data.NamespaceId; @@ -202,9 +207,12 @@ public void testManyColumns() throws Exception { EasyMock.expect(lastTabletMeta.getUnSplittable()).andReturn(unsplittableMeta).atLeastOnce(); EasyMock.expect(lastTabletMeta.getTabletMergeability()).andReturn(mergeability).atLeastOnce(); EasyMock.expect(lastTabletMeta.getMigration()).andReturn(migration).atLeastOnce(); + EasyMock.expect(lastTabletMeta.getDirName()).andReturn("td3").atLeastOnce(); EasyMock.replay(lastTabletMeta, compactions); + Set removedCompactions = new HashSet<>(); + testMerge(List.of(tablet1, tablet2, lastTabletMeta), tableId, null, null, tabletMutator -> { EasyMock.expect(tabletMutator.putTime(MetadataTime.parse("L30"))).andReturn(tabletMutator) .once(); @@ -243,9 +251,13 @@ public void testManyColumns() throws Exception { .andReturn(tabletMutator).once(); EasyMock.expect(tabletMutator.deleteMigration()).andReturn(tabletMutator); - }); + }, 
removedCompactions::add); EasyMock.verify(lastTabletMeta, compactions); + + assertEquals(Set.of(new Ample.RemovedCompaction(cid1, tableId, "td3"), + new Ample.RemovedCompaction(cid2, tableId, "td3"), + new Ample.RemovedCompaction(cid3, tableId, "td3")), removedCompactions); } @Test @@ -420,6 +432,12 @@ public void testTime() throws Exception { private static void testMerge(List inputTablets, TableId tableId, String start, String end, Consumer expectationsSetter) throws Exception { + testMerge(inputTablets, tableId, start, end, expectationsSetter, removedCompaction -> fail()); + } + + private static void testMerge(List inputTablets, TableId tableId, String start, + String end, Consumer expectationsSetter, + Consumer removedCompactionConsumer) throws Exception { MergeInfo mergeInfo = new MergeInfo(tableId, NamespaceId.of("1"), start == null ? null : start.getBytes(UTF_8), end == null ? null : end.getBytes(UTF_8), MergeInfo.Operation.MERGE); @@ -434,6 +452,25 @@ private static void testMerge(List inputTablets, TableId tableId EasyMock.mock(ConditionalTabletsMutatorImpl.class); ConditionalTabletMutatorImpl tabletMutator = EasyMock.mock(ConditionalTabletMutatorImpl.class); + Ample.RemovedCompactionStore removedCompactionStore = new Ample.RemovedCompactionStore() { + @Override + public Stream list() { + throw new UnsupportedOperationException(); + } + + @Override + public void add(Collection removedCompactions) { + System.out.println("removedCompactions : " + removedCompactions); + removedCompactions.forEach(removedCompactionConsumer); + } + + @Override + public void delete(Collection removedCompactions) { + throw new UnsupportedOperationException(); + } + }; + EasyMock.expect(ample.removedCompactions()).andReturn(removedCompactionStore); + ServiceLock managerLock = EasyMock.mock(ServiceLock.class); EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index 6ceac3d6810..044cf08da07 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -24,7 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -37,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TabletMergeability; @@ -250,6 +253,24 @@ public void testManyColumns() throws Exception { EasyMock.expect(manager.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce(); EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) .atLeastOnce(); + Set removedCompactionSet = new HashSet<>(); + Ample.RemovedCompactionStore rcs = new Ample.RemovedCompactionStore() { + @Override + public Stream list() { + throw new UnsupportedOperationException(); + } + + @Override + public void add(Collection removedCompactions) { + removedCompactionSet.addAll(removedCompactions); + } + + @Override + public void delete(Collection removedCompactions) { + throw new UnsupportedOperationException(); + } + }; + EasyMock.expect(ample.removedCompactions()).andReturn(rcs).atLeastOnce(); ServiceLock managerLock = EasyMock.mock(ServiceLock.class); EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); @@ -289,6 +310,7 @@ public void testManyColumns() throws Exception { 
UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002, tabletFiles.keySet()); EasyMock.expect(tabletMeta.getUnSplittable()).andReturn(usm).atLeastOnce(); EasyMock.expect(tabletMeta.getMigration()).andReturn(migration).atLeastOnce(); + EasyMock.expect(tabletMeta.getDirName()).andReturn("td1").atLeastOnce(); EasyMock.expect(ample.readTablet(origExtent)).andReturn(tabletMeta); @@ -408,6 +430,10 @@ public void testManyColumns() throws Exception { EasyMock.verify(manager, context, ample, tabletMeta, fileRangeCache, tabletsMutator, tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); + + assertEquals(Set.of(new Ample.RemovedCompaction(cid1, tableId, "td1"), + new Ample.RemovedCompaction(cid2, tableId, "td1"), + new Ample.RemovedCompaction(cid3, tableId, "td1")), removedCompactionSet); } @Test diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java index 0101865b3fa..1ed8e81fecd 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java @@ -150,14 +150,16 @@ public void testMergeCancelsExternalCompaction() throws Exception { .collect(Collectors.toSet()); } } - // We need to cancel the compaction or delete the table here because we initiate a user - // compaction above in the test. Even though the external compaction was cancelled - // because we split the table, FaTE will continue to queue up a compaction - client.tableOperations().delete(table1); // Verify that the tmp file are cleaned up Wait.waitFor(() -> FindCompactionTmpFiles .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0); + + // We need to cancel the compaction or delete the table here because we initiate a user + // compaction above in the test. 
Even though the external compaction was cancelled + // because we split the table, FaTE would continue to queue up a compaction + client.tableOperations().cancelCompaction(table1); + } finally { getCluster().getClusterControl().stop(ServerType.COMPACTOR); getCluster().getClusterControl().start(ServerType.COMPACTOR); From 6ad2090f91238bfe96373a92ec0cd452bb7c3562 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 31 Mar 2026 23:08:36 +0000 Subject: [PATCH 2/2] consolidate code --- .../server/metadata/RemovedCompactionStoreImpl.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java index ceb405e13ea..5dafd5a2746 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java @@ -64,6 +64,10 @@ public Stream list() { private void write(Collection removedCompactions, Function converter) { + if (removedCompactions.isEmpty()) { + return; + } + Map> byLevel = removedCompactions.stream() .collect(Collectors.groupingBy(rc -> Ample.DataLevel.of(rc.table()))); // Do not expect the root to split or merge so it should never have this data @@ -81,10 +85,6 @@ private void write(Collection removedCompactions, @Override public void add(Collection removedCompactions) { - if (removedCompactions.isEmpty()) { - return; - } - write(removedCompactions, rc -> { Mutation m = new Mutation(RemovedCompactionSection.encodeRow(rc)); m.put("", "", ""); @@ -95,10 +95,6 @@ public void add(Collection removedCompactions) { @Override public void delete(Collection removedCompactions) { - if (removedCompactions.isEmpty()) { - return; - } - write(removedCompactions, rc -> { Mutation m = new Mutation(RemovedCompactionSection.encodeRow(rc)); 
m.putDelete("", "");