Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.admin.TabletAvailability;
Expand Down Expand Up @@ -669,4 +670,24 @@ default void removeBulkLoadInProgressFlag(String path) {
default ScanServerRefStore scanServerRefs() {
throw new UnsupportedOperationException();
}

/**
 * A compaction that was removed from tablet metadata but may still be running on a compactor.
 *
 * @param id the external compaction id
 * @param table id of the table the compaction belongs to
 * @param dir directory associated with the compaction, used to locate leftover tmp files
 *        (presumably the tablet directory name — see RemovedCompactionSection.encodeRow)
 */
record RemovedCompaction(ExternalCompactionId id, TableId table, String dir) {}

/**
 * Tracks compactions that were removed from the metadata table but may still be running on
 * compactors. The tmp files associated with these compactions can eventually be removed when the
 * compaction is no longer running.
 */
interface RemovedCompactionStore {
  /**
   * @return a stream of all removed compaction entries; the caller must close the stream, as
   *         implementations may hold resources (e.g. an open scanner) until it is closed
   */
  Stream<RemovedCompaction> list();

  /** Persists the given removed compaction entries to the store. */
  void add(Collection<RemovedCompaction> removedCompactions);

  /** Removes the given removed compaction entries from the store. */
  void delete(Collection<RemovedCompaction> removedCompactions);
}

/**
 * @return a store for tracking compactions that were removed from tablet metadata
 * @throws UnsupportedOperationException if this implementation does not support the store
 */
default RemovedCompactionStore removedCompactions() {
  throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -547,4 +547,36 @@ public static String getRowPrefix() {

}

/**
 * Holds information about compactions that were deleted from tablets metadata by split or merge
 * operations. These may have tmp files that need to be cleaned up.
 *
 * <p>
 * Rows in this section have the form {@code <prefix>#<ecid>#<tableId>#<dir>}.
 */
public static class RemovedCompactionSection {
  private static final Section section =
      new Section(RESERVED_PREFIX + "rcomp", true, RESERVED_PREFIX + "rcomq", false);

  private RemovedCompactionSection() {
    // static utility section, not intended to be instantiated
  }

  public static Range getRange() {
    return section.getRange();
  }

  public static String getRowPrefix() {
    return section.getRowPrefix();
  }

  /**
   * Decodes a metadata row created by {@link #encodeRow(Ample.RemovedCompaction)}.
   *
   * @throws IllegalArgumentException if the row does not have the expected prefix and field count
   */
  public static Ample.RemovedCompaction decodeRow(String row) {
    // limit the split so the dir (the final field) is preserved intact even if it ever contains
    // a '#', and so a trailing empty field is not silently dropped
    String[] fields = row.split("#", 4);
    Preconditions.checkArgument(fields.length == 4, "Unexpected row format: %s", row);
    Preconditions.checkArgument(getRowPrefix().equals(fields[0]), "Unexpected row prefix: %s",
        row);
    return new Ample.RemovedCompaction(ExternalCompactionId.from(fields[1]),
        TableId.of(fields[2]), fields[3]);
  }

  /** Encodes a removed compaction as a row in this section. */
  public static String encodeRow(Ample.RemovedCompaction rc) {
    // put the compaction id first in the row because its uuid will spread out nicely and avoid
    // hot spotting
    return getRowPrefix() + "#" + rc.id().canonical() + "#" + rc.table().canonical() + "#"
        + rc.dir();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -262,7 +261,7 @@ public static List<ServerId> getCompactionsRunningOnCompactors(ClientContext con
}
}

public static Collection<ExternalCompactionId>
public static Set<ExternalCompactionId>
getCompactionIdsRunningOnCompactors(ClientContext context) {
final ExecutorService executor = ThreadPools.getServerThreadPools()
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.metadata;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.RemovedCompactionSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.ServerContext;

import com.google.common.base.Preconditions;

/**
 * Metadata-table backed implementation of {@link Ample.RemovedCompactionStore}. Entries are stored
 * in the removed compaction section of the root and metadata tables, keyed by
 * {@link RemovedCompactionSection#encodeRow(Ample.RemovedCompaction)}.
 */
public class RemovedCompactionStoreImpl implements Ample.RemovedCompactionStore {
  private final ServerContext context;

  public RemovedCompactionStoreImpl(ServerContext context) {
    this.context = context;
  }

  /**
   * Opens a scanner over the removed compaction section of the given table and returns a lazy
   * stream of decoded entries. Closing the returned stream closes the underlying scanner.
   */
  private Stream<Ample.RemovedCompaction> createStream(String tableName) {
    final Scanner metaScanner;
    try {
      metaScanner = context.createScanner(tableName, Authorizations.EMPTY);
    } catch (TableNotFoundException e) {
      // system tables are expected to always exist
      throw new IllegalStateException(e);
    }
    metaScanner.setRange(RemovedCompactionSection.getRange());
    return metaScanner.stream().onClose(metaScanner::close)
        .map(entry -> RemovedCompactionSection.decodeRow(entry.getKey().getRowData().toString()));
  }

  @Override
  public Stream<Ample.RemovedCompaction> list() {
    var rootEntries = createStream(SystemTables.ROOT.tableName());
    var metaEntries = createStream(SystemTables.METADATA.tableName());
    // closing the concatenated stream closes both underlying scanners
    return Stream.concat(rootEntries, metaEntries);
  }

  /**
   * Converts each removed compaction to a mutation and writes it to the metadata table for its
   * data level.
   */
  private void write(Collection<Ample.RemovedCompaction> removedCompactions,
      Function<Ample.RemovedCompaction,Mutation> converter) {
    if (removedCompactions.isEmpty()) {
      return;
    }

    var groupedByLevel = removedCompactions.stream()
        .collect(Collectors.groupingBy(rc -> Ample.DataLevel.of(rc.table())));
    // Do not expect the root to split or merge so it should never have this data
    Preconditions.checkArgument(!groupedByLevel.containsKey(Ample.DataLevel.ROOT));
    for (var levelEntry : groupedByLevel.entrySet()) {
      try (var levelWriter = context.createBatchWriter(levelEntry.getKey().metaTable())) {
        for (var removed : levelEntry.getValue()) {
          levelWriter.addMutation(converter.apply(removed));
        }
      } catch (TableNotFoundException | MutationsRejectedException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  @Override
  public void add(Collection<Ample.RemovedCompaction> removedCompactions) {
    write(removedCompactions, removed -> {
      var mutation = new Mutation(RemovedCompactionSection.encodeRow(removed));
      mutation.put("", "", "");
      return mutation;
    });
  }

  @Override
  public void delete(Collection<Ample.RemovedCompaction> removedCompactions) {
    write(removedCompactions, removed -> {
      var mutation = new Mutation(RemovedCompactionSection.encodeRow(removed));
      mutation.putDelete("", "");
      return mutation;
    });
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ public ScanServerRefStore scanServerRefs() {
return scanServerRefStore;
}

/** {@inheritDoc} Returns a newly constructed store on each invocation. */
@Override
public RemovedCompactionStore removedCompactions() {
  return new RemovedCompactionStoreImpl(getContext());
}

@VisibleForTesting
protected ServerContext getContext() {
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.accumulo.server.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
Expand All @@ -32,7 +34,9 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ServerOpts;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
Expand All @@ -46,6 +50,7 @@
import org.apache.accumulo.start.spi.CommandGroups;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -127,6 +132,7 @@ public static Set<Path> findTempFiles(ServerContext context, String tableId)
});
}
}

LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
return matches;
}
Expand All @@ -141,7 +147,8 @@ public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> files
throws InterruptedException {

final ExecutorService delSvc = Executors.newFixedThreadPool(8);
final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
// use a linked list to make removal from the middle of the list quick
final List<Future<Boolean>> futures = new LinkedList<>();
final DeleteStats stats = new DeleteStats();

filesToDelete.forEach(p -> {
Expand Down Expand Up @@ -190,6 +197,39 @@ public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> files
return stats;
}

/**
 * Finds any compaction tmp files in the given tablet directory that match one of the given
 * compaction ids and deletes them. Files are matched by the suffix {@code _tmp_<ecid>}. The
 * directory is searched on every volume; I/O failures on one volume are logged and do not
 * prevent cleanup on the others.
 *
 * @param ctx server context used to obtain the volumes
 * @param tableId id of the table the tablet belongs to
 * @param dirName name of the tablet directory to search
 * @param ecidsForTablet compaction ids whose tmp files should be removed
 */
public static void deleteTmpFiles(ServerContext ctx, TableId tableId, String dirName,
    Set<ExternalCompactionId> ecidsForTablet) {
  final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
  for (Volume vol : vols) {
    try {
      final String volPath = vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
          + tableId.canonical() + Path.SEPARATOR + dirName;
      // loop-invariant: build the tablet dir path once per volume, not once per ecid
      final Path tabletDir = new Path(volPath);
      final FileSystem fs = vol.getFileSystem();
      for (ExternalCompactionId ecid : ecidsForTablet) {
        final String fileSuffix = "_tmp_" + ecid.canonical();
        FileStatus[] files = null;
        try {
          files = fs.listStatus(tabletDir, (path) -> path.getName().endsWith(fileSuffix));
        } catch (FileNotFoundException e) {
          // the tablet dir may not exist on every volume; nothing to clean up here
          LOG.trace("Failed to list tablet dir {}", volPath, e);
        }
        if (files != null) {
          for (FileStatus file : files) {
            if (fs.delete(file.getPath(), false)) {
              LOG.debug("Deleted ecid tmp file: {}", file.getPath());
            } else {
              // fixed format string: trailing ": " after the placeholder produced garbled output
              LOG.warn("Unable to delete ecid tmp file: {}", file.getPath());
            }
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Exception deleting compaction tmp files for table: {}", tableId, e);
    }
  }
}

/** Creates the command with its default option set. */
public FindCompactionTmpFiles() {
  super(new FindOpts());
}
Expand Down
Loading