From 74f3f7fe398f50fe286528e51f3778be588d0c6d Mon Sep 17 00:00:00 2001 From: Priyesh Karatha Date: Wed, 28 Jan 2026 13:31:05 +0530 Subject: [PATCH 1/2] HDDS-14517. Adding all the storage fields to report. --- .../recon/api/PendingDeletionEndpoint.java | 53 ----- .../api/StorageDistributionEndpoint.java | 97 ++++++++- .../src/v2/pages/capacity/capacity.tsx | 2 +- .../api/TestPendingDeletionEndpoint.java | 60 ------ .../api/TestStorageDistributionEndpoint.java | 186 ++++++++++++++++++ 5 files changed, 282 insertions(+), 116 deletions(-) create mode 100644 hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java index 534c036dcf6..b1dafc6c474 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/PendingDeletionEndpoint.java @@ -17,24 +17,16 @@ package org.apache.hadoop.ozone.recon.api; -import java.io.BufferedWriter; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; import java.util.Map; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; -import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; import org.apache.hadoop.ozone.recon.api.types.ScmPendingDeletion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,51 +81,6 @@ public Response getPendingDeletionByComponent( } } - @GET - @Path("/download") - public Response downloadPendingDeleteData() { - DataNodeMetricsServiceResponse dnMetricsResponse = dataNodeMetricsService.getCollectedMetrics(null); - - if (dnMetricsResponse.getStatus() != DataNodeMetricsService.MetricCollectionStatus.FINISHED) { - return Response.status(Response.Status.ACCEPTED) - .entity(dnMetricsResponse) - .type("application/json") - .build(); - } - - if (null == dnMetricsResponse.getPendingDeletionPerDataNode()) { - return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity("Metrics data is missing despite FINISHED status.") - .type("text/plain") - .build(); - } - - StreamingOutput stream = output -> { - CSVFormat format = CSVFormat.DEFAULT.builder() - .setHeader("HostName", "Datanode UUID", "Pending Block Size (bytes)").build(); - try (CSVPrinter csvPrinter = new CSVPrinter( - new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)), format)) { - for (DatanodePendingDeletionMetrics metric : dnMetricsResponse.getPendingDeletionPerDataNode()) { - csvPrinter.printRecord( - metric.getHostName(), - metric.getDatanodeUuid(), - metric.getPendingBlockSize() - ); - } - csvPrinter.flush(); - } catch (Exception e) { - LOG.error("Failed to stream CSV", e); - throw new WebApplicationException("Failed to generate CSV", e); - } - }; - - return Response.status(Response.Status.ACCEPTED) - .entity(stream) - .type("text/csv") - 
.header("Content-Disposition", "attachment; filename=\"pending_deletion_all_datanode_stats.csv\"") - .build(); - } - private Response handleDataNodeMetrics(Integer limit) { if (null != limit && limit < 1) { return Response.status(Response.Status.BAD_REQUEST) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java index e5ae2550475..25ed673bfa6 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java @@ -20,23 +20,34 @@ import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE; +import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; import org.apache.hadoop.ozone.recon.api.types.DUResponse; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport; import org.apache.hadoop.ozone.recon.api.types.GlobalNamespaceReport; import org.apache.hadoop.ozone.recon.api.types.GlobalStorageReport; @@ -71,16 +82,19 @@ public class StorageDistributionEndpoint { private static final Logger LOG = LoggerFactory.getLogger(StorageDistributionEndpoint.class); private final ReconGlobalStatsManager reconGlobalStatsManager; private final ReconGlobalMetricsService reconGlobalMetricsService; + private final DataNodeMetricsService dataNodeMetricsService; @Inject public StorageDistributionEndpoint(OzoneStorageContainerManager reconSCM, NSSummaryEndpoint nsSummaryEndpoint, ReconGlobalStatsManager reconGlobalStatsManager, - ReconGlobalMetricsService reconGlobalMetricsService) { + ReconGlobalMetricsService reconGlobalMetricsService, + DataNodeMetricsService dataNodeMetricsService) { this.nodeManager = (ReconNodeManager) reconSCM.getScmNodeManager(); this.nsSummaryEndpoint = nsSummaryEndpoint; this.reconGlobalStatsManager = reconGlobalStatsManager; this.reconGlobalMetricsService = reconGlobalMetricsService; + this.dataNodeMetricsService = dataNodeMetricsService; } @GET @@ -114,6 +128,85 @@ public Response getStorageDistribution() { } } + @GET + @Path("/download") + public Response downloadDataNodeDistribution() { + DataNodeMetricsServiceResponse metricsResponse = + dataNodeMetricsService.getCollectedMetrics(null); + + if 
(metricsResponse.getStatus() != DataNodeMetricsService.MetricCollectionStatus.FINISHED) {
+      return Response.status(Response.Status.ACCEPTED)
+          .entity(metricsResponse)
+          .type(MediaType.APPLICATION_JSON)
+          .build();
+    }
+
+    List<DatanodePendingDeletionMetrics> pendingDeletionMetrics =
+        metricsResponse.getPendingDeletionPerDataNode();
+
+    if (pendingDeletionMetrics == null) {
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity("Metrics data is missing despite FINISHED status.")
+          .type(MediaType.TEXT_PLAIN)
+          .build();
+    }
+
+    Map<String, DatanodeStorageReport> reportByUuid =
+        collectDatanodeReports().stream()
+            .collect(Collectors.toMap(
+                DatanodeStorageReport::getDatanodeUuid,
+                Function.identity()));
+
+    StreamingOutput stream = output -> {
+      CSVFormat format = CSVFormat.DEFAULT.builder()
+          .setHeader(
+              "HostName",
+              "Datanode UUID",
+              "Capacity",
+              "Used Space",
+              "Remaining Space",
+              "Committed Space",
+              "Reserved Space",
+              "Minimum Free Space",
+              "Pending Block Size")
+          .build();
+
+      try (CSVPrinter printer = new CSVPrinter(
+          new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)),
+          format)) {
+
+        for (DatanodePendingDeletionMetrics metric : pendingDeletionMetrics) {
+          DatanodeStorageReport report = reportByUuid.get(metric.getDatanodeUuid());
+          if (report == null) {
+            continue; // skip if report is missing
+          }
+
+          printer.printRecord(
+              metric.getHostName(),
+              metric.getDatanodeUuid(),
+              report.getCapacity(),
+              report.getUsed(),
+              report.getRemaining(),
+              report.getCommitted(),
+              report.getReserved(),
+              report.getMinimumFreeSpace(),
+              metric.getPendingBlockSize()
+          );
+        }
+        printer.flush();
+      } catch (Exception e) {
+        LOG.error("Failed to stream CSV", e);
+        throw new WebApplicationException("Failed to generate CSV", e);
+      }
+    };
+
+    return Response.status(Response.Status.ACCEPTED)
+        .entity(stream)
+        .type("text/csv")
+        .header("Content-Disposition", "attachment; filename=\"datanode_storage_and_pending_deletion_stats.csv\"")
+        .build();
+  }
+
   private GlobalStorageReport calculateGlobalStorageReport() {
     try {
       SCMNodeStat stats = nodeManager.getStats();
@@ -189,7 +282,7 @@ private StorageCapacityDistributionResponse buildStorageDistributionResponse(
         .build();
   }
 
-  private List<DatanodeStorageReport> collectDatanodeReports() {
+  public List<DatanodeStorageReport> collectDatanodeReports() {
     return nodeManager.getAllNodes().stream()
         .map(this::getStorageReport)
         .filter(Objects::nonNull) // Filter out null reports
diff --git a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/capacity/capacity.tsx b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/capacity/capacity.tsx
index ec9ad436e59..0fb9d0baed7 100644
--- a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/capacity/capacity.tsx
+++ b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/capacity/capacity.tsx
@@ -47,7 +47,7 @@ type CapacityState = {
 
 const Capacity: React.FC = () => {
   const PENDING_POLL_INTERVAL = 5 * 1000;
-  const DN_CSV_DOWNLOAD_URL = '/api/v1/pendingDeletion/download';
+  const DN_CSV_DOWNLOAD_URL = '/api/v1/storageDistribution/download';
   const DN_STATUS_URL = '/api/v1/pendingDeletion?component=dn';
   const DOWNLOAD_POLL_TIMEOUT_MS = 10 * 60 * 1000;
 
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestPendingDeletionEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestPendingDeletionEndpoint.java
index 3c9af15d4dc..7951f9b2b3e 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestPendingDeletionEndpoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestPendingDeletionEndpoint.java
@@ -18,20 +18,14 @@
 package org.apache.hadoop.ozone.recon.api;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
@@ -212,58 +206,4 @@ public void testOmComponentReturnsPendingDeletionSizes() {
     assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
     assertEquals(pendingSizes, response.getEntity());
   }
-
-  @Test
-  public void testDownloadReturnsAcceptedWhenCollectionInProgress() {
-    DataNodeMetricsServiceResponse metricsResponse = DataNodeMetricsServiceResponse.newBuilder()
-        .setStatus(DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS)
-        .build();
-    when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(metricsResponse);
-
-    Response response = pendingDeletionEndpoint.downloadPendingDeleteData();
-
-    assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
-    assertEquals("application/json", response.getMediaType().toString());
-    assertEquals(metricsResponse, response.getEntity());
-  }
-
-  @Test
-  public void testDownloadReturnsServerErrorWhenMetricsMissing() {
-    DataNodeMetricsServiceResponse metricsResponse = DataNodeMetricsServiceResponse.newBuilder()
-        .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
-        .build();
-    when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(metricsResponse);
-
-    Response response = pendingDeletionEndpoint.downloadPendingDeleteData();
-
-    assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-    assertEquals("Metrics data is missing despite FINISHED status.", response.getEntity());
-    assertEquals("text/plain", response.getMediaType().toString());
-  }
-
-  @Test
-  public void testDownloadReturnsCsvWithMetrics() throws Exception {
-    List<DatanodePendingDeletionMetrics> pendingDeletionMetrics = Arrays.asList(
-        new DatanodePendingDeletionMetrics("dn1", "uuid-1", 10L),
-        new DatanodePendingDeletionMetrics("dn2", "uuid-2", 20L));
-    DataNodeMetricsServiceResponse metricsResponse = DataNodeMetricsServiceResponse.newBuilder()
-        .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
-        .setPendingDeletion(pendingDeletionMetrics)
-        .build();
-    when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(metricsResponse);
-
-    Response response = pendingDeletionEndpoint.downloadPendingDeleteData();
-
-    assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
-    assertEquals("text/csv", response.getMediaType().toString());
-    assertEquals("attachment; filename=\"pending_deletion_all_datanode_stats.csv\"",
-        response.getHeaderString("Content-Disposition"));
-    StreamingOutput streamingOutput =
assertInstanceOf(StreamingOutput.class, response.getEntity()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - streamingOutput.write(outputStream); - String csv = new String(outputStream.toByteArray(), StandardCharsets.UTF_8); - assertTrue(csv.contains("HostName,Datanode UUID,Pending Block Size (bytes)")); - assertTrue(csv.contains("dn1,uuid-1,10")); - assertTrue(csv.contains("dn2,uuid-2,20")); - } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java new file mode 100644 index 00000000000..6786985b77f --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; +import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; +import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport; +import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; +import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * The TestStorageDistributionEndpoint class contains unit tests for verifying + * the functionality of the {@link StorageDistributionEndpoint} class. 
+ *
+ */
+public class TestStorageDistributionEndpoint {
+  private DataNodeMetricsService dataNodeMetricsService;
+  private StorageDistributionEndpoint storageDistributionEndpoint;
+  private ReconNodeManager nodeManager = mock(ReconNodeManager.class);
+
+  @BeforeEach
+  public void setup() {
+    ReconGlobalMetricsService reconGlobalMetricsService = mock(ReconGlobalMetricsService.class);
+    dataNodeMetricsService = mock(DataNodeMetricsService.class);
+    NSSummaryEndpoint nssummaryEndpoint = mock(NSSummaryEndpoint.class);
+    OzoneStorageContainerManager reconSCM = mock(OzoneStorageContainerManager.class);
+    when(reconSCM.getScmNodeManager()).thenReturn(nodeManager);
+    ReconGlobalStatsManager reconGlobalStatsManager = mock(ReconGlobalStatsManager.class);
+    storageDistributionEndpoint = new StorageDistributionEndpoint(reconSCM,
+        nssummaryEndpoint,
+        reconGlobalStatsManager,
+        reconGlobalMetricsService,
+        dataNodeMetricsService);
+  }
+
+  @Test
+  public void testDownloadReturnsAcceptedWhenCollectionInProgress() {
+    DataNodeMetricsServiceResponse metricsResponse = DataNodeMetricsServiceResponse.newBuilder()
+        .setStatus(DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS)
+        .build();
+    when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(metricsResponse);
+    Response response = storageDistributionEndpoint.downloadDataNodeDistribution();
+
+    assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
+    assertEquals("application/json", response.getMediaType().toString());
+    assertEquals(metricsResponse, response.getEntity());
+  }
+
+  @Test
+  public void testDownloadReturnsServerErrorWhenMetricsMissing() {
+    DataNodeMetricsServiceResponse metricsResponse = DataNodeMetricsServiceResponse.newBuilder()
+        .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
+        .build();
+    when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(metricsResponse);
+    Response response = storageDistributionEndpoint.downloadDataNodeDistribution();
+
+    assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+    assertEquals("Metrics data is missing despite FINISHED status.", response.getEntity());
+    assertEquals("text/plain", response.getMediaType().toString());
+  }
+
+  @Test
+  public void testDownloadReturnsCsvWithMetrics() throws Exception {
+    // given
+    UUID uuid1 = UUID.randomUUID();
+    UUID uuid2 = UUID.randomUUID();
+
+    String dataNode1 = "dn1";
+    String dataNode2 = "dn2";
+
+    List<DatanodePendingDeletionMetrics> pendingDeletionMetrics = Arrays.asList(
+        new DatanodePendingDeletionMetrics(dataNode1, uuid1.toString(), 10L),
+        new DatanodePendingDeletionMetrics(dataNode2, uuid2.toString(), 20L)
+    );
+
+    DataNodeMetricsServiceResponse metricsResponse =
+        DataNodeMetricsServiceResponse.newBuilder()
+            .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
+            .setPendingDeletion(pendingDeletionMetrics)
+            .build();
+
+    when(dataNodeMetricsService.getCollectedMetrics(null))
+        .thenReturn(metricsResponse);
+
+    mockDatanodeStorageReports(pendingDeletionMetrics);
+    mockNodeManagerStats(uuid1, uuid2);
+
+    Response response = storageDistributionEndpoint.downloadDataNodeDistribution();
+
+    // then
+    assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
+    assertEquals("text/csv", response.getMediaType().toString());
+    assertEquals(
+        "attachment; filename=\"datanode_storage_and_pending_deletion_stats.csv\"",
+        response.getHeaderString("Content-Disposition")
+    );
+
+    String csv = readCsv(response);
+
+    assertTrue(csv.contains(
+        "HostName,Datanode UUID,Capacity,Used Space,Remaining Space," +
+        "Committed Space,Reserved Space,Minimum Free Space,Pending Block Size"
+    ));
+    assertTrue(csv.contains(dataNode1 + "," + uuid1 + ",100,10,10,10,5,5,10"));
+    assertTrue(csv.contains(dataNode2 + "," + uuid2 + ",100,10,10,10,5,5,20"));
+  }
+
+  private void mockDatanodeStorageReports(
+      List<DatanodePendingDeletionMetrics> metrics) {
+
+    List<DatanodeStorageReport> reports = metrics.stream()
+        .map(m -> DatanodeStorageReport.newBuilder()
+            .setDatanodeUuid(m.getDatanodeUuid())
+            .setHostName(m.getHostName())
+            .build())
+        .collect(Collectors.toList());
+
+    when(storageDistributionEndpoint.collectDatanodeReports())
+        .thenReturn(reports);
+  }
+
+  private void mockNodeManagerStats(UUID uuid1, UUID uuid2) {
+    DatanodeDetails dn1 = DatanodeDetails.newBuilder()
+        .setUuid(uuid1)
+        .setHostName("dn1")
+        .build();
+
+    DatanodeDetails dn2 = DatanodeDetails.newBuilder()
+        .setUuid(uuid2)
+        .setHostName("dn3")
+        .build();
+
+    List<DatanodeInfo> datanodes = Arrays.asList(
+        new DatanodeInfo(dn1, null, null),
+        new DatanodeInfo(dn2, null, null)
+    );
+
+    when(nodeManager.getAllNodes()).thenReturn(datanodes);
+    when(nodeManager.getNodeStat(dn1))
+        .thenReturn(new SCMNodeMetric(100, 10, 10, 10, 5, 5));
+    when(nodeManager.getNodeStat(dn2))
+        .thenReturn(new SCMNodeMetric(100, 10, 10, 10, 5, 5));
+  }
+
+  private String readCsv(Response response) throws Exception {
+    StreamingOutput output = assertInstanceOf(StreamingOutput.class, response.getEntity());
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    output.write(outputStream);
+    return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+  }
+}

From f8281820b3afbb025330bf3134e64a3f8b25446b Mon Sep 17 00:00:00 2001
From: Priyesh Karatha
Date: Tue, 17 Feb 2026 09:17:11 +0530
Subject: [PATCH 2/2] Addressing review comments.
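
The CSV streaming logic is extracted into a reusable, generic
ReconUtils.downloadCsv helper, and the report gains the filesystem-level
fields. A minimal sketch of a call site follows (the header list, sample
metric, and local variable names here are illustrative only, with imports
elided; the real wiring lives in StorageDistributionEndpoint below):

    // Hypothetical caller: export pending-deletion metrics as CSV.
    // DatanodePendingDeletionMetrics and downloadCsv come from this patch;
    // the sample row values are assumptions made for this illustration.
    List<DatanodePendingDeletionMetrics> metrics = Arrays.asList(
        new DatanodePendingDeletionMetrics("dn-0", "uuid-0", 42L));
    List<String> headers = Arrays.asList("HostName", "Pending Block Size");
    List<Function<DatanodePendingDeletionMetrics, Object>> columns = Arrays.asList(
        DatanodePendingDeletionMetrics::getHostName,
        DatanodePendingDeletionMetrics::getPendingBlockSize);
    Response csv = ReconUtils.downloadCsv(
        "pending_deletion_stats.csv", headers, metrics, columns);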
---
 .../apache/hadoop/ozone/recon/ReconUtils.java |  41 +++
 .../api/StorageDistributionEndpoint.java      | 140 ++++++----
 .../api/TestStorageDistributionEndpoint.java  | 258 ++++++++++++------
 3 files changed, 308 insertions(+), 131 deletions(-)

diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 58306f89f37..5896266d708 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -31,10 +31,13 @@
 import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
 import jakarta.annotation.Nonnull;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.sql.Timestamp;
 import java.text.ParseException;
@@ -49,8 +52,12 @@
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.BlockingQueue;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -877,4 +884,38 @@ public static long extractLongMetricValue(Map metrics, String ke
     }
     return -1;
   }
+
+  public static <T> Response downloadCsv(
+      String fileName,
+      List<String> headers,
+      List<T> data,
+      List<Function<T, Object>> columnExtractors) {
+
+    StreamingOutput stream = output -> {
+      CSVFormat format = CSVFormat.DEFAULT.builder()
+          .setHeader(headers.toArray(new String[0]))
+          .build();
+
+      try (CSVPrinter printer = new CSVPrinter(
+          new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)),
+          format)) {
+
+        for (T item : data) {
+          List<Object> row = new ArrayList<>();
+          for (Function<T, Object> extractor : columnExtractors) {
+            row.add(extractor.apply(item));
+          }
+          printer.printRecord(row);
+        }
+
+        printer.flush();
+      }
+    };
+
+    return Response.ok(stream)
+        .type("text/csv")
+        .header("Content-Disposition",
+            "attachment; filename=\"" + fileName + "\"")
+        .build();
+  }
 }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java
index 25ed673bfa6..1365c31f2df 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java
@@ -20,10 +20,8 @@
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
 
-import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,17 +32,14 @@
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
 import org.apache.hadoop.hdds.fs.SpaceUsageSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.types.DUResponse;
 import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse;
 import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics;
@@ -128,9 +123,30 @@ public Response getStorageDistribution() {
     }
   }
 
+  /**
+   * Downloads the distribution of data node metrics in a CSV file format.
+   * This method collects metrics related to pending deletions and various storage statistics
+   * for each data node. If the metrics collection is still in progress, it returns an
+   * intermediate response. If the collection is complete and metrics data are available,
+   * it generates and streams a CSV file containing detailed statistics.
+   *
+   * The CSV includes the following headers: HostName, Datanode UUID, Filesystem Capacity,
+   * Filesystem Used Space, Filesystem Remaining Space, Ozone Capacity, Ozone Used Space,
+   * Ozone Remaining Space, PreAllocated Container Space, Reserved Space, Minimum Free
+   * Space, and Pending Block Size.
+   *
+   * @return A Response object. Depending on the state of metrics collection, this can be:
+   *         - An HTTP 202 (Accepted) response with a status and metrics data if the
+   *           collection is not yet complete.
+   *         - An HTTP 500 (Internal Server Error) if the metrics data is missing despite
+   *           the collection status being marked as finished.
+   *         - An HTTP 200 (OK) response streaming a CSV file of data node metrics
+   *           once the collection is complete and valid metrics data are available.
+   */
   @GET
   @Path("/download")
-  public Response downloadDataNodeDistribution() {
+  public Response downloadDataNodeStorageDistribution() {
+
     DataNodeMetricsServiceResponse metricsResponse =
         dataNodeMetricsService.getCollectedMetrics(null);
@@ -157,54 +173,45 @@ public Response downloadDataNodeDistribution() {
             DatanodeStorageReport::getDatanodeUuid,
             Function.identity()));
 
-    StreamingOutput stream = output -> {
-      CSVFormat format = CSVFormat.DEFAULT.builder()
-          .setHeader(
-              "HostName",
-              "Datanode UUID",
-              "Capacity",
-              "Used Space",
-              "Remaining Space",
-              "Committed Space",
-              "Reserved Space",
-              "Minimum Free Space",
-              "Pending Block Size")
-          .build();
-
-      try (CSVPrinter printer = new CSVPrinter(
-          new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)),
-          format)) {
-
-        for (DatanodePendingDeletionMetrics metric : pendingDeletionMetrics) {
-          DatanodeStorageReport report = reportByUuid.get(metric.getDatanodeUuid());
-          if (report == null) {
-            continue; // skip if report is missing
-          }
-
-          printer.printRecord(
-              metric.getHostName(),
-              metric.getDatanodeUuid(),
-              report.getCapacity(),
-              report.getUsed(),
-              report.getRemaining(),
-              report.getCommitted(),
-              report.getReserved(),
-              report.getMinimumFreeSpace(),
-              metric.getPendingBlockSize()
-          );
-        }
-        printer.flush();
-      } catch (Exception e) {
-        LOG.error("Failed to stream CSV", e);
-        throw new WebApplicationException("Failed to generate CSV", e);
-      }
-    };
+    List<DataNodeStoragePendingDeletionView> data = pendingDeletionMetrics.stream()
+        .map(metric -> {
+          return new DataNodeStoragePendingDeletionView(metric, reportByUuid.get(metric.getDatanodeUuid()));
+        })
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
 
-    return Response.status(Response.Status.ACCEPTED)
-        .entity(stream)
-        .type("text/csv")
-        .header("Content-Disposition", "attachment; filename=\"datanode_storage_and_pending_deletion_stats.csv\"")
-        .build();
+    List<String> headers = Arrays.asList(
+        "HostName",
+        "Datanode UUID",
+        "Filesystem Capacity",
+        "Filesystem Used Space",
+        "Filesystem Remaining Space",
+        "Ozone Capacity",
+        "Ozone Used Space",
+        "Ozone Remaining Space",
+        "PreAllocated Container Space",
+        "Reserved Space",
+        "Minimum Free Space",
+        "Pending Block Size"
+    );
+
+    List<Function<DataNodeStoragePendingDeletionView, Object>> columns =
+        Arrays.asList(
+            v -> v.getMetric() != null ? v.getMetric().getHostName() : "Unknown",
+            v -> v.getMetric() != null ? v.getMetric().getDatanodeUuid() : "Unknown",
+            v -> v.getReport() != null ? v.getReport().getFilesystemCapacity() : -1,
+            v -> v.getReport() != null ? v.getReport().getFilesystemUsed() : -1,
+            v -> v.getReport() != null ? v.getReport().getFilesystemAvailable() : -1,
+            v -> v.getReport() != null ? v.getReport().getCapacity() : -1,
+            v -> v.getReport() != null ? v.getReport().getUsed() : -1,
+            v -> v.getReport() != null ? v.getReport().getRemaining() : -1,
+            v -> v.getReport() != null ? v.getReport().getCommitted() : -1,
+            v -> v.getReport() != null ? v.getReport().getReserved() : -1,
+            v -> v.getReport() != null ? v.getReport().getMinimumFreeSpace() : -1,
+            v -> v.getMetric() != null ? v.getMetric().getPendingBlockSize() : -1
+        );
+
+    return ReconUtils.downloadCsv("datanode_storage_and_pending_deletion_stats.csv", headers, data, columns);
   }
 
   private GlobalStorageReport calculateGlobalStorageReport() {
@@ -353,4 +360,27 @@ private DatanodeStorageReport getStorageReport(DatanodeDetails datanode) {
       return null; // Return null on any error
     }
   }
+
+  /**
+   * Represents a view that combines pending deletion metrics and storage report data
+   * for a specific Datanode.
This view is used to encapsulate both metric and storage + * details for understanding the state of a datanode in terms of storage and pending deletions. + */ + private static class DataNodeStoragePendingDeletionView { + private final DatanodePendingDeletionMetrics metric; + private final DatanodeStorageReport report; + + DataNodeStoragePendingDeletionView(DatanodePendingDeletionMetrics metric, DatanodeStorageReport report) { + this.metric = metric; + this.report = report; + } + + DatanodePendingDeletionMetrics getMetric() { + return metric; + } + + DatanodeStorageReport getReport() { + return report; + } + } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java index 6786985b77f..6062ee8a1a5 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java @@ -19,27 +19,37 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; +import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.ozone.recon.api.types.DUResponse; import org.apache.hadoop.ozone.recon.api.types.DataNodeMetricsServiceResponse; import org.apache.hadoop.ozone.recon.api.types.DatanodePendingDeletionMetrics; import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport; +import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse; import org.apache.hadoop.ozone.recon.scm.ReconNodeManager; import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager; +import org.apache.hadoop.ozone.recon.tasks.GlobalStatsValue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -49,18 +59,55 @@ * */ public class TestStorageDistributionEndpoint { + private static final int EXPECTED_GLOBAL_TOTAL_KEYS = 14; + private static final long PENDING_DELETION_SIZE = 10L; + private static final long FS_CAPACITY = 1000L; + private static final long FS_USED = 500L; + private static final long FS_AVAILABLE = 500L; + private static final long RESERVED = 100L; + private static final long MIN_FREE_SPACE = 100L; + private static final long COMMITTED = 20L; + private static final long PENDING_DIRECTORY_SIZE = 120L; + private static final long PENDING_KEY_SIZE = 30L; + private static final long OPEN_KEY_BYTES = 200L; + private static final 
long OPEN_MPU_KEY_BYTES = 20L; + private static final long NON_OZONE_USED_SPACE_BYTES = 100L; + private static final long EXPECTED_COMMITTED_KEY_BYTES = 300L; + private static final long GLOBAL_STAT_KEY_COUNT = 7L; + private static final long OZONE_CAPACITY = FS_CAPACITY - RESERVED; + private static final long OZONE_USED = FS_USED - NON_OZONE_USED_SPACE_BYTES; + private static final long OZONE_REMAINING = FS_AVAILABLE - MIN_FREE_SPACE; + private static final String CSV_SEPARATOR = ","; + private static final String APPLICATION_JSON = "application/json"; + private static final String TEXT_PLAIN = "text/plain"; + private static final String TEXT_CSV = "text/csv"; + private static final String CONTENT_DISPOSITION = "Content-Disposition"; + private static final String DOWNLOAD_CONTENT_DISPOSITION = + "attachment; filename=\"datanode_storage_and_pending_deletion_stats.csv\""; + private static final String METRICS_MISSING_ERROR = + "Metrics data is missing despite FINISHED status."; + private static final String ROOT_PATH = "/"; + private static final String HOSTNAME_PREFIX = "datanode-"; + private static final String PENDING_DIRECTORY_SIZE_KEY = "pendingDirectorySize"; + private static final String PENDING_KEY_SIZE_KEY = "pendingKeySize"; + private static final String TOTAL_REPLICATED_DATA_SIZE_KEY = "totalReplicatedDataSize"; + private DataNodeMetricsService dataNodeMetricsService; private StorageDistributionEndpoint storageDistributionEndpoint; - private ReconNodeManager nodeManager = mock(ReconNodeManager.class); + private ReconNodeManager nodeManager; + private ReconGlobalMetricsService reconGlobalMetricsService; + private NSSummaryEndpoint nssummaryEndpoint; + private ReconGlobalStatsManager reconGlobalStatsManager; @BeforeEach public void setup() { - ReconGlobalMetricsService reconGlobalMetricsService = mock(ReconGlobalMetricsService.class); + reconGlobalMetricsService = mock(ReconGlobalMetricsService.class); + nodeManager = mock(ReconNodeManager.class); dataNodeMetricsService = mock(DataNodeMetricsService.class); - NSSummaryEndpoint nssummaryEndpoint = mock(NSSummaryEndpoint.class); + nssummaryEndpoint = mock(NSSummaryEndpoint.class); OzoneStorageContainerManager reconSCM = mock(OzoneStorageContainerManager.class); when(reconSCM.getScmNodeManager()).thenReturn(nodeManager); - ReconGlobalStatsManager reconGlobalStatsManager = mock(ReconGlobalStatsManager.class); + reconGlobalStatsManager = mock(ReconGlobalStatsManager.class); storageDistributionEndpoint = new StorageDistributionEndpoint(reconSCM, nssummaryEndpoint, reconGlobalStatsManager, @@ -68,16 +115,51 @@ public void setup() { dataNodeMetricsService); } + @Test + public void testStorageDistributionApiReturnsSuccess() throws Exception { + mockStorageDistributionData(3); + Response response = storageDistributionEndpoint.getStorageDistribution(); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + StorageCapacityDistributionResponse distributionResponse = assertInstanceOf( + StorageCapacityDistributionResponse.class, response.getEntity()); + + long totalNameSpace = PENDING_KEY_SIZE + PENDING_DIRECTORY_SIZE + OPEN_KEY_BYTES + + OPEN_MPU_KEY_BYTES + EXPECTED_COMMITTED_KEY_BYTES; + + assertEquals(OZONE_USED * 3, distributionResponse.getGlobalStorage().getTotalUsedSpace()); + assertEquals(OZONE_REMAINING * 3, distributionResponse.getGlobalStorage().getTotalFreeSpace()); + assertEquals(OZONE_CAPACITY * 3, distributionResponse.getGlobalStorage().getTotalCapacity()); + assertEquals(totalNameSpace, 
distributionResponse.getGlobalNamespace().getTotalUsedSpace());
+    assertEquals(EXPECTED_GLOBAL_TOTAL_KEYS, distributionResponse.getGlobalNamespace().getTotalKeys());
+    assertEquals(OPEN_KEY_BYTES + OPEN_MPU_KEY_BYTES,
+        distributionResponse.getUsedSpaceBreakDown().getOpenKeyBytes());
+    assertEquals(EXPECTED_COMMITTED_KEY_BYTES,
+        distributionResponse.getUsedSpaceBreakDown().getCommittedKeyBytes());
+    assertEquals(COMMITTED * 3,
+        distributionResponse.getUsedSpaceBreakDown().getPreAllocatedContainerBytes());
+    for (int i = 0; i < 3; i++) {
+      DatanodeStorageReport report = distributionResponse.getDataNodeUsage().get(i);
+      assertEquals(OZONE_CAPACITY, report.getCapacity());
+      assertEquals(OZONE_USED, report.getUsed());
+      assertEquals(OZONE_REMAINING, report.getRemaining());
+      assertEquals(COMMITTED, report.getCommitted());
+      assertEquals(RESERVED, report.getReserved());
+      assertEquals(MIN_FREE_SPACE, report.getMinimumFreeSpace());
+      assertEquals(HOSTNAME_PREFIX + i, report.getHostName());
+      assertNotNull(report.getDatanodeUuid());
+    }
+  }
+
   @Test
   public void testDownloadReturnsAcceptedWhenCollectionInProgress() {
     DataNodeMetricsServiceResponse metricsResponse = DataNodeMetricsServiceResponse.newBuilder()
         .setStatus(DataNodeMetricsService.MetricCollectionStatus.IN_PROGRESS)
         .build();
     when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(metricsResponse);
-    Response response = storageDistributionEndpoint.downloadDataNodeDistribution();
+    Response response = storageDistributionEndpoint.downloadDataNodeStorageDistribution();
 
     assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
-    assertEquals("application/json", response.getMediaType().toString());
+    assertEquals(APPLICATION_JSON, response.getMediaType().toString());
     assertEquals(metricsResponse, response.getEntity());
   }
 
@@ -87,94 +169,118 @@ public void testDownloadReturnsServerErrorWhenMetricsMissing() {
         .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
         .build();
     when(dataNodeMetricsService.getCollectedMetrics(null)).thenReturn(metricsResponse);
-    Response response = storageDistributionEndpoint.downloadDataNodeDistribution();
+    Response response = storageDistributionEndpoint.downloadDataNodeStorageDistribution();
 
     assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-    assertEquals("Metrics data is missing despite FINISHED status.", response.getEntity());
-    assertEquals("text/plain", response.getMediaType().toString());
+    assertEquals(METRICS_MISSING_ERROR, response.getEntity());
+    assertEquals(TEXT_PLAIN, response.getMediaType().toString());
   }
 
   @Test
   public void testDownloadReturnsCsvWithMetrics() throws Exception {
-    // given
-    UUID uuid1 = UUID.randomUUID();
-    UUID uuid2 = UUID.randomUUID();
-
-    String dataNode1 = "dn1";
-    String dataNode2 = "dn2";
-
-    List<DatanodePendingDeletionMetrics> pendingDeletionMetrics = Arrays.asList(
-        new DatanodePendingDeletionMetrics(dataNode1, uuid1.toString(), 10L),
-        new DatanodePendingDeletionMetrics(dataNode2, uuid2.toString(), 20L)
-    );
+    List<String> csvRows = mockStorageDistributionData(3);
+    Response response = storageDistributionEndpoint.downloadDataNodeStorageDistribution();
 
-    DataNodeMetricsServiceResponse metricsResponse =
-        DataNodeMetricsServiceResponse.newBuilder()
-            .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
-            .setPendingDeletion(pendingDeletionMetrics)
-            .build();
-
-    when(dataNodeMetricsService.getCollectedMetrics(null))
-        .thenReturn(metricsResponse);
-
-    mockDatanodeStorageReports(pendingDeletionMetrics);
-    mockNodeManagerStats(uuid1, uuid2);
-
-    Response response = storageDistributionEndpoint.downloadDataNodeDistribution();
+    // then
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    assertEquals(TEXT_CSV, response.getMediaType().toString());
+    assertEquals(DOWNLOAD_CONTENT_DISPOSITION, response.getHeaderString(CONTENT_DISPOSITION));
+    String csv = readCsv(response);
+    for (String row : csvRows) {
+      assertTrue(csv.contains(row));
+    }
+  }
 
-    // then
-    assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
-    assertEquals("text/csv", response.getMediaType().toString());
-    assertEquals(
-        "attachment; filename=\"datanode_storage_and_pending_deletion_stats.csv\"",
-        response.getHeaderString("Content-Disposition")
-    );
-
-    String csv = readCsv(response);
-
-    assertTrue(csv.contains(
-        "HostName,Datanode UUID,Capacity,Used Space,Remaining Space," +
-        "Committed Space,Reserved Space,Minimum Free Space,Pending Block Size"
-    ));
-    assertTrue(csv.contains(dataNode1 + "," + uuid1 + ",100,10,10,10,5,5,10"));
-    assertTrue(csv.contains(dataNode2 + "," + uuid2 + ",100,10,10,10,5,5,20"));
-  }
-
-  private void mockDatanodeStorageReports(
-      List<DatanodePendingDeletionMetrics> metrics) {
-
-    List<DatanodeStorageReport> reports = metrics.stream()
-        .map(m -> DatanodeStorageReport.newBuilder()
-            .setDatanodeUuid(m.getDatanodeUuid())
-            .setHostName(m.getHostName())
-            .build())
-        .collect(Collectors.toList());
-
-    when(storageDistributionEndpoint.collectDatanodeReports())
-        .thenReturn(reports);
-  }
+  private List<String> mockStorageDistributionData(int numNodes) throws Exception {
+    List<String> csvRows = new ArrayList<>();
+    List<String> headers = Arrays.asList(
+        "HostName",
+        "Datanode UUID",
+        "Filesystem Capacity",
+        "Filesystem Used Space",
+        "Filesystem Remaining Space",
+        "Ozone Capacity",
+        "Ozone Used Space",
+        "Ozone Remaining Space",
+        "PreAllocated Container Space",
+        "Reserved Space",
+        "Minimum Free Space",
+        "Pending Block Size");
+    csvRows.add(String.join(",", headers));
+
+    List<DatanodePendingDeletionMetrics> pendingDeletionMetrics = new ArrayList<>();
+    List<DatanodeInfo> dataNodes = new ArrayList<>();
+
+    for (int i = 0; i < numNodes; i++) {
+      UUID uuid = UUID.randomUUID();
+      String hostName = HOSTNAME_PREFIX + i;
+      DatanodeDetails datanode = DatanodeDetails.newBuilder()
+          .setUuid(uuid)
+          .setHostName(hostName)
+          .build();
+      pendingDeletionMetrics.add(new DatanodePendingDeletionMetrics(hostName,
+          uuid.toString(), PENDING_DELETION_SIZE));
+      dataNodes.add(new DatanodeInfo(datanode, null, null));
+      when(nodeManager.getNodeStat(datanode))
+          .thenReturn(new SCMNodeMetric(OZONE_CAPACITY, OZONE_USED, OZONE_REMAINING, COMMITTED,
+              MIN_FREE_SPACE, RESERVED));
+      when(nodeManager.getTotalFilesystemUsage(datanode))
+          .thenReturn(new SpaceUsageSource.Fixed(FS_CAPACITY, FS_AVAILABLE, FS_USED));
+
+      csvRows.add(String.join(CSV_SEPARATOR,
+          Arrays.asList(hostName,
+              uuid.toString(),
+              String.valueOf(FS_CAPACITY),
+              String.valueOf(FS_USED),
+              String.valueOf(FS_AVAILABLE),
+              String.valueOf(OZONE_CAPACITY),
+              String.valueOf(OZONE_USED),
+              String.valueOf(OZONE_REMAINING),
+              String.valueOf(COMMITTED),
+              String.valueOf(RESERVED),
+              String.valueOf(MIN_FREE_SPACE),
+              String.valueOf(PENDING_DELETION_SIZE)))));
+
+    }
+    when(nodeManager.getAllNodes()).thenReturn(dataNodes);
+    when(nodeManager.getStats())
+        .thenReturn(new SCMNodeStat(
+            OZONE_CAPACITY * numNodes,
+            OZONE_USED * numNodes,
+            OZONE_REMAINING * numNodes,
+            COMMITTED * numNodes,
+            MIN_FREE_SPACE * numNodes,
+            RESERVED * numNodes));
+
+    Map<String, Long> pendingSizes = new HashMap<>();
+    pendingSizes.put(PENDING_DIRECTORY_SIZE_KEY, PENDING_DIRECTORY_SIZE);
+    pendingSizes.put(PENDING_KEY_SIZE_KEY, PENDING_KEY_SIZE);
-
-  private void mockNodeManagerStats(UUID uuid1, UUID uuid2) {
-    DatanodeDetails dn1 = DatanodeDetails.newBuilder()
-        .setUuid(uuid1)
-        .setHostName("dn1")
-        .build();
+    when(reconGlobalMetricsService.calculatePendingSizes())
+        .thenReturn(pendingSizes);
 
-    DatanodeDetails dn2 = DatanodeDetails.newBuilder()
-        .setUuid(uuid2)
-        .setHostName("dn3")
-        .build();
+    when(reconGlobalMetricsService.getOpenKeySummary())
+        .thenReturn(Collections.singletonMap(TOTAL_REPLICATED_DATA_SIZE_KEY, OPEN_KEY_BYTES));
+    when(reconGlobalMetricsService.getMPUKeySummary())
+        .thenReturn(Collections.singletonMap(TOTAL_REPLICATED_DATA_SIZE_KEY, OPEN_MPU_KEY_BYTES));
 
-    List<DatanodeInfo> datanodes = Arrays.asList(
-        new DatanodeInfo(dn1, null, null),
-        new DatanodeInfo(dn2, null, null)
-    );
+    DUResponse duResponse = new DUResponse();
+    duResponse.setSizeWithReplica(EXPECTED_COMMITTED_KEY_BYTES);
+    when(nssummaryEndpoint.getDiskUsage(ROOT_PATH, false, true, false))
+        .thenReturn(Response.ok(duResponse).build());
+    when(reconGlobalStatsManager.getGlobalStatsValue(anyString()))
+        .thenReturn(new GlobalStatsValue(GLOBAL_STAT_KEY_COUNT));
 
-    when(nodeManager.getAllNodes()).thenReturn(datanodes);
-    when(nodeManager.getNodeStat(dn1))
-        .thenReturn(new SCMNodeMetric(100, 10, 10, 10, 5, 5));
-    when(nodeManager.getNodeStat(dn2))
-        .thenReturn(new SCMNodeMetric(100, 10, 10, 10, 5, 5));
+    DataNodeMetricsServiceResponse metricsResponse =
+        DataNodeMetricsServiceResponse.newBuilder()
+            .setStatus(DataNodeMetricsService.MetricCollectionStatus.FINISHED)
+            .setPendingDeletion(pendingDeletionMetrics)
+            .build();
+    when(dataNodeMetricsService.getCollectedMetrics(null))
+        .thenReturn(metricsResponse);
+    return csvRows;
+  }
 
   private String readCsv(Response response) throws Exception {
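
Editor's note on the endpoint contract (not part of the patch): per the code and tests above, the download API answers 202 with a JSON status body while datanode metrics are still being collected, and 200 with the CSV attachment once collection has FINISHED; the Recon web UI polls on a 5-second interval. A hedged client-side sketch, assuming Recon's default HTTP port 9888 and the same fixed-interval poll:

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class DownloadDatanodeCsv {
      public static void main(String[] args) throws Exception {
        // Host and port are assumptions; adjust to your Recon instance.
        URL url = new URL("http://localhost:9888/api/v1/storageDistribution/download");
        while (true) {
          HttpURLConnection conn = (HttpURLConnection) url.openConnection();
          int status = conn.getResponseCode();
          if (status == 200) {
            // Collection finished: the body is the CSV attachment.
            try (InputStream in = conn.getInputStream()) {
              Files.copy(in, Paths.get("datanode_storage_and_pending_deletion_stats.csv"));
            }
            return;
          }
          // 202: collection still in progress; poll again shortly.
          conn.disconnect();
          Thread.sleep(5000);
        }
      }
    }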