From 37b2383e484443e87ea3f81092d14fa0896d953a Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 23 Mar 2026 23:32:32 +0000 Subject: [PATCH 1/3] removes CoordinatorSummaryLogger This logger was useful in 2.1. However in 4.0 its redundant with functionality in the monitor and is just extra code to maintain. Removing it also removes a usage of the running cache which may be helpful for #6217 --- .../coordinator/CompactionCoordinator.java | 10 --- .../coordinator/CoordinatorSummaryLogger.java | 88 ------------------- .../compaction/CompactionCoordinatorTest.java | 3 - 3 files changed, 101 deletions(-) delete mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index aee9679cc01..135fa77abac 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -369,7 +369,6 @@ public void run() { } startDeadCompactionDetector(); - startQueueRunningSummaryLogging(); startFailureSummaryLogging(); startInternalStateCleaner(ctx.getScheduledExecutor()); @@ -804,15 +803,6 @@ private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) { failingTables.compute(extent.tableId(), FailureCounts::incrementFailure); } - protected void startQueueRunningSummaryLogging() { - CoordinatorSummaryLogger summaryLogger = - new CoordinatorSummaryLogger(ctx, this.jobQueues, this.RUNNING_CACHE, compactorCounts); - - ScheduledFuture future = ctx.getScheduledExecutor() - .scheduleWithFixedDelay(summaryLogger::logSummary, 0, 1, TimeUnit.MINUTES); - ThreadPools.watchNonCriticalScheduledTask(future); - } - protected void startFailureSummaryLogging() { ScheduledFuture future = ctx.getScheduledExecutor().scheduleWithFixedDelay(this::printStats, 0, 5, TimeUnit.MINUTES); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java deleted file mode 100644 index 2d71c2d7969..00000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.compaction.coordinator; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; -import org.apache.accumulo.core.data.ResourceGroupId; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; -import org.apache.accumulo.server.ServerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.benmanes.caffeine.cache.Cache; - -public class CoordinatorSummaryLogger { - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSummaryLogger.class); - - private final ServerContext ctx; - private final CompactionJobQueues jobQueues; - private final Map running; - private final Cache compactorCounts; - - public CoordinatorSummaryLogger(ServerContext ctx, CompactionJobQueues jobQueues, - Map running, - Cache compactorCounts) { - this.ctx = ctx; - this.jobQueues = jobQueues; - this.running = running; - this.compactorCounts = compactorCounts; - } - - public void logSummary() { - - final Map perQueueRunningCount = new HashMap<>(); - final Map perTableRunningCount = new HashMap<>(); - - running.values().forEach(rc -> { - TableId tid = KeyExtent.fromThrift(rc.getJob().getExtent()).tableId(); - String tableName = null; - try { - tableName = ctx.getQualifiedTableName(tid); - } catch (TableNotFoundException e) { - tableName = "Unmapped table id: " + tid.canonical(); - } - perQueueRunningCount - .computeIfAbsent(ResourceGroupId.of(rc.getGroupName()), q -> new AtomicLong(0)) - .incrementAndGet(); - perTableRunningCount.computeIfAbsent(tableName, t -> new AtomicLong(0)).incrementAndGet(); - }); - - perQueueRunningCount.forEach((groupId, count) -> { - LOG.info( - "Queue {}: compactors: {}, queued majc (minimum, possibly higher): {}, running majc: {}", - groupId, compactorCounts.asMap().getOrDefault(groupId, 0), - // This map only contains the highest priority for each tserver. So when tservers have - // other priorities that need to compact or have more than one compaction for a - // priority level this count will be lower than the actual number of queued. - jobQueues.getQueuedJobs(groupId), count.get()); - - }); - perTableRunningCount - .forEach((t, count) -> LOG.info("Running compactions for table {}: {}", t, count)); - } - -} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index a11e345a993..9603d8a53c0 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -118,9 +118,6 @@ protected int countCompactors(ResourceGroupId groupName) { return 3; } - @Override - protected void startQueueRunningSummaryLogging() {} - @Override protected void startFailureSummaryLogging() {} From 66e1f8cee56f5e07a4e9744f03c265d80268c383 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 25 Mar 2026 15:28:31 +0000 Subject: [PATCH 2/3] adds running compaction stats to monitor --- .../accumulo/monitor/next/SystemInformation.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java index 36bf500a99e..b59e7f3550e 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Stream; import org.apache.accumulo.core.Constants; @@ -398,6 +399,9 @@ public Stream stream() { protected final Map longRunningCompactionsByRg = new ConcurrentHashMap<>(); + protected final Map runningCompactionsPerTable = new ConcurrentHashMap<>(); + protected final Map runningCompactionsPerGroup = new ConcurrentHashMap<>(); + // Table Information private final Map tables = new ConcurrentHashMap<>(); private final Map> tablets = new ConcurrentHashMap<>(); @@ -542,6 +546,12 @@ public void processResponse(final ServerId server, final MetricResponse response } public void processExternalCompaction(TExternalCompaction tec) { + + var tableId = KeyExtent.fromThrift(tec.getJob().extent).tableId(); + runningCompactionsPerTable.computeIfAbsent(tableId, t -> new LongAdder()).increment(); + runningCompactionsPerGroup.computeIfAbsent(tec.getGroupName(), t -> new LongAdder()) + .increment(); + this.longRunningCompactionsByRg.computeIfAbsent(tec.getGroupName(), k -> new TimeOrderedRunningCompactionSet(rgLongRunningCompactionSize)).add(tec); } From 2834f10429ba7e4f2446a21c96a1b2e75e82c98e Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 25 Mar 2026 16:40:46 +0000 Subject: [PATCH 3/3] code review update --- .../org/apache/accumulo/monitor/next/SystemInformation.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java index b59e7f3550e..06e6810b978 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java @@ -443,6 +443,8 @@ public void clear() { tablets.clear(); deployment.clear(); suggestions.clear(); + runningCompactionsPerGroup.clear(); + runningCompactionsPerTable.clear(); scanServerView = null; }