From 4f3a5c23e5fffd7e7433c0999bc6be7a1826c15b Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Wed, 25 Mar 2026 21:21:45 +0000 Subject: [PATCH] Modified Endpoints for multiple managers and new compaction summary Closes #6247 --- .../monitor/next/CustomObjectMapper.java | 3 + .../accumulo/monitor/next/Endpoints.java | 37 ++++++--- .../monitor/next/InformationFetcher.java | 2 +- .../monitor/next/SystemInformation.java | 80 ++++++++++++++----- .../next/serializers/ServerIdSerializer.java | 41 ++++++++++ 5 files changed, 128 insertions(+), 35 deletions(-) create mode 100644 server/monitor/src/main/java/org/apache/accumulo/monitor/next/serializers/ServerIdSerializer.java diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/CustomObjectMapper.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/CustomObjectMapper.java index 5f98e3a6197..e2d81bed48b 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/CustomObjectMapper.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/CustomObjectMapper.java @@ -21,6 +21,7 @@ import jakarta.ws.rs.ext.ContextResolver; import jakarta.ws.rs.ext.Provider; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.metrics.flatbuffers.FMetric; @@ -30,6 +31,7 @@ import org.apache.accumulo.monitor.next.serializers.FMetricSerializer; import org.apache.accumulo.monitor.next.serializers.IdSerializer; import org.apache.accumulo.monitor.next.serializers.MetricResponseSerializer; +import org.apache.accumulo.monitor.next.serializers.ServerIdSerializer; import org.apache.accumulo.monitor.next.serializers.TabletIdSerializer; import org.apache.accumulo.monitor.next.serializers.ThriftSerializer; @@ -50,6 +52,7 @@ public CustomObjectMapper() { // Configure the ObjectMapper mapper = new ObjectMapper(); SimpleModule module = new SimpleModule(); + module.addKeySerializer(ServerId.class, new ServerIdSerializer()); module.addKeySerializer(Id.class, new IdSerializer()); module.addSerializer(FMetric.class, new FMetricSerializer()); module.addSerializer(MetricResponse.class, new MetricResponseSerializer()); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java index d2fdaa26ef3..1a69d85eb54 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java @@ -24,6 +24,7 @@ import java.lang.annotation.Target; import java.lang.reflect.Method; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,6 +51,7 @@ import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.next.InformationFetcher.InstanceSummary; +import org.apache.accumulo.monitor.next.SystemInformation.CompactionSummary; import org.apache.accumulo.monitor.next.SystemInformation.ProcessSummary; import org.apache.accumulo.monitor.next.SystemInformation.TableSummary; import org.apache.accumulo.monitor.next.SystemInformation.TimeOrderedRunningCompactionSet; @@ -149,24 +151,35 @@ public Collection getAll() { @Path("manager") @Produces(MediaType.APPLICATION_JSON) @Description("Returns the metric response for the Manager") - public MetricResponse getManager() { - final ServerId s = monitor.getInformationFetcher().getSummaryForEndpoint().getManager(); - if (s == null) { - throw new NotFoundException("Manager not found"); + public Collection getManagers() { + final Set managers = + monitor.getInformationFetcher().getSummaryForEndpoint().getManagers(); + if (managers == null) { + return List.of(); } - return monitor.getInformationFetcher().getAllMetrics().asMap().get(s); + return monitor.getInformationFetcher().getAllMetrics().getAllPresent(managers).values(); } @GET @Path("manager/metrics") @Produces(MediaType.APPLICATION_JSON) @Description("Returns the metrics for the Manager") - public List getManagerMetrics() { - var managerMetrics = getManager().getMetrics(); - if (managerMetrics != null) { - return managerMetrics.stream().map(FMetric::getRootAsFMetric).collect(Collectors.toList()); + public Map> getManagerMetrics() { + final Set managers = + monitor.getInformationFetcher().getSummaryForEndpoint().getManagers(); + if (managers == null) { + return Map.of(); + } + + Map> results = new HashMap<>(managers.size()); + for (ServerId s : managers) { + MetricResponse mr = monitor.getInformationFetcher().getAllMetrics().getIfPresent(s); + if (mr != null) { + results.put(s, + mr.getMetrics().stream().map(FMetric::getRootAsFMetric).collect(Collectors.toList())); + } } - return List.of(); + return results; } @GET @@ -320,8 +333,8 @@ public Map getTabletServerAllMetricSummary() { @GET @Path("compactions/summary") @Produces(MediaType.APPLICATION_JSON) - @Description("Returns the metrics for all compaction queues") - public Map> getCompactionMetricSummary() { + @Description("Returns summary information for compactions") + public CompactionSummary getCompactionSummary() { return monitor.getInformationFetcher().getSummaryForEndpoint().getCompactionMetricSummary(); } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java index cb67c61fc4f..097de350223 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java @@ -340,7 +340,7 @@ public void run() { LOG.info("Finished fetching metrics from servers"); LOG.info( "All: {}, Manager: {}, Garbage Collector: {}, Compactors: {}, Scan Servers: {}, Tablet Servers: {}", - allMetrics.estimatedSize(), summary.getManager() != null, + allMetrics.estimatedSize(), summary.getManagers().size(), summary.getGarbageCollector() != null, summary.getCompactorAllMetricSummary().isEmpty() ? 0 : summary.getCompactorAllMetricSummary().entrySet().iterator().next().getValue() diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java index 2a5148010fb..fbade4c5aef 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -356,6 +357,23 @@ public Stream stream() { } + public record CompactionSummary(Map tableCompactions, + Map groupCompactions, + Map>> compactionQueueMetrics) { + + public List getQueueMetrics(String resourceGroup) { + final List metrics = new ArrayList<>(); + this.compactionQueueMetrics().forEach((k, v) -> { + v.forEach((k2, v2) -> { + if (k2.equals(resourceGroup)) { + metrics.addAll(v2); + } + }); + }); + return metrics; + } + }; + private static final Logger LOG = LoggerFactory.getLogger(SystemInformation.class); private final DistributionStatisticConfig DSC = @@ -368,7 +386,7 @@ public Stream stream() { private final Set resourceGroups = ConcurrentHashMap.newKeySet(); private final Set problemHosts = ConcurrentHashMap.newKeySet(); - private final AtomicReference manager = new AtomicReference<>(); + private final Set managers = ConcurrentHashMap.newKeySet(); private final AtomicReference gc = new AtomicReference<>(); // index of resource group name to set of servers @@ -395,7 +413,8 @@ public Stream stream() { new ConcurrentHashMap<>(); // Compaction Information - private final Map> queueMetrics = new ConcurrentHashMap<>(); + private final AtomicReference compactionSummary = new AtomicReference<>(); + private volatile Set registeredCompactors = Set.of(); private volatile HostAndPort coordinatorHost; @@ -429,6 +448,8 @@ public SystemInformation(Cache allMetrics, ServerContex public void clear() { resourceGroups.clear(); problemHosts.clear(); + managers.clear(); + gc.set(null); compactors.clear(); sservers.clear(); tservers.clear(); @@ -438,9 +459,9 @@ public void clear() { rgCompactorMetrics.clear(); rgSServerMetrics.clear(); rgTServerMetrics.clear(); - queueMetrics.clear(); registeredCompactors = Set.of(); coordinatorHost = null; + compactionSummary.set(null); longRunningCompactionsByRg.clear(); tables.clear(); tablets.clear(); @@ -488,20 +509,35 @@ private void updateAggregates(final MetricResponse response, } - private void createCompactionSummary(MetricResponse response) { - if (response.getMetrics() != null) { - for (final ByteBuffer binary : response.getMetrics()) { - FMetric fm = FMetric.getRootAsFMetric(binary); - for (int i = 0; i < fm.tagsLength(); i++) { - FTag t = fm.tags(i); - if (t.key().equals("queue.id")) { - queueMetrics - .computeIfAbsent(t.value(), (k) -> Collections.synchronizedList(new ArrayList<>())) - .add(fm); + private void createCompactionSummary() { + + Map tableCompactions = new HashMap<>(); + runningCompactionsPerTable.forEach((k, v) -> tableCompactions.put(k, v.sum())); + + Map groupCompactions = new HashMap<>(); + runningCompactionsPerGroup.forEach((k, v) -> groupCompactions.put(k, v.sum())); + + Map>> coordinatorMetrics = new HashMap<>(); + + for (ServerId manager : managers) { + MetricResponse managerMetrics = allMetrics.getIfPresent(manager); + if (managerMetrics.getMetrics() != null) { + Map> queueMetrics = new HashMap<>(); + for (final ByteBuffer binary : managerMetrics.getMetrics()) { + FMetric fm = FMetric.getRootAsFMetric(binary); + for (int i = 0; i < fm.tagsLength(); i++) { + FTag t = fm.tags(i); + if (t.key().equals("queue.id")) { + queueMetrics.computeIfAbsent(t.value(), + (k) -> Collections.synchronizedList(new ArrayList<>())).add(fm); + } } } + coordinatorMetrics.put(manager, queueMetrics); } } + compactionSummary + .set(new CompactionSummary(tableCompactions, groupCompactions, coordinatorMetrics)); } public void processResponse(final ServerId server, final MetricResponse response) { @@ -523,10 +559,7 @@ public void processResponse(final ServerId server, final MetricResponse response } break; case MANAGER: - if (manager.get() == null || !manager.get().equals(server)) { - manager.set(server); - } - createCompactionSummary(response); + managers.add(server); break; case SCAN_SERVER: sservers.computeIfAbsent(response.getResourceGroup(), (rg) -> ConcurrentHashMap.newKeySet()) @@ -588,6 +621,9 @@ public void finish() { .computeIfAbsent(serverId.getType().name(), t -> new ProcessSummary()) .addNotResponded(serverId); }); + // Create a summary of compaction information + createCompactionSummary(); + for (SystemTables table : SystemTables.values()) { TableConfiguration tconf = this.ctx.getTableConfiguration(table.tableId()); String balancerRG = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY); @@ -599,7 +635,7 @@ public void finish() { } for (String rg : getResourceGroups()) { Set rgCompactors = getCompactorResourceGroupServers(rg); - List metrics = queueMetrics.get(rg); + List metrics = compactionSummary.get().getQueueMetrics(rg); Optional queued = metrics.stream() .filter(fm -> fm.name().equals(Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())) .findFirst(); @@ -645,8 +681,8 @@ public Set getProblemHosts() { return this.problemHosts; } - public ServerId getManager() { - return this.manager.get(); + public Set getManagers() { + return this.managers; } public ServerId getGarbageCollector() { @@ -692,8 +728,8 @@ public Map getTServerAllMetricSummary() { return this.totalTServerMetrics; } - public Map> getCompactionMetricSummary() { - return this.queueMetrics; + public CompactionSummary getCompactionMetricSummary() { + return this.compactionSummary.get(); } public Set getCompactorServers() { diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/serializers/ServerIdSerializer.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/serializers/ServerIdSerializer.java new file mode 100644 index 00000000000..5bc498ff36b --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/serializers/ServerIdSerializer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.next.serializers; + +import java.io.IOException; + +import org.apache.accumulo.core.client.admin.servers.ServerId; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +public class ServerIdSerializer extends JsonSerializer { + + @Override + public void serialize(ServerId value, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + StringBuilder buf = new StringBuilder(); + buf.append("type=").append(value.getType().name()).append(";group=") + .append(value.getResourceGroup().canonical()).append(";address=") + .append(value.toHostPortString()); + gen.writeFieldName(buf.toString()); + } + +}