Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
import com.google.common.base.Preconditions;
import com.google.inject.Singleton;
import jakarta.annotation.Nonnull;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.sql.Timestamp;
import java.text.ParseException;
Expand All @@ -49,8 +52,12 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
Expand Down Expand Up @@ -877,4 +884,38 @@ public static long extractLongMetricValue(Map<String, Object> metrics, String ke
}
return -1;
}

/**
 * Builds a streaming HTTP response that renders the supplied rows as a CSV
 * attachment, so large result sets are written to the client incrementally
 * rather than buffered in memory.
 *
 * @param fileName         name advertised in the Content-Disposition header.
 * @param headers          column titles for the CSV header row.
 * @param data             rows to serialize, one CSV record per element.
 * @param columnExtractors one extractor per column; applied to each row in
 *                         order to produce that record's cell values.
 * @param <T>              row type.
 * @return a 200 OK response of type text/csv that streams the generated file.
 */
public static <T> Response downloadCsv(
    String fileName,
    List<String> headers,
    List<T> data,
    List<Function<T, Object>> columnExtractors) {

  StreamingOutput stream = output -> {
    CSVFormat format = CSVFormat.DEFAULT.builder()
        .setHeader(headers.toArray(new String[0]))
        .build();

    // try-with-resources closes (and therefore flushes) the printer chain.
    try (CSVPrinter printer = new CSVPrinter(
        new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)),
        format)) {

      for (T item : data) {
        // Materialize one record at a time by applying each column extractor.
        printer.printRecord(
            columnExtractors.stream()
                .map(extractor -> extractor.apply(item))
                .collect(Collectors.toList()));
      }

      printer.flush();
    }
  };

  return Response.ok(stream)
      .type("text/csv")
      .header("Content-Disposition",
          "attachment; filename=\"" + fileName + "\"")
      .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,16 @@

package org.apache.hadoop.ozone.recon.api;

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,51 +81,6 @@ public Response getPendingDeletionByComponent(
}
}

/**
 * Streams per-datanode pending-deletion statistics as a CSV attachment.
 *
 * @return 202 (Accepted) with a JSON progress payload while metric collection
 *         is still running; 500 if the collection finished but produced no
 *         data; otherwise 200 (OK) streaming the CSV file.
 */
@GET
@Path("/download")
public Response downloadPendingDeleteData() {
  DataNodeMetricsServiceResponse dnMetricsResponse = dataNodeMetricsService.getCollectedMetrics(null);

  // Collection still in progress: return the status so the client keeps polling.
  if (dnMetricsResponse.getStatus() != DataNodeMetricsService.MetricCollectionStatus.FINISHED) {
    return Response.status(Response.Status.ACCEPTED)
        .entity(dnMetricsResponse)
        .type("application/json")
        .build();
  }

  // FINISHED status with no payload indicates an internal inconsistency.
  if (null == dnMetricsResponse.getPendingDeletionPerDataNode()) {
    return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
        .entity("Metrics data is missing despite FINISHED status.")
        .type("text/plain")
        .build();
  }

  StreamingOutput stream = output -> {
    CSVFormat format = CSVFormat.DEFAULT.builder()
        .setHeader("HostName", "Datanode UUID", "Pending Block Size (bytes)").build();
    try (CSVPrinter csvPrinter = new CSVPrinter(
        new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)), format)) {
      for (DatanodePendingDeletionMetrics metric : dnMetricsResponse.getPendingDeletionPerDataNode()) {
        csvPrinter.printRecord(
            metric.getHostName(),
            metric.getDatanodeUuid(),
            metric.getPendingBlockSize()
        );
      }
      csvPrinter.flush();
    } catch (Exception e) {
      LOG.error("Failed to stream CSV", e);
      throw new WebApplicationException("Failed to generate CSV", e);
    }
  };

  // BUG FIX: the completed download previously returned 202 ACCEPTED, which
  // clients treat as "still in progress". A finished CSV must be 200 OK.
  return Response.ok(stream)
      .type("text/csv")
      .header("Content-Disposition", "attachment; filename=\"pending_deletion_all_datanode_stats.csv\"")
      .build();
}

