From b101567d06a88b7ae2b8bee7505089f6741142b3 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 23 Mar 2026 22:57:25 +0000 Subject: [PATCH 1/3] Simplifies warn logging about inactive queues in the coordinator. Simplifies warn logging about inactive queues in the coordinator by using only information from the job queue to do the logging. Sets of compactors and information from the job queue was being used previously. Removes a usage of the running cache in the coordinator. Simplifying this logic to not use the running cache or sets of compactors will be helpful for #6217. --- .../coordinator/CompactionCoordinator.java | 100 +++++++----------- .../queue/CompactionJobPriorityQueue.java | 4 + .../compaction/queue/CompactionJobQueues.java | 11 ++ .../compaction/queue/SizeTrackingTreeMap.java | 18 ++++ .../queue/CompactionJobQueuesTest.java | 63 +++++++++++ 5 files changed, 132 insertions(+), 64 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index aee9679cc01..335b2b9ed73 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -117,6 +117,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; @@ -207,9 +208,6 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) { protected final Map RUNNING_CACHE = new ConcurrentHashMap<>(); - /* Map of group name to last time compactor called to get a compaction job */ - private final Map TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); - private final ServerContext ctx; private final AuditedSecurityOperation security; private final CompactionJobQueues jobQueues; @@ -382,30 +380,6 @@ public void run() { LOG.info("Shutting down"); } - private Map> getIdleCompactors(Set runningCompactors) { - - final Map> allCompactors = new HashMap<>(); - runningCompactors.forEach((csi) -> allCompactors - .computeIfAbsent(csi.getResourceGroup().canonical(), (k) -> new HashSet<>()) - .add(HostAndPort.fromParts(csi.getHost(), csi.getPort()))); - - final Set emptyQueues = new HashSet<>(); - - // Remove all of the compactors that are running a compaction - RUNNING_CACHE.values().forEach(tec -> { - Set busyCompactors = allCompactors.get(tec.getGroupName()); - if (busyCompactors != null - && busyCompactors.remove(HostAndPort.fromString(tec.getCompactor()))) { - if (busyCompactors.isEmpty()) { - emptyQueues.add(tec.getGroupName()); - } - } - }); - // Remove entries with empty queues - emptyQueues.forEach(e -> allCompactors.remove(e)); - return allCompactors; - } - protected void startDeadCompactionDetector() { deadCompactionDetector.start(); } @@ -438,7 +412,6 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials } ResourceGroupId groupId = ResourceGroupId.of(groupName); LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); - TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); TExternalCompactionJob result = null; @@ -1203,17 +1176,46 @@ private Set getCompactionServicesConfigurationGroups() return groups; } + record DequeuedSample(Timer timer, long dequeuedCount) { + } + + // Used to track how long a resource groups dequeued count is the same. + private final Map dequeuedCountTracker = + Collections.synchronizedMap(new HashMap<>()); + + private void logNonEmptyQueuesThatAreInactive() { + dequeuedCountTracker.keySet().retainAll(jobQueues.getQueueIds()); + + Duration warnDuration = Duration.ofMillis(getMissingCompactorWarningTime()); + + for (var rgid : jobQueues.getQueueIds()) { + var last = dequeuedCountTracker.get(rgid); + long dequeued = jobQueues.getDequeuedJobs(rgid); + if (last == null || last.dequeuedCount != dequeued) { + dequeuedCountTracker.put(rgid, new DequeuedSample(Timer.startNew(), dequeued)); + continue; + } + + Duration notEmptyDuration = jobQueues.getNotEmptyDuration(rgid); + if (last.timer.elapsed().compareTo(warnDuration) > 0 + && notEmptyDuration.compareTo(warnDuration) > 0) { + // This queue has been non empty and nothing has been dequeued from it for greater than the + // warn duration. + LOG.warn( + "Compactor group {} has {} queued jobs, has had queued jobs for {}ms, and nothing was dequeued for {}ms", + rgid, jobQueues.getQueuedJobs(rgid), notEmptyDuration.toMillis(), + last.timer.elapsed(TimeUnit.MILLISECONDS)); + } + } + } + public void cleanUpInternalState() { // This method does the following: // - // 1. Removes entries from RUNNING_CACHE and LONG_RUNNING_COMPACTIONS_BY_RG that are not really - // running + // 1. Removes entries from RUNNING_CACHE that are not really running // 2. Cancels running compactions for groups that are not in the current configuration - // 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED - // 4. Log groups with no compactors // 5. Log compactors with no groups - // 6. Log groups with compactors and queued jos that have not checked in var config = ctx.getConfiguration(); ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config, @@ -1269,11 +1271,6 @@ public void cleanUpInternalState() { cancelCompactionOnCompactor(tec.getCompactor(), tec.getJob().getExternalCompactionId()); } }); - - final Set trackedGroups = Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet()); - TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration); - LOG.debug("No longer tracking compactor check-in times for groups: {}", - Sets.difference(trackedGroups, TIME_COMPACTOR_LAST_CHECKED.keySet())); } final Set runningCompactors = getRunningCompactors(); @@ -1282,18 +1279,6 @@ public void cleanUpInternalState() { runningCompactors.forEach( c -> runningCompactorGroups.add(ResourceGroupId.of(c.getResourceGroup().canonical()))); - final Set groupsWithNoCompactors = - Sets.difference(groupsInConfiguration, runningCompactorGroups); - if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) { - for (ResourceGroupId group : groupsWithNoCompactors) { - long queuedJobCount = jobQueues.getQueuedJobs(group); - if (queuedJobCount > 0) { - LOG.warn("Compactor group {} has {} queued compactions but no running compactors", group, - queuedJobCount); - } - } - } - final Set compactorsWithNoGroups = Sets.difference(runningCompactorGroups, groupsInConfiguration); if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) { @@ -1302,19 +1287,6 @@ public void cleanUpInternalState() { compactorsWithNoGroups); } - final long now = System.currentTimeMillis(); - final long warningTime = getMissingCompactorWarningTime(); - Map> idleCompactors = getIdleCompactors(runningCompactors); - for (ResourceGroupId groupName : groupsInConfiguration) { - long lastCheckTime = - TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, coordinatorStartTime); - if ((now - lastCheckTime) > warningTime && jobQueues.getQueuedJobs(groupName) > 0 - && idleCompactors.containsKey(groupName.canonical())) { - LOG.warn( - "The group {} has queued jobs and {} idle compactors, however none have checked in " - + "with coordinator for {}ms", - groupName, idleCompactors.get(groupName.canonical()).size(), warningTime); - } - } + logNonEmptyQueuesThatAreInactive(); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index f0b0bec8c6d..62d6e149cfd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -238,6 +238,10 @@ public synchronized int add(KeyExtent extent, Collection jobs, lo return jobsAdded; } + public synchronized Duration getNotEmptyDuration() { + return jobQueue.getNotEmptyDuration(); + } + public synchronized long getMaxSize() { return maxSize.get(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index e87c231bed9..36b69815d6f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.compaction.queue; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; @@ -108,6 +109,16 @@ public long getQueuedJobs(ResourceGroupId groupId) { return prioQ == null ? 0 : prioQ.getQueuedJobs(); } + public long getDequeuedJobs(ResourceGroupId groupId) { + var prioQ = priorityQueues.get(groupId); + return prioQ == null ? 0 : prioQ.getDequeuedJobs(); + } + + public Duration getNotEmptyDuration(ResourceGroupId groupId) { + var prioQ = priorityQueues.get(groupId); + return prioQ == null ? Duration.ZERO : prioQ.getNotEmptyDuration(); + } + public long getQueueCount() { return priorityQueues.mappingCount(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java index ab1e8f5f521..9c3a9d88ecf 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java @@ -18,10 +18,13 @@ */ package org.apache.accumulo.manager.compaction.queue; +import java.time.Duration; import java.util.AbstractMap; import java.util.Map; import java.util.TreeMap; +import org.apache.accumulo.core.util.Timer; + import com.google.common.base.Preconditions; /** @@ -43,6 +46,7 @@ private ValueWrapper(V2 val, long computedSize) { private final TreeMap> map = new TreeMap<>(); private long dataSize = 0; private Weigher weigher; + private final Timer notEmptyTimer = Timer.startNew(); private Map.Entry unwrap(Map.Entry> wrapperEntry) { if (wrapperEntry == null) { @@ -54,6 +58,9 @@ private Map.Entry unwrap(Map.Entry> wrapperEntry) { private void incrementDataSize(ValueWrapper val) { Preconditions.checkState(dataSize >= 0); + if (dataSize == 0) { + notEmptyTimer.restart(); + } dataSize += val.computedSize; } @@ -98,6 +105,17 @@ public Map.Entry firstEntry() { return unwrap(map.firstEntry()); } + /** + * @return The duration since transitioning from empty to not empty. + */ + public Duration getNotEmptyDuration() { + if (map.isEmpty()) { + return Duration.ZERO; + } else { + return notEmptyTimer.elapsed(); + } + } + public void remove(K key) { var prev = map.remove(key); decrementDataSize(prev); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index d3c1acd469b..b6205bc9847 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -21,12 +21,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -426,4 +428,65 @@ public void testResetSize() throws Exception { assertEquals(500000, jobQueues.getQueueMaxSize(cg1)); assertEquals(500000, jobQueues.getQueueMaxSize(cg2)); } + + @Test + public void testNonEmptyDuration() throws Exception { + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); + + var tid = TableId.of("1"); + var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); + var extent2 = new KeyExtent(tid, new Text("c"), new Text("b")); + + var cg1 = ResourceGroupId.of("CG1"); + var cg2 = ResourceGroupId.of("CG2"); + + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1)); + + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); + Thread.sleep(2); + assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 2); + + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg2)); + + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg2))); + Thread.sleep(2); + assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2); + assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 4); + assertTrue( + jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) > 0); + + // adding a job should not reset the duration + jobQueues.add(extent2, List.of(newJob((short) 1, 3, cg1))); + assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2); + assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 4); + assertTrue( + jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) > 0); + + // Removing all jobs from the queue should reset the duration + assertEquals(0, jobQueues.getDequeuedJobs(cg1)); + assertNotNull(jobQueues.poll(cg1)); + assertNotNull(jobQueues.poll(cg1)); + assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2); + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1)); + + assertEquals(2, jobQueues.getDequeuedJobs(cg1)); + assertEquals(0, jobQueues.getDequeuedJobs(cg2)); + + // Adding another should should reset the non empty timer + jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1))); + Thread.sleep(2); + assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 4); + assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 2); + assertTrue( + jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) < 0); + + // remove everything from all queues + assertNotNull(jobQueues.poll(cg1)); + assertNotNull(jobQueues.poll(cg2)); + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1)); + assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg2)); + assertEquals(3, jobQueues.getDequeuedJobs(cg1)); + assertEquals(1, jobQueues.getDequeuedJobs(cg2)); + + } } From 9522baa0b1fa75a5296da675d7d3fe9b5d71ac5e Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 23 Mar 2026 23:24:58 +0000 Subject: [PATCH 2/3] remove unused method --- .../manager/compaction/coordinator/CompactionCoordinator.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 335b2b9ed73..17ca7e98a21 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -388,10 +388,6 @@ protected long getMissingCompactorWarningTime() { return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3; } - public long getNumRunningCompactions() { - return RUNNING_CACHE.size(); - } - /** * Return the next compaction job from the queue to a Compactor * From 157cff3d3b622519af78635618428e8d164250b2 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 23 Mar 2026 23:54:48 +0000 Subject: [PATCH 3/3] fix test --- .../manager/compaction/CompactionCoordinatorTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index a11e345a993..881cfa51ce3 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -39,7 +39,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.thrift.TInfo; @@ -370,11 +369,8 @@ public void testGetCompactionJobNoJobs() throws Exception { @Test public void testCleanUpRunning() throws Exception { TExternalCompaction ext1 = createMock(TExternalCompaction.class); - expect(ext1.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce(); TExternalCompaction ext2 = createMock(TExternalCompaction.class); - expect(ext2.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce(); TExternalCompaction ext3 = createMock(TExternalCompaction.class); - expect(ext3.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce(); replay(ext1, ext2, ext3); TestCoordinator coordinator = new TestCoordinator(manager, new ArrayList<>());