Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
Expand Down Expand Up @@ -207,9 +208,6 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) {
protected final Map<ExternalCompactionId,TExternalCompaction> RUNNING_CACHE =
new ConcurrentHashMap<>();

/* Map of group name to last time compactor called to get a compaction job */
private final Map<ResourceGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();

private final ServerContext ctx;
private final AuditedSecurityOperation security;
private final CompactionJobQueues jobQueues;
Expand Down Expand Up @@ -382,30 +380,6 @@ public void run() {
LOG.info("Shutting down");
}

/**
 * Computes, for each resource group, the set of compactor addresses that are not currently
 * running a compaction. Groups whose compactors are all busy are omitted from the result.
 */
private Map<String,Set<HostAndPort>> getIdleCompactors(Set<ServerId> runningCompactors) {

  // Start with every known compactor, grouped by its resource group name.
  final Map<String,Set<HostAndPort>> idleByGroup = new HashMap<>();
  for (ServerId server : runningCompactors) {
    idleByGroup.computeIfAbsent(server.getResourceGroup().canonical(), g -> new HashSet<>())
        .add(HostAndPort.fromParts(server.getHost(), server.getPort()));
  }

  // Groups whose last idle compactor gets removed below.
  final Set<String> drainedGroups = new HashSet<>();

  // Subtract every compactor that is currently running a compaction.
  for (TExternalCompaction running : RUNNING_CACHE.values()) {
    Set<HostAndPort> candidates = idleByGroup.get(running.getGroupName());
    if (candidates != null && candidates.remove(HostAndPort.fromString(running.getCompactor()))
        && candidates.isEmpty()) {
      drainedGroups.add(running.getGroupName());
    }
  }

  // Drop groups that have no idle compactors left.
  drainedGroups.forEach(idleByGroup::remove);
  return idleByGroup;
}

protected void startDeadCompactionDetector() {
deadCompactionDetector.start();
}
Expand All @@ -414,10 +388,6 @@ protected long getMissingCompactorWarningTime() {
return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
}

public long getNumRunningCompactions() {
return RUNNING_CACHE.size();
}

/**
* Return the next compaction job from the queue to a Compactor
*
Expand All @@ -438,7 +408,6 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials
}
ResourceGroupId groupId = ResourceGroupId.of(groupName);
LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());

TExternalCompactionJob result = null;

Expand Down Expand Up @@ -1203,17 +1172,46 @@ private Set<ResourceGroupId> getCompactionServicesConfigurationGroups()
return groups;
}

// Snapshot pairing a group's dequeued-job count with a timer started when that count was first
// observed.
record DequeuedSample(Timer timer, long dequeuedCount) {
}

// Used to track how long a resource group's dequeued count has remained unchanged; entries are
// replaced whenever the observed count differs from the recorded sample.
private final Map<ResourceGroupId,DequeuedSample> dequeuedCountTracker =
Collections.synchronizedMap(new HashMap<>());

private void logNonEmptyQueuesThatAreInactive() {
dequeuedCountTracker.keySet().retainAll(jobQueues.getQueueIds());

Duration warnDuration = Duration.ofMillis(getMissingCompactorWarningTime());

for (var rgid : jobQueues.getQueueIds()) {
var last = dequeuedCountTracker.get(rgid);
long dequeued = jobQueues.getDequeuedJobs(rgid);
if (last == null || last.dequeuedCount != dequeued) {
dequeuedCountTracker.put(rgid, new DequeuedSample(Timer.startNew(), dequeued));
continue;
}

Duration notEmptyDuration = jobQueues.getNotEmptyDuration(rgid);
if (last.timer.elapsed().compareTo(warnDuration) > 0
&& notEmptyDuration.compareTo(warnDuration) > 0) {
// This queue has been non empty and nothing has been dequeued from it for greater than the
// warn duration.
LOG.warn(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this message is a little different than the one it's replacing. This just says that nothing has been dequeued for some amount of time, but says nothing about the compactors. This could be normal if all of the compactors are busy running long compactions. The message this is replacing indicates that something is wrong because there are idle compactors in the group.

I wonder if this should move to the Monitor as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, do not want to warn for the case when the queue is inactive because of lots of long running compactions. Moving it to the monitor sounds good to me. Can close this PR out.

"Compactor group {} has {} queued jobs, has had queued jobs for {}ms, and nothing was dequeued for {}ms",
rgid, jobQueues.getQueuedJobs(rgid), notEmptyDuration.toMillis(),
last.timer.elapsed(TimeUnit.MILLISECONDS));
}
}
}

public void cleanUpInternalState() {

// This method does the following:
//
// 1. Removes entries from RUNNING_CACHE and LONG_RUNNING_COMPACTIONS_BY_RG that are not really
// running
// 1. Removes entries from RUNNING_CACHE that are not really running
// 2. Cancels running compactions for groups that are not in the current configuration
// 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
// 4. Log groups with no compactors
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is useful. We might be able to add it to the Monitor as one of the suggestions / problems.

// 5. Log compactors with no groups
// 6. Log groups with compactors and queued jobs that have not checked in

var config = ctx.getConfiguration();
ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config,
Expand Down Expand Up @@ -1269,11 +1267,6 @@ public void cleanUpInternalState() {
cancelCompactionOnCompactor(tec.getCompactor(), tec.getJob().getExternalCompactionId());
}
});

final Set<ResourceGroupId> trackedGroups = Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet());
TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration);
LOG.debug("No longer tracking compactor check-in times for groups: {}",
Sets.difference(trackedGroups, TIME_COMPACTOR_LAST_CHECKED.keySet()));
}

final Set<ServerId> runningCompactors = getRunningCompactors();
Expand All @@ -1282,18 +1275,6 @@ public void cleanUpInternalState() {
runningCompactors.forEach(
c -> runningCompactorGroups.add(ResourceGroupId.of(c.getResourceGroup().canonical())));

final Set<ResourceGroupId> groupsWithNoCompactors =
Sets.difference(groupsInConfiguration, runningCompactorGroups);
if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) {
for (ResourceGroupId group : groupsWithNoCompactors) {
long queuedJobCount = jobQueues.getQueuedJobs(group);
if (queuedJobCount > 0) {
LOG.warn("Compactor group {} has {} queued compactions but no running compactors", group,
queuedJobCount);
}
}
}

final Set<ResourceGroupId> compactorsWithNoGroups =
Sets.difference(runningCompactorGroups, groupsInConfiguration);
if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) {
Expand All @@ -1302,19 +1283,6 @@ public void cleanUpInternalState() {
compactorsWithNoGroups);
}

final long now = System.currentTimeMillis();
final long warningTime = getMissingCompactorWarningTime();
Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors(runningCompactors);
for (ResourceGroupId groupName : groupsInConfiguration) {
long lastCheckTime =
TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, coordinatorStartTime);
if ((now - lastCheckTime) > warningTime && jobQueues.getQueuedJobs(groupName) > 0
&& idleCompactors.containsKey(groupName.canonical())) {
LOG.warn(
"The group {} has queued jobs and {} idle compactors, however none have checked in "
+ "with coordinator for {}ms",
groupName, idleCompactors.get(groupName.canonical()).size(), warningTime);
}
}
logNonEmptyQueuesThatAreInactive();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ public synchronized int add(KeyExtent extent, Collection<CompactionJob> jobs, lo
return jobsAdded;
}

/**
 * @return how long the underlying job queue has been continuously non-empty; delegates to the
 *         wrapped queue under this object's lock.
 */
public synchronized Duration getNotEmptyDuration() {
return jobQueue.getNotEmptyDuration();
}

public synchronized long getMaxSize() {
return maxSize.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.compaction.queue;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
Expand Down Expand Up @@ -108,6 +109,16 @@ public long getQueuedJobs(ResourceGroupId groupId) {
return prioQ == null ? 0 : prioQ.getQueuedJobs();
}

/**
 * @return the number of jobs dequeued so far for the given group, or 0 if no queue exists for
 *         that group.
 */
public long getDequeuedJobs(ResourceGroupId groupId) {
  var queue = priorityQueues.get(groupId);
  if (queue == null) {
    return 0;
  }
  return queue.getDequeuedJobs();
}

/**
 * @return how long the given group's queue has been continuously non-empty, or
 *         {@link Duration#ZERO} if no queue exists for that group.
 */
public Duration getNotEmptyDuration(ResourceGroupId groupId) {
  var queue = priorityQueues.get(groupId);
  if (queue == null) {
    return Duration.ZERO;
  }
  return queue.getNotEmptyDuration();
}

public long getQueueCount() {
return priorityQueues.mappingCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.accumulo.manager.compaction.queue;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.accumulo.core.util.Timer;

import com.google.common.base.Preconditions;

/**
Expand All @@ -43,6 +46,7 @@ private ValueWrapper(V2 val, long computedSize) {
private final TreeMap<K,ValueWrapper<V>> map = new TreeMap<>();
private long dataSize = 0;
private Weigher<V> weigher;
private final Timer notEmptyTimer = Timer.startNew();

private Map.Entry<K,V> unwrap(Map.Entry<K,ValueWrapper<V>> wrapperEntry) {
if (wrapperEntry == null) {
Expand All @@ -54,6 +58,9 @@ private Map.Entry<K,V> unwrap(Map.Entry<K,ValueWrapper<V>> wrapperEntry) {

// Adds the wrapped value's computed size to the running total. The zero check must happen before
// the addition: a tracked size of zero means the map was empty, so the not-empty timer is
// restarted to mark the start of a new non-empty period.
private void incrementDataSize(ValueWrapper<V> val) {
Preconditions.checkState(dataSize >= 0);
if (dataSize == 0) {
notEmptyTimer.restart();
}
dataSize += val.computedSize;
}

Expand Down Expand Up @@ -98,6 +105,17 @@ public Map.Entry<K,V> firstEntry() {
return unwrap(map.firstEntry());
}

/**
 * @return The duration since transitioning from empty to not empty.
 */
public Duration getNotEmptyDuration() {
  // An empty map has no non-empty period, so report zero.
  return map.isEmpty() ? Duration.ZERO : notEmptyTimer.elapsed();
}

public void remove(K key) {
var prev = map.remove(key);
decrementDataSize(prev);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
Expand Down Expand Up @@ -370,11 +369,8 @@ public void testGetCompactionJobNoJobs() throws Exception {
@Test
public void testCleanUpRunning() throws Exception {
TExternalCompaction ext1 = createMock(TExternalCompaction.class);
expect(ext1.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce();
TExternalCompaction ext2 = createMock(TExternalCompaction.class);
expect(ext2.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce();
TExternalCompaction ext3 = createMock(TExternalCompaction.class);
expect(ext3.getGroupName()).andReturn(Constants.DEFAULT_RESOURCE_GROUP_NAME).atLeastOnce();
replay(ext1, ext2, ext3);

TestCoordinator coordinator = new TestCoordinator(manager, new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -426,4 +428,65 @@ public void testResetSize() throws Exception {
assertEquals(500000, jobQueues.getQueueMaxSize(cg1));
assertEquals(500000, jobQueues.getQueueMaxSize(cg2));
}

@Test
public void testNonEmptyDuration() throws Exception {
CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);

var tid = TableId.of("1");
var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
var extent2 = new KeyExtent(tid, new Text("c"), new Text("b"));

var cg1 = ResourceGroupId.of("CG1");
var cg2 = ResourceGroupId.of("CG2");

// An unknown/empty group reports a zero not-empty duration
assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1));

jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1)));
Thread.sleep(2);
assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 2);

assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg2));

// cg1's timer started ~2ms before cg2's, so cg1 should always read as longer
jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg2)));
Thread.sleep(2);
assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2);
assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 4);
assertTrue(
jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) > 0);

