diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 4d06f3b1aab..1d874dccf1b 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -25,6 +25,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Stream; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.admin.TabletAvailability; @@ -669,4 +670,24 @@ default void removeBulkLoadInProgressFlag(String path) { default ScanServerRefStore scanServerRefs() { throw new UnsupportedOperationException(); } + + record RemovedCompaction(ExternalCompactionId id, TableId table, String dir) { + }; + + /** + * Tracks compactions that were removed from the metadata table but may still be running on + * compactors. The tmp files associated with these compactions can eventually be removed when the + * compaction is no longer running. + */ + interface RemovedCompactionStore { + Stream list(); + + void add(Collection removedCompactions); + + void delete(Collection removedCompactions); + } + + default RemovedCompactionStore removedCompactions() { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 39e1c35e3b0..bb1309583e0 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -547,4 +547,36 @@ public static String getRowPrefix() { } + /** + * Holds information about compactions that were deleted from tablets metadata by split or merge + * operations. 
These may have tmp files that need to be cleaned up. + */ + public static class RemovedCompactionSection { + private static final Section section = + new Section(RESERVED_PREFIX + "rcomp", true, RESERVED_PREFIX + "rcomq", false); + + public static Range getRange() { + return section.getRange(); + } + + public static String getRowPrefix() { + return section.getRowPrefix(); + } + + public static Ample.RemovedCompaction decodeRow(String row) { + String[] fields = row.split("#"); + Preconditions.checkArgument(fields.length == 4); + Preconditions.checkArgument(getRowPrefix().equals(fields[0])); + return new Ample.RemovedCompaction(ExternalCompactionId.from(fields[1]), + TableId.of(fields[2]), fields[3]); + } + + public static String encodeRow(Ample.RemovedCompaction rc) { + // put the compaction id first in the row because its uuid will spread out nicely and avoid + // hot spotting + return getRowPrefix() + "#" + rc.id().canonical() + "#" + rc.table().canonical() + "#" + + rc.dir(); + + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 149c12e12be..4e842f3681a 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -24,7 +24,6 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -262,7 +261,7 @@ public static List getCompactionsRunningOnCompactors(ClientContext con } } - public static Collection + public static Set getCompactionIdsRunningOnCompactors(ClientContext context) { final ExecutorService executor = ThreadPools.getServerThreadPools() 
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java new file mode 100644 index 00000000000..5dafd5a2746 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/RemovedCompactionStoreImpl.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.metadata; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.metadata.SystemTables; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.RemovedCompactionSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.ServerContext; + +import com.google.common.base.Preconditions; + +public class RemovedCompactionStoreImpl implements Ample.RemovedCompactionStore { + private final ServerContext context; + + public RemovedCompactionStoreImpl(ServerContext context) { + this.context = context; + } + + private Stream createStream(String tableName) { + Scanner scanner = null; + try { + scanner = context.createScanner(tableName, Authorizations.EMPTY); + } catch (TableNotFoundException e) { + throw new IllegalStateException(e); + } + scanner.setRange(RemovedCompactionSection.getRange()); + return scanner.stream().map(e -> e.getKey().getRowData().toString()) + .map(RemovedCompactionSection::decodeRow).onClose(scanner::close); + } + + @Override + public Stream list() { + return Stream.concat(createStream(SystemTables.ROOT.tableName()), + createStream(SystemTables.METADATA.tableName())); + } + + private void write(Collection removedCompactions, + Function converter) { + if (removedCompactions.isEmpty()) { + return; + } + + Map> byLevel = removedCompactions.stream() + .collect(Collectors.groupingBy(rc -> Ample.DataLevel.of(rc.table()))); + // Do not expect the root to split or merge so it should never have this data + 
Preconditions.checkArgument(!byLevel.containsKey(Ample.DataLevel.ROOT)); + byLevel.forEach((dl, removed) -> { + try (var writer = context.createBatchWriter(dl.metaTable())) { + for (var rc : removed) { + writer.addMutation(converter.apply(rc)); + } + } catch (TableNotFoundException | MutationsRejectedException e) { + throw new IllegalStateException(e); + } + }); + } + + @Override + public void add(Collection removedCompactions) { + write(removedCompactions, rc -> { + Mutation m = new Mutation(RemovedCompactionSection.encodeRow(rc)); + m.put("", "", ""); + return m; + }); + + } + + @Override + public void delete(Collection removedCompactions) { + write(removedCompactions, rc -> { + Mutation m = new Mutation(RemovedCompactionSection.encodeRow(rc)); + m.putDelete("", ""); + return m; + }); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index a4eb470c664..35fc63fe3ff 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -281,6 +281,11 @@ public ScanServerRefStore scanServerRefs() { return scanServerRefStore; } + @Override + public RemovedCompactionStore removedCompactions() { + return new RemovedCompactionStoreImpl(getContext()); + } + @VisibleForTesting protected ServerContext getContext() { return context; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java index 06755d1807b..c27a65639a4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java @@ -18,11 +18,13 @@ */ package org.apache.accumulo.server.util; +import 
java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -32,7 +34,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ServerOpts; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -46,6 +50,7 @@ import org.apache.accumulo.start.spi.CommandGroups; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,6 +132,7 @@ public static Set findTempFiles(ServerContext context, String tableId) }); } } + LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches); return matches; } @@ -141,7 +147,8 @@ public static DeleteStats deleteTempFiles(ServerContext context, Set files throws InterruptedException { final ExecutorService delSvc = Executors.newFixedThreadPool(8); - final List> futures = new ArrayList<>(filesToDelete.size()); + // use a linked list to make removal from the middle of the list quick + final List> futures = new LinkedList<>(); final DeleteStats stats = new DeleteStats(); filesToDelete.forEach(p -> { @@ -190,6 +197,39 @@ public static DeleteStats deleteTempFiles(ServerContext context, Set files return stats; } + // Finds any tmp files matching the given compaction ids in table dir and deletes them. 
+ public static void deleteTmpFiles(ServerContext ctx, TableId tableId, String dirName, + Set ecidsForTablet) { + final Collection vols = ctx.getVolumeManager().getVolumes(); + for (Volume vol : vols) { + try { + final String volPath = vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + tableId.canonical() + Path.SEPARATOR + dirName; + final FileSystem fs = vol.getFileSystem(); + for (ExternalCompactionId ecid : ecidsForTablet) { + final String fileSuffix = "_tmp_" + ecid.canonical(); + FileStatus[] files = null; + try { + files = fs.listStatus(new Path(volPath), (path) -> path.getName().endsWith(fileSuffix)); + } catch (FileNotFoundException e) { + LOG.trace("Failed to list tablet dir {}", volPath, e); + } + if (files != null) { + for (FileStatus file : files) { + if (!fs.delete(file.getPath(), false)) { + LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath()); + } else { + LOG.debug("Deleted ecid tmp file: {}", file.getPath()); + } + } + } + } + } catch (IOException e) { + LOG.error("Exception deleting compaction tmp files for table: {}", tableId, e); + } + } + } + public FindCompactionTmpFiles() { super(new FindOpts()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 51685c166cd..2e4f8b52b6e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -93,8 +93,6 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import 
org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.compaction.CompactionJob; @@ -108,7 +106,6 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData; @@ -122,8 +119,8 @@ import org.apache.accumulo.server.compaction.CompactionPluginUtils; import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.accumulo.server.util.FindCompactionTmpFiles; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -765,6 +762,21 @@ void compactionsFailed(Map compactions) { void compactionFailedForLevel(Map> compactions) { + // CompactionFailed is called from the Compactor when either a compaction fails or is cancelled + // and it's called from the DeadCompactionDetector. Remove compaction tmp files from the tablet + // directory that have a corresponding ecid in the name. Must delete any tmp files before + // removing compaction entry from metadata table. This ensures that in the event of process + // death that the dead compaction will be detected in the future and the files removed then. 
+ try (var tablets = ctx.getAmple().readTablets() + .forTablets(compactions.keySet(), Optional.empty()).fetch(ColumnType.DIR).build()) { + for (TabletMetadata tm : tablets) { + var extent = tm.getExtent(); + var ecidsForTablet = compactions.get(extent); + FindCompactionTmpFiles.deleteTmpFiles(ctx, extent.tableId(), tm.getDirName(), + ecidsForTablet); + } + } + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { compactions.forEach((extent, ecids) -> { try { @@ -791,78 +803,18 @@ public boolean test(TabletMetadata tabletMetadata) { } }); - final List ecidsForTablet = new ArrayList<>(); tabletsMutator.process().forEach((extent, result) -> { if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { - // this should try again later when the dead compaction detector runs, lets log it in case // its a persistent problem if (LOG.isDebugEnabled()) { LOG.debug("Unable to remove failed compaction {} {}", extent, compactions.get(extent)); } - } else { - // compactionFailed is called from the Compactor when either a compaction fails or - // is cancelled and it's called from the DeadCompactionDetector. This block is - // entered when the conditional mutator above successfully deletes an ecid from - // the tablet metadata. Remove compaction tmp files from the tablet directory - // that have a corresponding ecid in the name. 
- - ecidsForTablet.clear(); - ecidsForTablet.addAll(compactions.get(extent)); - - if (!ecidsForTablet.isEmpty()) { - final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR); - if (tm != null) { - final Collection vols = ctx.getVolumeManager().getVolumes(); - for (Volume vol : vols) { - try { - final String volPath = - vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR - + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName(); - final FileSystem fs = vol.getFileSystem(); - for (ExternalCompactionId ecid : ecidsForTablet) { - final String fileSuffix = "_tmp_" + ecid.canonical(); - FileStatus[] files = null; - try { - files = fs.listStatus(new Path(volPath), - (path) -> path.getName().endsWith(fileSuffix)); - } catch (FileNotFoundException e) { - LOG.trace("Failed to list tablet dir {}", volPath, e); - } - if (files != null) { - for (FileStatus file : files) { - if (!fs.delete(file.getPath(), false)) { - LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath()); - } else { - LOG.debug("Deleted ecid tmp file: {}", file.getPath()); - } - } - } - } - } catch (IOException e) { - LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e); - } - } - } else { - // TabletMetadata does not exist for the extent. This could be due to a merge or - // split operation. 
Use the utility to find tmp files at the table level - deadCompactionDetector.addTableId(extent.tableId()); - } - } } }); } } - protected Set readExternalCompactionIds() { - try (TabletsMetadata tabletsMetadata = - this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER) - .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) { - return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) - .collect(Collectors.toSet()); - } - } - /* Method exists to be called from test */ public CompactionJobQueues getJobQueues() { return jobQueues; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index ce04296a615..89610550971 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -18,11 +18,12 @@ */ package org.apache.accumulo.manager.compaction.coordinator; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -33,11 +34,11 @@ import java.util.stream.Stream; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -49,8 +50,6 @@ import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.FindCompactionTmpFiles; -import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +61,6 @@ public class DeadCompactionDetector { private final CompactionCoordinator coordinator; private final ScheduledThreadPoolExecutor schedExecutor; private final ConcurrentHashMap deadCompactions; - private final Set tablesWithUnreferencedTmpFiles = new HashSet<>(); private final Function> fateClients; public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator, @@ -75,12 +73,6 @@ public DeadCompactionDetector(ServerContext context, CompactionCoordinator coord this.fateClients = fateClients; } - public void addTableId(TableId tableWithUnreferencedTmpFiles) { - synchronized (tablesWithUnreferencedTmpFiles) { - tablesWithUnreferencedTmpFiles.add(tableWithUnreferencedTmpFiles); - } - } - private void detectDeadCompactions() { /* @@ -162,6 +154,31 @@ private void detectDeadCompactions() { }); } + // Get the list of compaction entries that were removed from the metadata table by a split or + // merge operation. Must get this data before getting the running set of compactions. 
+ List removedCompactions; + try (Stream listing = context.getAmple().removedCompactions().list()) { + removedCompactions = listing.collect(Collectors.toCollection(ArrayList::new)); + } + + // Must get the set of running compactions after reading compaction ids from the metadata table + Set running = null; + if (!removedCompactions.isEmpty() || !tabletCompactions.isEmpty()) { + running = ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context); + } + + // Delete any tmp files related to compaction metadata entries that were removed by split or + // merge and are no longer running. + if (!removedCompactions.isEmpty()) { + var runningSet = Objects.requireNonNull(running); + removedCompactions.removeIf(rc -> runningSet.contains(rc.id())); + removedCompactions.forEach(rc -> { + log.trace("attempting to delete tmp files for removed compaction {}", rc); + FindCompactionTmpFiles.deleteTmpFiles(context, rc.table(), rc.dir(), Set.of(rc.id())); + }); + context.getAmple().removedCompactions().delete(removedCompactions); + } + if (tabletCompactions.isEmpty()) { // Clear out dead compactions, tservers don't think anything is running log.trace("Clearing the dead compaction map, no tablets have compactions running"); @@ -183,9 +200,6 @@ private void detectDeadCompactions() { // In order for this overall algorithm to be correct and avoid race conditions, the compactor // must return ids covering the time period from before reservation until after commit. If the // ids do not cover this time period then legitimate running compactions could be canceled. 
- Collection running = - ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context); - running.forEach(ecid -> { if (tabletCompactions.remove(ecid) != null) { log.debug("Ignoring compaction {} that is running on a compactor", ecid); @@ -230,37 +244,6 @@ private void detectDeadCompactions() { coordinator.compactionsFailed(tabletCompactions); this.deadCompactions.keySet().removeAll(toFail); } - - // Find and delete compaction tmp files that are unreferenced - if (!tablesWithUnreferencedTmpFiles.isEmpty()) { - - Set copy = new HashSet<>(); - synchronized (tablesWithUnreferencedTmpFiles) { - copy.addAll(tablesWithUnreferencedTmpFiles); - tablesWithUnreferencedTmpFiles.clear(); - } - - log.debug("Tables that may have unreferenced compaction tmp files: {}", copy); - for (TableId tid : copy) { - try { - final Set matches = FindCompactionTmpFiles.findTempFiles(context, tid.canonical()); - log.debug("Found the following compaction tmp files for table {}:", tid); - matches.forEach(p -> log.debug("{}", p)); - - if (!matches.isEmpty()) { - log.debug("Deleting compaction tmp files for table {}...", tid); - DeleteStats stats = FindCompactionTmpFiles.deleteTempFiles(context, matches); - log.debug( - "Deletion of compaction tmp files for table {} complete. 
Success:{}, Failure:{}, Error:{}", - tid, stats.success, stats.failure, stats.error); - } - } catch (InterruptedException e) { - log.error("Interrupted while finding compaction tmp files for table: {}", tid.canonical(), - e); - } - } - } - } public void start() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index 8e8241fb92d..5ec0954e146 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -75,6 +75,7 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { Map newFiles = new HashMap<>(); TabletMetadata firstTabletMeta = null; TabletMetadata lastTabletMeta = null; + List removedCompactions = new ArrayList<>(); try (var tabletsMetadata = env.getContext().getAmple().readTablets().forTable(range.tableId()) .overlapping(range.prevEndRow(), range.endRow()).build()) { @@ -141,6 +142,19 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { dirs.clear(); } } + + // These compaction metadata entries will be deleted, queue up removal of the tmp file once + // the compaction is no longer running + tabletMeta.getExternalCompactions().keySet().stream() + .map(ecid -> new Ample.RemovedCompaction(ecid, tabletMeta.getExtent().tableId(), + tabletMeta.getDirName())) + .forEach(removedCompactions::add); + if (removedCompactions.size() > 1000 && tabletsSeen > 1) { + removedCompactions + .forEach(rc -> log.trace("{} adding removed compaction {}", fateId, rc)); + env.getContext().getAmple().removedCompactions().add(removedCompactions); + removedCompactions.clear(); + } } if (tabletsSeen == 1) { @@ -154,6 +168,9 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { lastTabletMeta); } + removedCompactions.forEach(rc -> log.trace("{} adding removed compaction 
{}", fateId, rc)); + env.getContext().getAmple().removedCompactions().add(removedCompactions); + log.info("{} merge low tablet {}", fateId, firstTabletMeta.getExtent()); log.info("{} merge high tablet {}", fateId, lastTabletMeta.getExtent()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index c12386a3721..7f387f64de8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -266,6 +266,16 @@ private void addNewTablets(FateId fateId, FateEnv env, TabletMetadata tabletMeta private void updateExistingTablet(FateId fateId, ServerContext ctx, TabletMetadata tabletMetadata, TabletOperationId opid, NavigableMap newTablets, Map> newTabletsFiles) { + + // queue up the tmp files related to these compaction metadata entries to be eventually deleted + // once the compaction is no longer running + var removedCompactions = tabletMetadata.getExternalCompactions().keySet().stream() + .map(ecid -> new Ample.RemovedCompaction(ecid, tabletMetadata.getExtent().tableId(), + tabletMetadata.getDirName())) + .toList(); + removedCompactions.forEach(rc -> log.trace("{} adding removed compaction {}", fateId, rc)); + ctx.getAmple().removedCompactions().add(removedCompactions); + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { var newExtent = newTablets.navigableKeySet().last(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java index 9e3037dbe01..416a76defed 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java +++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java @@ -44,11 +44,15 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.UNSPLITTABLE; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import static org.apache.accumulo.manager.tableOps.split.UpdateTabletsTest.newSTF; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -56,6 +60,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Stream; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.data.NamespaceId; @@ -202,9 +207,12 @@ public void testManyColumns() throws Exception { EasyMock.expect(lastTabletMeta.getUnSplittable()).andReturn(unsplittableMeta).atLeastOnce(); EasyMock.expect(lastTabletMeta.getTabletMergeability()).andReturn(mergeability).atLeastOnce(); EasyMock.expect(lastTabletMeta.getMigration()).andReturn(migration).atLeastOnce(); + EasyMock.expect(lastTabletMeta.getDirName()).andReturn("td3").atLeastOnce(); EasyMock.replay(lastTabletMeta, compactions); + Set removedCompactions = new HashSet<>(); + testMerge(List.of(tablet1, tablet2, lastTabletMeta), tableId, null, null, tabletMutator -> { EasyMock.expect(tabletMutator.putTime(MetadataTime.parse("L30"))).andReturn(tabletMutator) .once(); @@ -243,9 +251,13 @@ public void testManyColumns() throws Exception { .andReturn(tabletMutator).once(); EasyMock.expect(tabletMutator.deleteMigration()).andReturn(tabletMutator); - }); + }, 
removedCompactions::add); EasyMock.verify(lastTabletMeta, compactions); + + assertEquals(Set.of(new Ample.RemovedCompaction(cid1, tableId, "td3"), + new Ample.RemovedCompaction(cid2, tableId, "td3"), + new Ample.RemovedCompaction(cid3, tableId, "td3")), removedCompactions); } @Test @@ -420,6 +432,12 @@ public void testTime() throws Exception { private static void testMerge(List inputTablets, TableId tableId, String start, String end, Consumer expectationsSetter) throws Exception { + testMerge(inputTablets, tableId, start, end, expectationsSetter, removedCompaction -> fail()); + } + + private static void testMerge(List inputTablets, TableId tableId, String start, + String end, Consumer expectationsSetter, + Consumer removedCompactionConsumer) throws Exception { MergeInfo mergeInfo = new MergeInfo(tableId, NamespaceId.of("1"), start == null ? null : start.getBytes(UTF_8), end == null ? null : end.getBytes(UTF_8), MergeInfo.Operation.MERGE); @@ -434,6 +452,25 @@ private static void testMerge(List inputTablets, TableId tableId EasyMock.mock(ConditionalTabletsMutatorImpl.class); ConditionalTabletMutatorImpl tabletMutator = EasyMock.mock(ConditionalTabletMutatorImpl.class); + Ample.RemovedCompactionStore removedCompactionStore = new Ample.RemovedCompactionStore() { + @Override + public Stream list() { + throw new UnsupportedOperationException(); + } + + @Override + public void add(Collection removedCompactions) { + System.out.println("removedCompactions : " + removedCompactions); + removedCompactions.forEach(removedCompactionConsumer); + } + + @Override + public void delete(Collection removedCompactions) { + throw new UnsupportedOperationException(); + } + }; + EasyMock.expect(ample.removedCompactions()).andReturn(removedCompactionStore); + ServiceLock managerLock = EasyMock.mock(ServiceLock.class); EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index 6ceac3d6810..044cf08da07 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -24,7 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -37,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TabletMergeability; @@ -250,6 +253,24 @@ public void testManyColumns() throws Exception { EasyMock.expect(manager.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce(); EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) .atLeastOnce(); + Set removedCompactionSet = new HashSet<>(); + Ample.RemovedCompactionStore rcs = new Ample.RemovedCompactionStore() { + @Override + public Stream list() { + throw new UnsupportedOperationException(); + } + + @Override + public void add(Collection removedCompactions) { + removedCompactionSet.addAll(removedCompactions); + } + + @Override + public void delete(Collection removedCompactions) { + throw new UnsupportedOperationException(); + } + }; + EasyMock.expect(ample.removedCompactions()).andReturn(rcs).atLeastOnce(); ServiceLock managerLock = EasyMock.mock(ServiceLock.class); EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); @@ -289,6 +310,7 @@ public void testManyColumns() throws Exception { 
UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002, tabletFiles.keySet()); EasyMock.expect(tabletMeta.getUnSplittable()).andReturn(usm).atLeastOnce(); EasyMock.expect(tabletMeta.getMigration()).andReturn(migration).atLeastOnce(); + EasyMock.expect(tabletMeta.getDirName()).andReturn("td1").atLeastOnce(); EasyMock.expect(ample.readTablet(origExtent)).andReturn(tabletMeta); @@ -408,6 +430,10 @@ public void testManyColumns() throws Exception { EasyMock.verify(manager, context, ample, tabletMeta, fileRangeCache, tabletsMutator, tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); + + assertEquals(Set.of(new Ample.RemovedCompaction(cid1, tableId, "td1"), + new Ample.RemovedCompaction(cid2, tableId, "td1"), + new Ample.RemovedCompaction(cid3, tableId, "td1")), removedCompactionSet); } @Test diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java index 0101865b3fa..1ed8e81fecd 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java @@ -150,14 +150,16 @@ public void testMergeCancelsExternalCompaction() throws Exception { .collect(Collectors.toSet()); } } - // We need to cancel the compaction or delete the table here because we initiate a user - // compaction above in the test. Even though the external compaction was cancelled - // because we split the table, FaTE will continue to queue up a compaction - client.tableOperations().delete(table1); // Verify that the tmp file are cleaned up Wait.waitFor(() -> FindCompactionTmpFiles .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0); + + // We need to cancel the compaction or delete the table here because we initiate a user + // compaction above in the test. 
Even though the external compaction was cancelled + // because we split the table, FaTE would continue to queue up a compaction + client.tableOperations().cancelCompaction(table1); + } finally { getCluster().getClusterControl().stop(ServerType.COMPACTOR); getCluster().getClusterControl().start(ServerType.COMPACTOR);