private Response handleDataNodeMetrics(Integer limit) {
if (null != limit && limit < 1) {
return Response.status(Response.Status.BAD_REQUEST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,28 @@
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.DUResponse;
import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport;
import org.apache.hadoop.ozone.recon.api.types.GlobalNamespaceReport;
import org.apache.hadoop.ozone.recon.api.types.GlobalStorageReport;
Expand Down Expand Up @@ -71,16 +77,19 @@ public class StorageDistributionEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(StorageDistributionEndpoint.class);
private final ReconGlobalStatsManager reconGlobalStatsManager;
private final ReconGlobalMetricsService reconGlobalMetricsService;
private final DataNodeMetricsService dataNodeMetricsService;

/**
 * Creates the endpoint with its injected collaborators.
 *
 * @param reconSCM                   source of the Recon node manager.
 * @param nsSummaryEndpoint          namespace summary endpoint delegate.
 * @param reconGlobalStatsManager    global stats store.
 * @param reconGlobalMetricsService  global metrics service.
 * @param dataNodeMetricsService     per-datanode metrics collection service.
 */
@Inject
public StorageDistributionEndpoint(OzoneStorageContainerManager reconSCM,
                                   NSSummaryEndpoint nsSummaryEndpoint,
                                   ReconGlobalStatsManager reconGlobalStatsManager,
                                   ReconGlobalMetricsService reconGlobalMetricsService,
                                   DataNodeMetricsService dataNodeMetricsService) {
  this.dataNodeMetricsService = dataNodeMetricsService;
  this.reconGlobalMetricsService = reconGlobalMetricsService;
  this.reconGlobalStatsManager = reconGlobalStatsManager;
  this.nsSummaryEndpoint = nsSummaryEndpoint;
  // Recon always runs with a ReconNodeManager, so this narrowing cast is safe here.
  this.nodeManager = (ReconNodeManager) reconSCM.getScmNodeManager();
}

@GET
Expand Down Expand Up @@ -114,6 +123,97 @@ public Response getStorageDistribution() {
}
}

/**
 * Downloads the distribution of data node metrics in a CSV file format.
 * This method collects metrics related to pending deletions and various storage statistics
 * for each data node. If the metrics collection is still in progress, it returns an
 * intermediate response. If the collection is complete and metrics data are available,
 * it generates and streams a CSV file containing detailed statistics.
 *
 * The CSV includes the following headers: HostName, Datanode UUID, Filesystem Capacity,
 * Filesystem Used Space, Filesystem Remaining Space, Ozone Capacity, Ozone Used Space,
 * Ozone Remaining Space, PreAllocated Container Space, Reserved Space, Minimum Free
 * Space, and Pending Block Size.
 *
 * @return A Response object. Depending on the state of metrics collection, this can be:
 *         - An HTTP 202 (Accepted) response with a status and metrics data if the
 *           collection is not yet complete.
 *         - An HTTP 500 (Internal Server Error) if the metrics data is missing despite
 *           the collection status being marked as finished.
 *         - An HTTP 200 (OK) response streaming a CSV file of data node metrics
 *           if the collection is complete and valid metrics data are available.
 */
@GET
@Path("/download")
public Response downloadDataNodeStorageDistribution() {

  DataNodeMetricsServiceResponse metricsResponse =
      dataNodeMetricsService.getCollectedMetrics(null);

  // Collection still running: report progress so the client keeps polling.
  if (metricsResponse.getStatus() != DataNodeMetricsService.MetricCollectionStatus.FINISHED) {
    return Response.status(Response.Status.ACCEPTED)
        .entity(metricsResponse)
        .type(MediaType.APPLICATION_JSON)
        .build();
  }

  List<DatanodePendingDeletionMetrics> pendingDeletionMetrics =
      metricsResponse.getPendingDeletionPerDataNode();

  if (pendingDeletionMetrics == null) {
    return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
        .entity("Metrics data is missing despite FINISHED status.")
        .type(MediaType.TEXT_PLAIN)
        .build();
  }

  // Index storage reports by datanode UUID so each pending-deletion metric can
  // be joined with its storage report. Keep the first report on a duplicate
  // UUID instead of letting toMap throw IllegalStateException.
  Map<String, DatanodeStorageReport> reportByUuid =
      collectDatanodeReports().stream()
          .collect(Collectors.toMap(
              DatanodeStorageReport::getDatanodeUuid,
              Function.identity(),
              (first, second) -> first));

  // A view is created for every metric even when no matching storage report
  // exists; the column extractors below fall back to placeholder values.
  List<DataNodeStoragePendingDeletionView> data = pendingDeletionMetrics.stream()
      .map(metric -> new DataNodeStoragePendingDeletionView(
          metric, reportByUuid.get(metric.getDatanodeUuid())))
      .collect(Collectors.toList());

  List<String> headers = Arrays.asList(
      "HostName",
      "Datanode UUID",
      "Filesystem Capacity",
      "Filesystem Used Space",
      "Filesystem Remaining Space",
      "Ozone Capacity",
      "Ozone Used Space",
      "Ozone Remaining Space",
      "PreAllocated Container Space",
      "Reserved Space",
      "Minimum Free Space",
      "Pending Block Size"
  );

  List<Function<DataNodeStoragePendingDeletionView, Object>> columns =
      Arrays.asList(
          v -> v.getMetric() != null ? v.getMetric().getHostName() : "Unknown",
          v -> v.getMetric() != null ? v.getMetric().getDatanodeUuid() : "Unknown",
          v -> v.getReport() != null ? v.getReport().getFilesystemCapacity() : -1,
          v -> v.getReport() != null ? v.getReport().getFilesystemUsed() : -1,
          v -> v.getReport() != null ? v.getReport().getFilesystemAvailable() : -1,
          v -> v.getReport() != null ? v.getReport().getCapacity() : -1,
          v -> v.getReport() != null ? v.getReport().getUsed() : -1,
          v -> v.getReport() != null ? v.getReport().getRemaining() : -1,
          v -> v.getReport() != null ? v.getReport().getCommitted() : -1,
          v -> v.getReport() != null ? v.getReport().getReserved() : -1,
          v -> v.getReport() != null ? v.getReport().getMinimumFreeSpace() : -1,
          // BUG FIX: pending block size comes from the metric, not the storage
          // report; the old guard checked getReport() and dropped this value
          // (emitting -1) whenever the report was missing.
          v -> v.getMetric() != null ? v.getMetric().getPendingBlockSize() : -1
      );

  return ReconUtils.downloadCsv("datanode_storage_and_pending_deletion_stats.csv", headers, data, columns);
}

private GlobalStorageReport calculateGlobalStorageReport() {
try {
SCMNodeStat stats = nodeManager.getStats();
Expand Down Expand Up @@ -189,7 +289,7 @@ private StorageCapacityDistributionResponse buildStorageDistributionResponse(
.build();
}

private List<DatanodeStorageReport> collectDatanodeReports() {
public List<DatanodeStorageReport> collectDatanodeReports() {
return nodeManager.getAllNodes().stream()
.map(this::getStorageReport)
.filter(Objects::nonNull) // Filter out null reports
Expand Down Expand Up @@ -260,4 +360,27 @@ private DatanodeStorageReport getStorageReport(DatanodeDetails datanode) {
return null; // Return null on any error
}
}

/**
 * Immutable pairing of a datanode's pending-deletion metrics with its storage
 * report, used as the row type when exporting combined statistics to CSV.
 */
private static class DataNodeStoragePendingDeletionView {
  // Pending-deletion figures reported for the datanode.
  private final DatanodePendingDeletionMetrics pendingDeletionMetric;
  // Storage report for the same datanode; may be null when no report was found.
  private final DatanodeStorageReport storageReport;

  DataNodeStoragePendingDeletionView(DatanodePendingDeletionMetrics metric, DatanodeStorageReport report) {
    this.pendingDeletionMetric = metric;
    this.storageReport = report;
  }

  DatanodePendingDeletionMetrics getMetric() {
    return pendingDeletionMetric;
  }

  DatanodeStorageReport getReport() {
    return storageReport;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type CapacityState = {

const Capacity: React.FC<object> = () => {
const PENDING_POLL_INTERVAL = 5 * 1000;
const DN_CSV_DOWNLOAD_URL = '/api/v1/pendingDeletion/download';
const DN_CSV_DOWNLOAD_URL = '/api/v1/storageDistribution/download';
const DN_STATUS_URL = '/api/v1/pendingDeletion?component=dn';
const DOWNLOAD_POLL_TIMEOUT_MS = 10 * 60 * 1000;

Expand Down
Loading