diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index aee9679cc01..17ca7e98a21 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -117,6 +117,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; @@ -207,9 +208,6 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) { protected final Map RUNNING_CACHE = new ConcurrentHashMap<>(); - /* Map of group name to last time compactor called to get a compaction job */ - private final Map TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); - private final ServerContext ctx; private final AuditedSecurityOperation security; private final CompactionJobQueues jobQueues; @@ -382,30 +380,6 @@ public void run() { LOG.info("Shutting down"); } - private Map> getIdleCompactors(Set runningCompactors) { - - final Map> allCompactors = new HashMap<>(); - runningCompactors.forEach((csi) -> allCompactors - .computeIfAbsent(csi.getResourceGroup().canonical(), (k) -> new HashSet<>()) - .add(HostAndPort.fromParts(csi.getHost(), csi.getPort()))); - - final Set emptyQueues = new HashSet<>(); - - // Remove all of the compactors that are running a compaction - RUNNING_CACHE.values().forEach(tec -> { - Set busyCompactors = allCompactors.get(tec.getGroupName()); - if (busyCompactors != null - && busyCompactors.remove(HostAndPort.fromString(tec.getCompactor()))) { - if (busyCompactors.isEmpty()) { - emptyQueues.add(tec.getGroupName()); - } - } - }); - // Remove entries with empty queues - emptyQueues.forEach(e -> allCompactors.remove(e)); - return allCompactors; - } - protected void startDeadCompactionDetector() { deadCompactionDetector.start(); } @@ -414,10 +388,6 @@ protected long getMissingCompactorWarningTime() { return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3; } - public long getNumRunningCompactions() { - return RUNNING_CACHE.size(); - } - /** * Return the next compaction job from the queue to a Compactor * @@ -438,7 +408,6 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials } ResourceGroupId groupId = ResourceGroupId.of(groupName); LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); - TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); TExternalCompactionJob result = null; @@ -1203,17 +1172,46 @@ private Set getCompactionServicesConfigurationGroups() return groups; } + record DequeuedSample(Timer timer, long dequeuedCount) { + } + + // Used to track how long a resource groups dequeued count is the same. + private final Map dequeuedCountTracker = + Collections.synchronizedMap(new HashMap<>()); + + private void logNonEmptyQueuesThatAreInactive() { + dequeuedCountTracker.keySet().retainAll(jobQueues.getQueueIds()); + + Duration warnDuration = Duration.ofMillis(getMissingCompactorWarningTime()); + + for (var rgid : jobQueues.getQueueIds()) { + var last = dequeuedCountTracker.get(rgid); + long dequeued = jobQueues.getDequeuedJobs(rgid); + if (last == null || last.dequeuedCount != dequeued) { + dequeuedCountTracker.put(rgid, new DequeuedSample(Timer.startNew(), dequeued)); + continue; + } + + Duration notEmptyDuration = jobQueues.getNotEmptyDuration(rgid); + if (last.timer.elapsed().compareTo(warnDuration) > 0 + && notEmptyDuration.compareTo(warnDuration) > 0) { + // This queue has been non empty and nothing has been dequeued from it for greater than the + // warn duration. + LOG.warn( + "Compactor group {} has {} queued jobs, has had queued jobs for {}ms, and nothing was dequeued for {}ms", + rgid, jobQueues.getQueuedJobs(rgid), notEmptyDuration.toMillis(), + last.timer.elapsed(TimeUnit.MILLISECONDS)); + } + } + } + public void cleanUpInternalState() { // This method does the following: // - // 1. Removes entries from RUNNING_CACHE and LONG_RUNNING_COMPACTIONS_BY_RG that are not really - // running + // 1. Removes entries from RUNNING_CACHE that are not really running // 2. Cancels running compactions for groups that are not in the current configuration - // 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED - // 4. Log groups with no compactors // 5. Log compactors with no groups - // 6. Log groups with compactors and queued jos that have not checked in var config = ctx.getConfiguration(); ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config, @@ -1269,11 +1267,6 @@ public void cleanUpInternalState() { cancelCompactionOnCompactor(tec.getCompactor(), tec.getJob().getExternalCompactionId()); } }); - - final Set trackedGroups = Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet()); - TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration); - LOG.debug("No longer tracking compactor check-in times for groups: {}", - Sets.difference(trackedGroups, TIME_COMPACTOR_LAST_CHECKED.keySet())); } final Set runningCompactors = getRunningCompactors(); @@ -1282,18 +1275,6 @@ public void cleanUpInternalState() { runningCompactors.forEach( c -> runningCompactorGroups.add(ResourceGroupId.of(c.getResourceGroup().canonical()))); - final Set groupsWithNoCompactors = - Sets.difference(groupsInConfiguration, runningCompactorGroups); - if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) { - for (ResourceGroupId group : groupsWithNoCompactors) { - long queuedJobCount = jobQueues.getQueuedJobs(group); - if (queuedJobCount > 0) { - LOG.warn("Compactor group {} has {} queued compactions but no running compactors", group, - queuedJobCount); - } - } - } - final Set compactorsWithNoGroups = Sets.difference(runningCompactorGroups, groupsInConfiguration); if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) { @@ -1302,19 +1283,6 @@ public void cleanUpInternalState() { compactorsWithNoGroups); } - final long now = System.currentTimeMillis(); - final long warningTime = getMissingCompactorWarningTime(); - Map> idleCompactors = getIdleCompactors(runningCompactors); - for (ResourceGroupId groupName : groupsInConfiguration) { - long lastCheckTime = - TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, coordinatorStartTime); - if ((now - lastCheckTime) > warningTime && jobQueues.getQueuedJobs(groupName) > 0 - && idleCompactors.containsKey(groupName.canonical())) { - LOG.warn( - "The group {} has queued jobs and {} idle compactors, however none have checked in " - + "with coordinator for {}ms", - groupName, idleCompactors.get(groupName.canonical()).size(), warningTime); - } - } + logNonEmptyQueuesThatAreInactive(); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index f0b0bec8c6d..62d6e149cfd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -238,6 +238,10 @@ public synchronized int add(KeyExtent extent, Collection jobs, lo return jobsAdded; } + public synchronized Duration getNotEmptyDuration() { + return jobQueue.getNotEmptyDuration(); + } + public synchronized long getMaxSize() { return maxSize.get(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index e87c231bed9..36b69815d6f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.compaction.queue; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; @@ -108,6 +109,16 @@ public long getQueuedJobs(ResourceGroupId groupId) { return prioQ == null ? 0 : prioQ.getQueuedJobs(); } + public long getDequeuedJobs(ResourceGroupId groupId) { + var prioQ = priorityQueues.get(groupId); + return prioQ == null ? 0 : prioQ.getDequeuedJobs(); + } + + public Duration getNotEmptyDuration(ResourceGroupId groupId) { + var prioQ = priorityQueues.get(groupId); + return prioQ == null ? Duration.ZERO : prioQ.getNotEmptyDuration(); + } + public long getQueueCount() { return priorityQueues.mappingCount(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java index ab1e8f5f521..9c3a9d88ecf 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java @@ -18,10 +18,13 @@ */ package org.apache.accumulo.manager.compaction.queue; +import java.time.Duration; import java.util.AbstractMap; import java.util.Map; import java.util.TreeMap; +import org.apache.accumulo.core.util.Timer; + import com.google.common.base.Preconditions; /** @@ -43,6 +46,7 @@ private ValueWrapper(V2 val, long computedSize) { private final TreeMap> map = new TreeMap<>(); private long dataSize = 0; private Weigher weigher; + private final Timer notEmptyTimer = Timer.startNew(); private Map.Entry unwrap(Map.Entry> wrapperEntry) { if (wrapperEntry == null) { @@ -54,6 +58,9 @@ private Map.Entry unwrap(Map.Entry> wrapperEntry) { private void incrementDataSize(ValueWrapper val) { Preconditions.checkState(dataSize >= 0); + if (dataSize == 0) { + notEmptyTimer.restart(); + } dataSize += val.computedSize; } @@ -98,6 +105,17 @@ public Map.Entry firstEntry() { return unwrap(map.firstEntry()); } + /** + * @return The duration since transitioning from empty to not empty. + */ + public Duration getNotEmptyDuration() { + if (map.isEmpty()) { + return Duration.ZERO; + } else { + return notEmptyTimer.elapsed(); + } + } + public void remove(K key) { var prev = map.remove(key); decrementDataSize(prev); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index a11e345a993..881cfa51ce3 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -39,7 +39,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.thrift.TInfo; @@ -370,11 +369,8 @@ public void testGetCompactionJobNoJobs() throws Exception { @Test public void testCleanUpRunning() throws Exception { TExternalCompaction ext1 = createMock(TExternalCompaction.class); - expect(ext1.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce(); TExternalCompaction ext2 = createMock(TExternalCompaction.class); - expect(ext2.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce(); TExternalCompaction ext3 = createMock(TExternalCompaction.class); - expect(ext3.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce(); replay(ext1, ext2, ext3); TestCoordinator coordinator = new TestCoordinator(manager, new ArrayList<>()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index d3c1acd469b..b6205bc9847 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -21,12 +21,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -426,4 +428,65 @@ public void testResetSize() throws Exception { assertEquals(500000, jobQueues.getQueueMaxSize(cg1)); assertEquals(500000, jobQueues.getQueueMaxSize(cg2)); } + + @Test + public void testNonEmptyDuration() throws Exception { + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); + + var tid = TableId.of("1"); + var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); + var extent2 = new KeyExtent(tid, new Text("c"), new Text("b")); + + var cg1 = ResourceGroupId.of("CG1"); + var cg2 = ResourceGroupId.of("CG2"); + + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1)); + + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); + Thread.sleep(2); + assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 2); + + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg2)); + + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg2))); + Thread.sleep(2); + assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2); + assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 4); + assertTrue( + jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) > 0); + + // adding a job should not reset the duration + jobQueues.add(extent2, List.of(newJob((short) 1, 3, cg1))); + assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2); + assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 4); + assertTrue( + jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) > 0); + + // Removing all jobs from the queue should reset the duration + assertEquals(0, jobQueues.getDequeuedJobs(cg1)); + assertNotNull(jobQueues.poll(cg1)); + assertNotNull(jobQueues.poll(cg1)); + assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2); + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1)); + + assertEquals(2, jobQueues.getDequeuedJobs(cg1)); + assertEquals(0, jobQueues.getDequeuedJobs(cg2)); + + // Adding another should should reset the non empty timer + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); + Thread.sleep(2); + assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 4); + assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 2); + assertTrue( + jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) < 0); + + // remove everything from all queues + assertNotNull(jobQueues.poll(cg1)); + assertNotNull(jobQueues.poll(cg2)); + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1)); + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg2)); + assertEquals(3, jobQueues.getDequeuedJobs(cg1)); + assertEquals(1, jobQueues.getDequeuedJobs(cg2)); + + } }