// adding a job to an already non-empty queue should not reset the duration
jobQueues.add(extent2, List.of(newJob((short) 1, 3, cg1)));
assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2);
assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 4);
assertTrue(
jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) > 0);

// Removing all jobs from the queue should reset the duration
assertEquals(0, jobQueues.getDequeuedJobs(cg1));
assertNotNull(jobQueues.poll(cg1));
assertNotNull(jobQueues.poll(cg1));
assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 2);
assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1));

// polling increments the per-group dequeued counts
assertEquals(2, jobQueues.getDequeuedJobs(cg1));
assertEquals(0, jobQueues.getDequeuedJobs(cg2));

// Adding another job should reset the non-empty timer for cg1; cg2's keeps running
jobQueues.add(extent1, List.of(newJob((short) 1, 5, cg1)));
Thread.sleep(2);
assertTrue(jobQueues.getNotEmptyDuration(cg2).toMillis() >= 4);
assertTrue(jobQueues.getNotEmptyDuration(cg1).toMillis() >= 2);
assertTrue(
jobQueues.getNotEmptyDuration(cg1).compareTo(jobQueues.getNotEmptyDuration(cg2)) < 0);

// remove everything from all queues; durations reset but dequeued counts keep accumulating
assertNotNull(jobQueues.poll(cg1));
assertNotNull(jobQueues.poll(cg2));
assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg1));
assertEquals(Duration.ZERO, jobQueues.getNotEmptyDuration(cg2));
assertEquals(3, jobQueues.getDequeuedJobs(cg1));
assertEquals(1, jobQueues.getDequeuedJobs(cg2));

}
}