Skip to content

Commit b951aab

Browse files
authored
Removes the completed set from the coordinator (#6241)
This change is made in support of #6217. Successfully ran all ITs that were changed.
1 parent 6efc91e commit b951aab

9 files changed

Lines changed: 25 additions & 1378 deletions

File tree

core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public enum CacheName {
3737
BULK_IMPORT_FILE_LENGTHS,
3838
CLASSLOADERS,
3939
COMBINER_LOGGED_MSGS,
40-
COMPACTIONS_COMPLETED,
4140
COMPACTION_CONFIGS,
4241
COMPACTOR_COUNTS,
4342
COMPACTION_DIR_CACHE,

core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java

Lines changed: 0 additions & 1275 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/main/thrift/compaction-coordinator.thrift

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,6 @@ service CompactionCoordinatorService {
147147
2:client.ThriftNotActiveServiceException tnase
148148
)
149149

150-
/*
151-
* Called by the Monitor to get progress information
152-
*/
153-
TExternalCompactionMap getCompletedCompactions(
154-
1:client.TInfo tinfo
155-
2:security.TCredentials credentials
156-
)throws(
157-
1:client.ThriftSecurityException sec
158-
2:client.ThriftNotActiveServiceException tnase
159-
)
160-
161150
void cancel(
162151
1:client.TInfo tinfo
163152
2:security.TCredentials credentials

server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) {
214214
// Exposed for tests
215215
protected final CountDownLatch shutdown = new CountDownLatch(1);
216216

217-
private final Cache<ExternalCompactionId,TExternalCompaction> completed;
218217
private final LoadingCache<FateId,CompactionConfig> compactionConfigCache;
219218
private final Cache<Path,Integer> tabletDirCache;
220219
private final DeadCompactionDetector deadCompactionDetector;
@@ -244,9 +243,6 @@ public CompactionCoordinator(Manager manager,
244243

245244
this.fateClients = fateClients;
246245

247-
completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
248-
.maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
249-
250246
CacheLoader<FateId,CompactionConfig> loader =
251247
fateId -> CompactionConfigStorage.getConfig(ctx, fateId);
252248

@@ -978,9 +974,6 @@ public void recordCompletion(TInfo tinfo, TCredentials credentials, String exter
978974

979975
public void recordCompletion(ExternalCompactionId ecid) {
980976
var tec = RUNNING_CACHE.remove(ecid);
981-
if (tec != null) {
982-
completed.put(ecid, tec);
983-
}
984977
}
985978

986979
protected Set<ExternalCompactionId> readExternalCompactionIds() {
@@ -1016,29 +1009,6 @@ public TExternalCompactionMap getRunningCompactions(TInfo tinfo, TCredentials cr
10161009
return result;
10171010
}
10181011

1019-
/**
1020-
* Return information about recently completed compactions
1021-
*
1022-
* @param tinfo trace info
1023-
* @param credentials tcredentials object
1024-
* @return map of ECID to TExternalCompaction objects
1025-
* @throws ThriftSecurityException permission error
1026-
*/
1027-
@Override
1028-
public TExternalCompactionMap getCompletedCompactions(TInfo tinfo, TCredentials credentials)
1029-
throws ThriftSecurityException {
1030-
// do not expect users to call this directly, expect other tservers to call this method
1031-
if (!security.canPerformSystemActions(credentials)) {
1032-
throw new AccumuloSecurityException(credentials.getPrincipal(),
1033-
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
1034-
}
1035-
final TExternalCompactionMap result = new TExternalCompactionMap();
1036-
completed.asMap().forEach((ecid, tec) -> {
1037-
result.putToCompactions(ecid.canonical(), tec);
1038-
});
1039-
return result;
1040-
}
1041-
10421012
@Override
10431013
public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
10441014
throws TException {

test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP4;
2424
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
2525
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
26-
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
2726
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning;
27+
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionsNoLongerRunning;
2828
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.countTablets;
2929
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
3030
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
@@ -50,7 +50,6 @@
5050
import org.apache.accumulo.core.client.AccumuloSecurityException;
5151
import org.apache.accumulo.core.client.TableNotFoundException;
5252
import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
53-
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
5453
import org.apache.accumulo.core.conf.Property;
5554
import org.apache.accumulo.core.data.TableId;
5655
import org.apache.accumulo.core.metadata.SystemTables;
@@ -122,8 +121,7 @@ public void testSplitCancelsExternalCompaction() throws Exception {
122121

123122
client.tableOperations().addSplits(table1, splits);
124123

125-
confirmCompactionCompleted(getCluster().getServerContext(), ecids,
126-
TCompactionState.CANCELLED);
124+
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);
127125

128126
// ensure compaction ids were deleted by split operation from metadata table
129127
try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
@@ -192,8 +190,7 @@ public void testUserCompactionCancellation() throws Exception {
192190
assertNotNull(e);
193191
assertEquals(TableOperationsImpl.COMPACTION_CANCELED_MSG, e.getMessage());
194192

195-
confirmCompactionCompleted(getCluster().getServerContext(), ecids,
196-
TCompactionState.CANCELLED);
193+
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);
197194

198195
// ensure the canceled compaction deletes any tablet metadata related to the compaction
199196
while (countTablets(getCluster().getServerContext(), table1,
@@ -236,8 +233,7 @@ public void testDeleteTableCancelsUserExternalCompaction() throws Exception {
236233

237234
client.tableOperations().delete(table1);
238235

239-
confirmCompactionCompleted(getCluster().getServerContext(), ecids,
240-
TCompactionState.CANCELLED);
236+
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);
241237

242238
// Ensure compaction did not write anything to metadata table after delete table
243239
try (var scanner = client.createScanner(SystemTables.METADATA.tableName())) {
@@ -281,6 +277,7 @@ public void testDeleteTableCancelsExternalCompaction() throws Exception {
281277
Thread t = new Thread(r);
282278
t.start();
283279
latch.await();
280+
assertNull(error.get());
284281
// Wait for the compaction to start by waiting for 1 external compaction column
285282
Set<ExternalCompactionId> ecids =
286283
ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids(ctx, tid);
@@ -293,7 +290,7 @@ public void testDeleteTableCancelsExternalCompaction() throws Exception {
293290

294291
LoggerFactory.getLogger(getClass()).debug("Table deleted.");
295292

296-
confirmCompactionCompleted(ctx, ecids, TCompactionState.CANCELLED);
293+
confirmCompactionsNoLongerRunning(ctx, ecids);
297294

298295
LoggerFactory.getLogger(getClass()).debug("Confirmed compaction cancelled.");
299296

test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import static org.apache.accumulo.core.conf.Property.TABLE_FILE_MAX;
2222
import static org.apache.accumulo.core.conf.Property.TABLE_MAJC_RATIO;
23-
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
2423
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning;
24+
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionsNoLongerRunning;
2525
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
2626
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
2727
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
@@ -39,7 +39,6 @@
3939
import org.apache.accumulo.core.client.IteratorSetting;
4040
import org.apache.accumulo.core.client.admin.CompactionConfig;
4141
import org.apache.accumulo.core.clientImpl.ClientContext;
42-
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
4342
import org.apache.accumulo.core.data.TableId;
4443
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
4544
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
@@ -173,9 +172,7 @@ public void testDeleteTableCancelsSleepingExternalCompaction() throws Exception
173172

174173
client.tableOperations().delete(table1);
175174

176-
confirmCompactionCompleted(getCluster().getServerContext(), ecids,
177-
TCompactionState.CANCELLED);
178-
175+
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);
179176
}
180177
}
181178

test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java

Lines changed: 13 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
*/
1919
package org.apache.accumulo.test.compaction;
2020

21+
import static java.util.concurrent.TimeUnit.MINUTES;
2122
import static org.junit.jupiter.api.Assertions.assertEquals;
22-
import static org.junit.jupiter.api.Assertions.assertNotNull;
2323
import static org.junit.jupiter.api.Assertions.assertNull;
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
2525

@@ -162,7 +162,7 @@ public static void writeData(AccumuloClient client, String table1, int rows)
162162
}
163163
}
164164

165-
client.tableOperations().flush(table1);
165+
client.tableOperations().flush(table1, null, null, true);
166166
}
167167

168168
public static void writeData(AccumuloClient client, String table1)
@@ -265,19 +265,6 @@ public static TExternalCompactionMap getRunningCompactions(ClientContext context
265265
}
266266
}
267267

268-
private static TExternalCompactionMap getCompletedCompactions(ClientContext context,
269-
Optional<HostAndPort> coordinatorHost) throws Exception {
270-
CompactionCoordinatorService.Client client =
271-
ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, coordinatorHost.orElseThrow(), context);
272-
try {
273-
TExternalCompactionMap completed =
274-
client.getCompletedCompactions(TraceUtil.traceInfo(), context.rpcCreds());
275-
return completed;
276-
} finally {
277-
ThriftUtil.returnClient(client, context);
278-
}
279-
}
280-
281268
public static TCompactionState getLastState(TExternalCompaction status) {
282269
ArrayList<Long> timestamps = new ArrayList<>(status.getUpdates().size());
283270
status.getUpdates().keySet().forEach(k -> timestamps.add(k));
@@ -349,33 +336,18 @@ public static int confirmCompactionRunning(ServerContext ctx, Set<ExternalCompac
349336
return matches;
350337
}
351338

352-
public static void confirmCompactionCompleted(ServerContext ctx, Set<ExternalCompactionId> ecids,
353-
TCompactionState expectedState) throws Exception {
354-
Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx);
355-
if (coordinatorHost.isEmpty()) {
356-
throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
357-
}
358-
359-
// The running compaction should be removed
360-
TExternalCompactionMap running =
361-
ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost);
362-
while (running.getCompactions() != null && running.getCompactions().keySet().stream()
363-
.anyMatch((e) -> ecids.contains(ExternalCompactionId.of(e)))) {
364-
running = ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost);
365-
}
366-
// The compaction should be in the completed list with the expected state
367-
TExternalCompactionMap completed =
368-
ExternalCompactionTestUtils.getCompletedCompactions(ctx, coordinatorHost);
369-
while (completed.getCompactions() == null) {
370-
UtilWaitThread.sleep(50);
371-
completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx, coordinatorHost);
372-
}
373-
for (ExternalCompactionId e : ecids) {
374-
TExternalCompaction tec = completed.getCompactions().get(e.canonical());
375-
assertNotNull(tec);
376-
assertEquals(expectedState, ExternalCompactionTestUtils.getLastState(tec));
377-
}
339+
/**
340+
* Waits for compactions to no longer be running on compactors
341+
*/
342+
public static void confirmCompactionsNoLongerRunning(ServerContext ctx,
343+
Set<ExternalCompactionId> ecids) {
344+
Wait.waitFor(() -> {
345+
Set<ExternalCompactionId> running = new HashSet<>();
346+
ExternalCompactionUtil.getCompactionsRunningOnCompactors(ctx,
347+
tec -> running.add(ExternalCompactionId.of(tec.getJob().getExternalCompactionId())));
378348

349+
return Collections.disjoint(running, ecids);
350+
}, MINUTES.toMillis(5));
379351
}
380352

381353
public static void assertNoCompactionMetadata(ServerContext ctx, String tableName) {

test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2;
2222
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
23-
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
23+
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionsNoLongerRunning;
2424
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
2525
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions;
2626
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
@@ -137,8 +137,7 @@ public void testMergeCancelsExternalCompaction() throws Exception {
137137
Text end = md.get(1).getEndRow();
138138
client.tableOperations().merge(table1, start, end);
139139

140-
confirmCompactionCompleted(getCluster().getServerContext(), ecids,
141-
TCompactionState.CANCELLED);
140+
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), ecids);
142141

143142
// ensure compaction ids were deleted by merge operation from metadata table
144143
try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()

test/src/main/java/org/apache/accumulo/test/compaction/SplitCancelsMajCIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.accumulo.test.compaction;
2020

2121
import static java.util.concurrent.TimeUnit.SECONDS;
22+
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionsNoLongerRunning;
2223
import static org.junit.jupiter.api.Assertions.assertEquals;
2324
import static org.junit.jupiter.api.Assertions.assertNotNull;
2425
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -34,7 +35,6 @@
3435
import org.apache.accumulo.core.client.AccumuloClient;
3536
import org.apache.accumulo.core.client.BatchWriter;
3637
import org.apache.accumulo.core.client.IteratorSetting;
37-
import org.apache.accumulo.core.compaction.thrift.TCompactionState;
3838
import org.apache.accumulo.core.conf.Property;
3939
import org.apache.accumulo.core.data.Mutation;
4040
import org.apache.accumulo.core.data.TableId;
@@ -119,8 +119,7 @@ public void test() throws Exception {
119119
partitionKeys.add(new Text("10"));
120120
c.tableOperations().addSplits(tableName, partitionKeys);
121121

122-
ExternalCompactionTestUtils.confirmCompactionCompleted(getCluster().getServerContext(),
123-
compactionIds, TCompactionState.CANCELLED);
122+
confirmCompactionsNoLongerRunning(getCluster().getServerContext(), compactionIds);
124123

125124
thread.join();
126125
// wait for the restarted compaction

0 commit comments

Comments
 (0)