-
Notifications
You must be signed in to change notification settings - Fork 478
Removes the completed set from the coordinator #6241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,8 +18,8 @@ | |
| */ | ||
| package org.apache.accumulo.test.compaction; | ||
|
|
||
| import static java.util.concurrent.TimeUnit.MINUTES; | ||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
| import static org.junit.jupiter.api.Assertions.assertNull; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
|
||
|
|
@@ -162,7 +162,7 @@ public static void writeData(AccumuloClient client, String table1, int rows) | |
| } | ||
| } | ||
|
|
||
| client.tableOperations().flush(table1); | ||
| client.tableOperations().flush(table1, null, null, true); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was an existing test bug I ran into while running ITs. Changed this code to wait for the flush instead of just initiate it. One test was hanging because it was trying to create 4 files, but this would not always happen. |
||
| } | ||
|
|
||
| public static void writeData(AccumuloClient client, String table1) | ||
|
|
@@ -265,19 +265,6 @@ public static TExternalCompactionMap getRunningCompactions(ClientContext context | |
| } | ||
| } | ||
|
|
||
| private static TExternalCompactionMap getCompletedCompactions(ClientContext context, | ||
| Optional<HostAndPort> coordinatorHost) throws Exception { | ||
| CompactionCoordinatorService.Client client = | ||
| ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, coordinatorHost.orElseThrow(), context); | ||
| try { | ||
| TExternalCompactionMap completed = | ||
| client.getCompletedCompactions(TraceUtil.traceInfo(), context.rpcCreds()); | ||
| return completed; | ||
| } finally { | ||
| ThriftUtil.returnClient(client, context); | ||
| } | ||
| } | ||
|
|
||
| public static TCompactionState getLastState(TExternalCompaction status) { | ||
| ArrayList<Long> timestamps = new ArrayList<>(status.getUpdates().size()); | ||
| status.getUpdates().keySet().forEach(k -> timestamps.add(k)); | ||
|
|
@@ -349,33 +336,18 @@ public static int confirmCompactionRunning(ServerContext ctx, Set<ExternalCompac | |
| return matches; | ||
| } | ||
|
|
||
| public static void confirmCompactionCompleted(ServerContext ctx, Set<ExternalCompactionId> ecids, | ||
| TCompactionState expectedState) throws Exception { | ||
| Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx); | ||
| if (coordinatorHost.isEmpty()) { | ||
| throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); | ||
| } | ||
|
|
||
| // The running compaction should be removed | ||
| TExternalCompactionMap running = | ||
| ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost); | ||
| while (running.getCompactions() != null && running.getCompactions().keySet().stream() | ||
| .anyMatch((e) -> ecids.contains(ExternalCompactionId.of(e)))) { | ||
| running = ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost); | ||
| } | ||
| // The compaction should be in the completed list with the expected state | ||
| TExternalCompactionMap completed = | ||
| ExternalCompactionTestUtils.getCompletedCompactions(ctx, coordinatorHost); | ||
| while (completed.getCompactions() == null) { | ||
| UtilWaitThread.sleep(50); | ||
| completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx, coordinatorHost); | ||
| } | ||
| for (ExternalCompactionId e : ecids) { | ||
| TExternalCompaction tec = completed.getCompactions().get(e.canonical()); | ||
| assertNotNull(tec); | ||
| assertEquals(expectedState, ExternalCompactionTestUtils.getLastState(tec)); | ||
| } | ||
| /** | ||
| * Waits for compactions to no longer be running on compactors | ||
| */ | ||
| public static void confirmCompactionsNoLongerRunning(ServerContext ctx, | ||
| Set<ExternalCompactionId> ecids) { | ||
| Wait.waitFor(() -> { | ||
| Set<ExternalCompactionId> running = new HashSet<>(); | ||
| ExternalCompactionUtil.getCompactionsRunningOnCompactors(ctx, | ||
| tec -> running.add(ExternalCompactionId.of(tec.getJob().getExternalCompactionId()))); | ||
|
|
||
| return Collections.disjoint(running, ecids); | ||
| }, MINUTES.toMillis(5)); | ||
| } | ||
|
|
||
| public static void assertNoCompactionMetadata(ServerContext ctx, String tableName) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can
CacheName.COMPACTIONS_COMPLETEDbe removed also?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably, I will look into that