Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public enum CacheName {
BULK_IMPORT_FILE_LENGTHS,
CLASSLOADERS,
COMBINER_LOGGED_MSGS,
COMPACTIONS_COMPLETED,
COMPACTION_CONFIGS,
COMPACTOR_COUNTS,
COMPACTION_DIR_CACHE,
Expand Down

Large diffs are not rendered by default.

11 changes: 0 additions & 11 deletions core/src/main/thrift/compaction-coordinator.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,6 @@ service CompactionCoordinatorService {
2:client.ThriftNotActiveServiceException tnase
)

/*
* Called by the Monitor to get progress information
*/
TExternalCompactionMap getCompletedCompactions(
1:client.TInfo tinfo
2:security.TCredentials credentials
)throws(
1:client.ThriftSecurityException sec
2:client.ThriftNotActiveServiceException tnase
)

void cancel(
1:client.TInfo tinfo
2:security.TCredentials credentials
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) {
// Exposed for tests
protected final CountDownLatch shutdown = new CountDownLatch(1);

private final Cache<ExternalCompactionId,TExternalCompaction> completed;
private final LoadingCache<FateId,CompactionConfig> compactionConfigCache;
private final Cache<Path,Integer> tabletDirCache;
private final DeadCompactionDetector deadCompactionDetector;
Expand Down Expand Up @@ -247,9 +246,6 @@ public CompactionCoordinator(Manager manager,

this.fateClients = fateClients;

completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can CacheName.COMPACTIONS_COMPLETED be removed also?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably, I will look into that

.maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();

CacheLoader<FateId,CompactionConfig> loader =
fateId -> CompactionConfigStorage.getConfig(ctx, fateId);

Expand Down Expand Up @@ -1014,9 +1010,6 @@ public void recordCompletion(TInfo tinfo, TCredentials credentials, String exter

public void recordCompletion(ExternalCompactionId ecid) {
var tec = RUNNING_CACHE.remove(ecid);
if (tec != null) {
completed.put(ecid, tec);
}
}

protected Set<ExternalCompactionId> readExternalCompactionIds() {
Expand Down Expand Up @@ -1052,29 +1045,6 @@ public TExternalCompactionMap getRunningCompactions(TInfo tinfo, TCredentials cr
return result;
}

/**
* Return information about recently completed compactions
*
* @param tinfo trace info
* @param credentials tcredentials object
* @return map of ECID to TExternalCompaction objects
* @throws ThriftSecurityException permission error
*/
@Override
public TExternalCompactionMap getCompletedCompactions(TInfo tinfo, TCredentials credentials)
throws ThriftSecurityException {
// do not expect users to call this directly, expect other tservers to call this method
if (!security.canPerformSystemActions(credentials)) {
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
final TExternalCompactionMap result = new TExternalCompactionMap();
completed.asMap().forEach((ecid, tec) -> {
result.putToCompactions(ecid.canonical(), tec);
});
return result;
}

@Override
public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
throws TException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP4;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionsNoLongerRunning;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.countTablets;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
Expand All @@ -50,7 +50,6 @@
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.SystemTables;
Expand Down Expand Up @@ -122,8 +121,7 @@ public void testSplitCancelsExternalCompaction() throws Exception {

client.tableOperations().addSplits(table1, splits);

confirmCompactionCompleted(getCluster().getServerContext(), ecids,
TCompactionState.CANCELLED);
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);

// ensure compaction ids were deleted by split operation from metadata table
try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
Expand Down Expand Up @@ -192,8 +190,7 @@ public void testUserCompactionCancellation() throws Exception {
assertNotNull(e);
assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage());

confirmCompactionCompleted(getCluster().getServerContext(), ecids,
TCompactionState.CANCELLED);
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);

// ensure the canceled compaction deletes any tablet metadata related to the compaction
while (countTablets(getCluster().getServerContext(), table1,
Expand Down Expand Up @@ -236,8 +233,7 @@ public void testDeleteTableCancelsUserExternalCompaction() throws Exception {

client.tableOperations().delete(table1);

confirmCompactionCompleted(getCluster().getServerContext(), ecids,
TCompactionState.CANCELLED);
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);

// Ensure compaction did not write anything to metadata table after delete table
try (var scanner = client.createScanner(SystemTables.METADATA.tableName())) {
Expand Down Expand Up @@ -281,6 +277,7 @@ public void testDeleteTableCancelsExternalCompaction() throws Exception {
Thread t = new Thread(r);
t.start();
latch.await();
assertNull(error.get());
// Wait for the compaction to start by waiting for 1 external compaction column
Set<ExternalCompactionId> ecids =
ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids(ctx, tid);
Expand All @@ -293,7 +290,7 @@ public void testDeleteTableCancelsExternalCompaction() throws Exception {

LoggerFactory.getLogger(getClass()).debug("Table deleted.");

confirmCompactionCompleted(ctx, ecids, TCompactionState.CANCELLED);
confirmCompactionsNoLongerRunning(ctx, ecids);

LoggerFactory.getLogger(getClass()).debug("Confirmed compaction cancelled.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import static org.apache.accumulo.core.conf.Property.TABLE_FILE_MAX;
import static org.apache.accumulo.core.conf.Property.TABLE_MAJC_RATIO;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionsNoLongerRunning;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
Expand All @@ -39,7 +39,6 @@
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
Expand Down Expand Up @@ -173,9 +172,7 @@ public void testDeleteTableCancelsSleepingExternalCompaction() throws Exception

client.tableOperations().delete(table1);

confirmCompactionCompleted(getCluster().getServerContext(), ecids,
TCompactionState.CANCELLED);

confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.accumulo.test.compaction;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -162,7 +162,7 @@ public static void writeData(AccumuloClient client, String table1, int rows)
}
}

client.tableOperations().flush(table1);
client.tableOperations().flush(table1, null, null, true);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an existing test bug I ran into while running ITs. Changed this code to wait for the flush instead of just initiate it. One test was hanging because it was trying to create 4 files, but this would not always happen.

}

public static void writeData(AccumuloClient client, String table1)
Expand Down Expand Up @@ -265,19 +265,6 @@ public static TExternalCompactionMap getRunningCompactions(ClientContext context
}
}

private static TExternalCompactionMap getCompletedCompactions(ClientContext context,
Optional<HostAndPort> coordinatorHost) throws Exception {
CompactionCoordinatorService.Client client =
ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, coordinatorHost.orElseThrow(), context);
try {
TExternalCompactionMap completed =
client.getCompletedCompactions(TraceUtil.traceInfo(), context.rpcCreds());
return completed;
} finally {
ThriftUtil.returnClient(client, context);
}
}

public static TCompactionState getLastState(TExternalCompaction status) {
ArrayList<Long> timestamps = new ArrayList<>(status.getUpdates().size());
status.getUpdates().keySet().forEach(k -> timestamps.add(k));
Expand Down Expand Up @@ -349,33 +336,18 @@ public static int confirmCompactionRunning(ServerContext ctx, Set<ExternalCompac
return matches;
}

public static void confirmCompactionCompleted(ServerContext ctx, Set<ExternalCompactionId> ecids,
TCompactionState expectedState) throws Exception {
Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx);
if (coordinatorHost.isEmpty()) {
throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
}

// The running compaction should be removed
TExternalCompactionMap running =
ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost);
while (running.getCompactions() != null && running.getCompactions().keySet().stream()
.anyMatch((e) -> ecids.contains(ExternalCompactionId.of(e)))) {
running = ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost);
}
// The compaction should be in the completed list with the expected state
TExternalCompactionMap completed =
ExternalCompactionTestUtils.getCompletedCompactions(ctx, coordinatorHost);
while (completed.getCompactions() == null) {
UtilWaitThread.sleep(50);
completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx, coordinatorHost);
}
for (ExternalCompactionId e : ecids) {
TExternalCompaction tec = completed.getCompactions().get(e.canonical());
assertNotNull(tec);
assertEquals(expectedState, ExternalCompactionTestUtils.getLastState(tec));
}
/**
* Waits for compactions to no longer be running on compactors
*/
public static void confirmCompactionsNoLongerRunning(ServerContext ctx,
Set<ExternalCompactionId> ecids) {
Wait.waitFor(() -> {
Set<ExternalCompactionId> running = new HashSet<>();
ExternalCompactionUtil.getCompactionsRunningOnCompactors(ctx,
tec -> running.add(ExternalCompactionId.of(tec.getJob().getExternalCompactionId())));

return Collections.disjoint(running, ecids);
}, MINUTES.toMillis(5));
}

public static void assertNoCompactionMetadata(ServerContext ctx, String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionsNoLongerRunning;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
Expand Down Expand Up @@ -137,8 +137,7 @@ public void testMergeCancelsExternalCompaction() throws Exception {
Text end = md.get(1).getEndRow();
client.tableOperations().merge(table1, start, end);

confirmCompactionCompleted(getCluster().getServerContext(), ecids,
TCompactionState.CANCELLED);
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);

// ensure compaction ids were deleted by merge operation from metadata table
try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.test.compaction;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionsNoLongerRunning;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -34,7 +35,6 @@
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -119,8 +119,7 @@ public void test() throws Exception {
partitionKeys.add(new Text("10"));
c.tableOperations().addSplits(tableName, partitionKeys);

ExternalCompactionTestUtils.confirmCompactionCompleted(getCluster().getServerContext(),
compactionIds, TCompactionState.CANCELLED);
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), compactionIds);

thread.join();
// wait for the restarted compaction
Expand Down