diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index aee9679cc01..135fa77abac 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -369,7 +369,6 @@ public void run() { } startDeadCompactionDetector(); - startQueueRunningSummaryLogging(); startFailureSummaryLogging(); startInternalStateCleaner(ctx.getScheduledExecutor()); @@ -804,15 +803,6 @@ private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) { failingTables.compute(extent.tableId(), FailureCounts::incrementFailure); } - protected void startQueueRunningSummaryLogging() { - CoordinatorSummaryLogger summaryLogger = - new CoordinatorSummaryLogger(ctx, this.jobQueues, this.RUNNING_CACHE, compactorCounts); - - ScheduledFuture future = ctx.getScheduledExecutor() - .scheduleWithFixedDelay(summaryLogger::logSummary, 0, 1, TimeUnit.MINUTES); - ThreadPools.watchNonCriticalScheduledTask(future); - } - protected void startFailureSummaryLogging() { ScheduledFuture future = ctx.getScheduledExecutor().scheduleWithFixedDelay(this::printStats, 0, 5, TimeUnit.MINUTES); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java deleted file mode 100644 index 2d71c2d7969..00000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CoordinatorSummaryLogger.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.compaction.coordinator; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; -import org.apache.accumulo.core.data.ResourceGroupId; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; -import org.apache.accumulo.server.ServerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.benmanes.caffeine.cache.Cache; - -public class CoordinatorSummaryLogger { - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSummaryLogger.class); - - private final ServerContext ctx; - private final CompactionJobQueues jobQueues; - private final Map running; - private final Cache compactorCounts; - - public CoordinatorSummaryLogger(ServerContext ctx, CompactionJobQueues jobQueues, - Map running, - Cache compactorCounts) { - this.ctx = ctx; - this.jobQueues = jobQueues; - this.running = running; - this.compactorCounts = compactorCounts; - } - - public void logSummary() { - - final Map perQueueRunningCount = new HashMap<>(); - final Map perTableRunningCount = new HashMap<>(); - - running.values().forEach(rc -> { - TableId tid = KeyExtent.fromThrift(rc.getJob().getExtent()).tableId(); - String tableName = null; - try { - tableName = ctx.getQualifiedTableName(tid); - } catch (TableNotFoundException e) { - tableName = "Unmapped table id: " + tid.canonical(); - } - perQueueRunningCount - .computeIfAbsent(ResourceGroupId.of(rc.getGroupName()), q -> new AtomicLong(0)) - .incrementAndGet(); - perTableRunningCount.computeIfAbsent(tableName, t -> new AtomicLong(0)).incrementAndGet(); - }); - - perQueueRunningCount.forEach((groupId, count) -> { - LOG.info( - "Queue {}: compactors: {}, queued majc (minimum, possibly higher): {}, running majc: {}", - groupId, compactorCounts.asMap().getOrDefault(groupId, 0), - // This map only contains the highest priority for each tserver. So when tservers have - // other priorities that need to compact or have more than one compaction for a - // priority level this count will be lower than the actual number of queued. - jobQueues.getQueuedJobs(groupId), count.get()); - - }); - perTableRunningCount - .forEach((t, count) -> LOG.info("Running compactions for table {}: {}", t, count)); - } - -} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index a11e345a993..9603d8a53c0 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -118,9 +118,6 @@ protected int countCompactors(ResourceGroupId groupName) { return 3; } - @Override - protected void startQueueRunningSummaryLogging() {} - @Override protected void startFailureSummaryLogging() {} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java index 36bf500a99e..06e6810b978 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Stream; import org.apache.accumulo.core.Constants; @@ -398,6 +399,9 @@ public Stream stream() { protected final Map longRunningCompactionsByRg = new ConcurrentHashMap<>(); + protected final Map runningCompactionsPerTable = new ConcurrentHashMap<>(); + protected final Map runningCompactionsPerGroup = new ConcurrentHashMap<>(); + // Table Information private final Map tables = new ConcurrentHashMap<>(); private final Map> tablets = new ConcurrentHashMap<>(); @@ -439,6 +443,8 @@ public void clear() { tablets.clear(); deployment.clear(); suggestions.clear(); + runningCompactionsPerGroup.clear(); + runningCompactionsPerTable.clear(); scanServerView = null; } @@ -542,6 +548,12 @@ public void processResponse(final ServerId server, final MetricResponse response } public void processExternalCompaction(TExternalCompaction tec) { + + var tableId = KeyExtent.fromThrift(tec.getJob().extent).tableId(); + runningCompactionsPerTable.computeIfAbsent(tableId, t -> new LongAdder()).increment(); + runningCompactionsPerGroup.computeIfAbsent(tec.getGroupName(), t -> new LongAdder()) + .increment(); + this.longRunningCompactionsByRg.computeIfAbsent(tec.getGroupName(), k -> new TimeOrderedRunningCompactionSet(rgLongRunningCompactionSize)).add(tec); }