Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import jakarta.ws.rs.ext.ContextResolver;
import jakarta.ws.rs.ext.Provider;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
Expand All @@ -30,6 +31,7 @@
import org.apache.accumulo.monitor.next.serializers.FMetricSerializer;
import org.apache.accumulo.monitor.next.serializers.IdSerializer;
import org.apache.accumulo.monitor.next.serializers.MetricResponseSerializer;
import org.apache.accumulo.monitor.next.serializers.ServerIdSerializer;
import org.apache.accumulo.monitor.next.serializers.TabletIdSerializer;
import org.apache.accumulo.monitor.next.serializers.ThriftSerializer;

Expand All @@ -50,6 +52,7 @@ public CustomObjectMapper() {
// Configure the ObjectMapper
mapper = new ObjectMapper();
SimpleModule module = new SimpleModule();
module.addKeySerializer(ServerId.class, new ServerIdSerializer());
module.addKeySerializer(Id.class, new IdSerializer());
module.addSerializer(FMetric.class, new FMetricSerializer());
module.addSerializer(MetricResponse.class, new MetricResponseSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -50,6 +51,7 @@
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.monitor.next.InformationFetcher.InstanceSummary;
import org.apache.accumulo.monitor.next.SystemInformation.CompactionSummary;
import org.apache.accumulo.monitor.next.SystemInformation.ProcessSummary;
import org.apache.accumulo.monitor.next.SystemInformation.TableSummary;
import org.apache.accumulo.monitor.next.SystemInformation.TimeOrderedRunningCompactionSet;
Expand Down Expand Up @@ -149,24 +151,35 @@ public Collection<MetricResponse> getAll() {
@Path("manager")
@Produces(MediaType.APPLICATION_JSON)
@Description("Returns the metric response for the Manager")
public MetricResponse getManager() {
final ServerId s = monitor.getInformationFetcher().getSummaryForEndpoint().getManager();
if (s == null) {
throw new NotFoundException("Manager not found");
public Collection<MetricResponse> getManagers() {
final Set<ServerId> managers =
monitor.getInformationFetcher().getSummaryForEndpoint().getManagers();
if (managers == null) {
return List.of();
}
return monitor.getInformationFetcher().getAllMetrics().asMap().get(s);
return monitor.getInformationFetcher().getAllMetrics().getAllPresent(managers).values();
}

@GET
@Path("manager/metrics")
@Produces(MediaType.APPLICATION_JSON)
@Description("Returns the metrics for the Manager")
public List<FMetric> getManagerMetrics() {
var managerMetrics = getManager().getMetrics();
if (managerMetrics != null) {
return managerMetrics.stream().map(FMetric::getRootAsFMetric).collect(Collectors.toList());
public Map<ServerId,List<FMetric>> getManagerMetrics() {
final Set<ServerId> managers =
monitor.getInformationFetcher().getSummaryForEndpoint().getManagers();
if (managers == null) {
return Map.of();
}

Map<ServerId,List<FMetric>> results = new HashMap<>(managers.size());
for (ServerId s : managers) {
MetricResponse mr = monitor.getInformationFetcher().getAllMetrics().getIfPresent(s);
if (mr != null) {
results.put(s,
mr.getMetrics().stream().map(FMetric::getRootAsFMetric).collect(Collectors.toList()));
}
}
return List.of();
return results;
}

@GET
Expand Down Expand Up @@ -320,8 +333,8 @@ public Map<Id,CumulativeDistributionSummary> getTabletServerAllMetricSummary() {
@GET
@Path("compactions/summary")
@Produces(MediaType.APPLICATION_JSON)
@Description("Returns the metrics for all compaction queues")
public Map<String,List<FMetric>> getCompactionMetricSummary() {
@Description("Returns summary information for compactions")
public CompactionSummary getCompactionSummary() {
return monitor.getInformationFetcher().getSummaryForEndpoint().getCompactionMetricSummary();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public void run() {
LOG.info("Finished fetching metrics from servers");
LOG.info(
"All: {}, Manager: {}, Garbage Collector: {}, Compactors: {}, Scan Servers: {}, Tablet Servers: {}",
allMetrics.estimatedSize(), summary.getManager() != null,
allMetrics.estimatedSize(), summary.getManagers().size(),
summary.getGarbageCollector() != null,
summary.getCompactorAllMetricSummary().isEmpty() ? 0
: summary.getCompactorAllMetricSummary().entrySet().iterator().next().getValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -356,6 +357,23 @@ public Stream<RunningCompactionInfo> stream() {

}

public record CompactionSummary(Map<TableId,Long> tableCompactions,
Map<String,Long> groupCompactions,
Map<ServerId,Map<String,List<FMetric>>> compactionQueueMetrics) {

public List<FMetric> getQueueMetrics(String resourceGroup) {
final List<FMetric> metrics = new ArrayList<>();
this.compactionQueueMetrics().forEach((k, v) -> {
v.forEach((k2, v2) -> {
if (k2.equals(resourceGroup)) {
metrics.addAll(v2);
}
});
});
return metrics;
}
};

private static final Logger LOG = LoggerFactory.getLogger(SystemInformation.class);

private final DistributionStatisticConfig DSC =
Expand All @@ -368,7 +386,7 @@ public Stream<RunningCompactionInfo> stream() {

private final Set<String> resourceGroups = ConcurrentHashMap.newKeySet();
private final Set<ServerId> problemHosts = ConcurrentHashMap.newKeySet();
private final AtomicReference<ServerId> manager = new AtomicReference<>();
private final Set<ServerId> managers = ConcurrentHashMap.newKeySet();
private final AtomicReference<ServerId> gc = new AtomicReference<>();

// index of resource group name to set of servers
Expand All @@ -395,7 +413,8 @@ public Stream<RunningCompactionInfo> stream() {
new ConcurrentHashMap<>();

// Compaction Information
private final Map<String,List<FMetric>> queueMetrics = new ConcurrentHashMap<>();
private final AtomicReference<CompactionSummary> compactionSummary = new AtomicReference<>();

private volatile Set<ServerId> registeredCompactors = Set.of();
private volatile HostAndPort coordinatorHost;

Expand Down Expand Up @@ -431,6 +450,8 @@ public SystemInformation(Cache<ServerId,MetricResponse> allMetrics, ServerContex
public void clear() {
resourceGroups.clear();
problemHosts.clear();
managers.clear();
gc.set(null);
compactors.clear();
sservers.clear();
tservers.clear();
Expand All @@ -440,9 +461,9 @@ public void clear() {
rgCompactorMetrics.clear();
rgSServerMetrics.clear();
rgTServerMetrics.clear();
queueMetrics.clear();
registeredCompactors = Set.of();
coordinatorHost = null;
compactionSummary.set(null);
longRunningCompactionsByRg.clear();
tables.clear();
tablets.clear();
Expand Down Expand Up @@ -491,20 +512,35 @@ private void updateAggregates(final MetricResponse response,

}

private void createCompactionSummary(MetricResponse response) {
if (response.getMetrics() != null) {
for (final ByteBuffer binary : response.getMetrics()) {
FMetric fm = FMetric.getRootAsFMetric(binary);
for (int i = 0; i < fm.tagsLength(); i++) {
FTag t = fm.tags(i);
if (t.key().equals("queue.id")) {
queueMetrics
.computeIfAbsent(t.value(), (k) -> Collections.synchronizedList(new ArrayList<>()))
.add(fm);
private void createCompactionSummary() {

Map<TableId,Long> tableCompactions = new HashMap<>();
runningCompactionsPerTable.forEach((k, v) -> tableCompactions.put(k, v.sum()));

Map<String,Long> groupCompactions = new HashMap<>();
runningCompactionsPerGroup.forEach((k, v) -> groupCompactions.put(k, v.sum()));

Map<ServerId,Map<String,List<FMetric>>> coordinatorMetrics = new HashMap<>();

for (ServerId manager : managers) {
MetricResponse managerMetrics = allMetrics.getIfPresent(manager);
if (managerMetrics.getMetrics() != null) {
Map<String,List<FMetric>> queueMetrics = new HashMap<>();
for (final ByteBuffer binary : managerMetrics.getMetrics()) {
FMetric fm = FMetric.getRootAsFMetric(binary);
for (int i = 0; i < fm.tagsLength(); i++) {
FTag t = fm.tags(i);
if (t.key().equals("queue.id")) {
queueMetrics.computeIfAbsent(t.value(),
(k) -> Collections.synchronizedList(new ArrayList<>())).add(fm);
}
}
}
coordinatorMetrics.put(manager, queueMetrics);
}
}
compactionSummary
.set(new CompactionSummary(tableCompactions, groupCompactions, coordinatorMetrics));
}

public void processResponse(final ServerId server, final MetricResponse response) {
Expand All @@ -526,10 +562,7 @@ public void processResponse(final ServerId server, final MetricResponse response
}
break;
case MANAGER:
if (manager.get() == null || !manager.get().equals(server)) {
manager.set(server);
}
createCompactionSummary(response);
managers.add(server);
break;
case SCAN_SERVER:
sservers.computeIfAbsent(response.getResourceGroup(), (rg) -> ConcurrentHashMap.newKeySet())
Expand Down Expand Up @@ -595,6 +628,9 @@ public void finish() {
.computeIfAbsent(serverId.getType().name(), t -> new ProcessSummary())
.addNotResponded(serverId);
});
// Create a summary of compaction information
createCompactionSummary();

for (SystemTables table : SystemTables.values()) {
TableConfiguration tconf = this.ctx.getTableConfiguration(table.tableId());
String balancerRG = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY);
Expand All @@ -606,7 +642,7 @@ public void finish() {
}
for (String rg : getResourceGroups()) {
Set<ServerId> rgCompactors = getCompactorResourceGroupServers(rg);
List<FMetric> metrics = queueMetrics.get(rg);
List<FMetric> metrics = compactionSummary.get().getQueueMetrics(rg);
Optional<FMetric> queued = metrics.stream()
.filter(fm -> fm.name().equals(Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName()))
.findFirst();
Expand Down Expand Up @@ -660,8 +696,8 @@ public Set<ServerId> getProblemHosts() {
return this.problemHosts;
}

public ServerId getManager() {
return this.manager.get();
public Set<ServerId> getManagers() {
return this.managers;
}

public ServerId getGarbageCollector() {
Expand Down Expand Up @@ -707,8 +743,8 @@ public Map<Id,CumulativeDistributionSummary> getTServerAllMetricSummary() {
return this.totalTServerMetrics;
}

public Map<String,List<FMetric>> getCompactionMetricSummary() {
return this.queueMetrics;
public CompactionSummary getCompactionMetricSummary() {
return this.compactionSummary.get();
}

public Set<ServerId> getCompactorServers() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.monitor.next.serializers;

import java.io.IOException;

import org.apache.accumulo.core.client.admin.servers.ServerId;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

public class ServerIdSerializer extends JsonSerializer<ServerId> {

@Override
public void serialize(ServerId value, JsonGenerator gen, SerializerProvider serializers)
throws IOException {
StringBuilder buf = new StringBuilder();
buf.append("type=").append(value.getType().name()).append(";group=")
.append(value.getResourceGroup().canonical()).append(";address=")
.append(value.toHostPortString());
gen.writeFieldName(buf.toString());
}

}