diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerHealthState.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerHealthState.java index 56dd1c5620b6..ac9a82d190d1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerHealthState.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerHealthState.java @@ -107,6 +107,13 @@ public enum ContainerHealthState { "Containers in OPEN state without any healthy Pipeline", "OpenContainersWithoutPipeline"), + /** + * Acknowledge missing containers which are not problematic. + */ + ACK_MISSING((short) 10, + "Acknowledge missing containers which are not problematic", + "AcknowledgeMissingContainers"), + // ========== Actual Combinations Found in Code (100+) ========== /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java index 2beef2abf885..40d82bfa6a48 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java @@ -123,6 +123,10 @@ public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) { .setReplicationConfig(config) .setSequenceId(info.getSequenceId()); + if (info.hasAckMissing() && info.getAckMissing()) { + builder.setHealthState(ContainerHealthState.ACK_MISSING); + } + if (info.hasPipelineID()) { builder.setPipelineID(PipelineID.getFromProtobuf(info.getPipelineID())); } @@ -291,6 +295,11 @@ public HddsProtos.ContainerInfoProto getProtobuf() { builder.setPipelineID(getPipelineID().getProtobuf()); } + // Only persist ACK_MISSING health state, others are dynamic + if (healthState == ContainerHealthState.ACK_MISSING) { + builder.setAckMissing(true); + } + return builder.build(); } diff 
--git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerHealthState.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerHealthState.java index 6ffc678ea175..763a2e5a9a21 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerHealthState.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerHealthState.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.container; +import static org.apache.hadoop.hdds.scm.container.ContainerHealthState.ACK_MISSING; import static org.apache.hadoop.hdds.scm.container.ContainerHealthState.EMPTY; import static org.apache.hadoop.hdds.scm.container.ContainerHealthState.HEALTHY; import static org.apache.hadoop.hdds.scm.container.ContainerHealthState.MISSING; @@ -37,6 +38,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.junit.jupiter.api.Test; /** @@ -59,6 +63,7 @@ public void testIndividualStateValues() { assertEquals(7, OPEN_UNHEALTHY.getValue()); assertEquals(8, QUASI_CLOSED_STUCK.getValue()); assertEquals(9, OPEN_WITHOUT_PIPELINE.getValue()); + assertEquals(10, ACK_MISSING.getValue()); } @Test @@ -69,7 +74,7 @@ public void testCombinationStateValues() { assertEquals(102, MISSING_UNDER_REPLICATED.getValue()); assertEquals(103, QUASI_CLOSED_STUCK_UNDER_REPLICATED.getValue()); assertEquals(104, QUASI_CLOSED_STUCK_OVER_REPLICATED.getValue()); - assertEquals(105, ContainerHealthState.QUASI_CLOSED_STUCK_MISSING.getValue()); + assertEquals(105, QUASI_CLOSED_STUCK_MISSING.getValue()); } @Test @@ -101,6 +106,7 @@ public void testFromValueIndividualStates() { assertEquals(OPEN_UNHEALTHY, ContainerHealthState.fromValue((short) 7)); assertEquals(QUASI_CLOSED_STUCK, ContainerHealthState.fromValue((short) 8)); 
assertEquals(OPEN_WITHOUT_PIPELINE, ContainerHealthState.fromValue((short) 9)); + assertEquals(ACK_MISSING, ContainerHealthState.fromValue((short) 10)); } @Test @@ -126,7 +132,7 @@ public void testFromValueUnknownReturnsHealthy() { @Test public void testAllEnumValuesAreUnique() { // Verify all enum constants have unique values - java.util.Set values = new java.util.HashSet<>(); + Set values = new HashSet<>(); for (ContainerHealthState state : ContainerHealthState.values()) { assertFalse(values.contains(state.getValue()), "Duplicate value found: " + state.getValue()); @@ -137,16 +143,16 @@ public void testAllEnumValuesAreUnique() { @Test public void testIndividualStateCount() { - // Should have 10 individual states (0-9) + // Should have 11 individual states (0-10) - long individualCount = java.util.Arrays.stream(ContainerHealthState.values()) + long individualCount = Arrays.stream(ContainerHealthState.values()) .filter(s -> s.getValue() >= 0 && s.getValue() <= 99) .count(); - assertEquals(10, individualCount, "Expected 10 individual states"); + assertEquals(11, individualCount, "Expected 11 individual states"); } @Test public void testCombinationStateCount() { // Should have 6 combination states (100-105) - long combinationCount = java.util.Arrays.stream(ContainerHealthState.values()) + long combinationCount = Arrays.stream(ContainerHealthState.values()) .filter(s -> s.getValue() >= 100) .count(); assertEquals(6, combinationCount, "Expected 6 combination states"); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 892dd4de1ff8..ea2b5b2848c9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -463,4 +463,12 @@ DecommissionScmResponseProto decommissionScm( */ void reconcileContainer(long containerID) throws IOException; + /** + * Set or 
unset the ACK_MISSING state for a container. + * + * @param containerId The ID of the container. + * @param acknowledge true to set ACK_MISSING, false to unset to MISSING. + * @throws IOException + */ + void setAckMissingContainer(long containerId, boolean acknowledge) throws IOException; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 92ddfa7eb8dc..83a9e70c572a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -511,4 +511,13 @@ DecommissionScmResponseProto decommissionScm( * @throws IOException On error */ void reconcileContainer(long containerID) throws IOException; + + /** + * Set or unset the ACK_MISSING state for a container. + * + * @param containerId The ID of the container. + * @param acknowledge true to set ACK_MISSING, false to unset to MISSING. 
+ * @throws IOException + */ + void setAckMissingContainer(long containerId, boolean acknowledge) throws IOException; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 94b2230e68ba..98795e746fe5 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -115,6 +115,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest.Builder; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SetAckMissingContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto; @@ -1240,4 +1241,14 @@ public void reconcileContainer(long containerID) throws IOException { // TODO check error handling. 
submitRequest(Type.ReconcileContainer, builder -> builder.setReconcileContainerRequest(request)); } + + @Override + public void setAckMissingContainer(long containerID, boolean acknowledge) + throws IOException { + SetAckMissingContainerRequestProto request = SetAckMissingContainerRequestProto.newBuilder() + .setContainerID(containerID) + .setAcknowledge(acknowledge) + .build(); + submitRequest(Type.SetAckMissingContainer, builder -> builder.setSetAckMissingContainerRequest(request)); + } } diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index f80a50a3be97..6cb9be66df22 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -87,6 +87,7 @@ message ScmContainerLocationRequest { optional ContainerBalancerStatusInfoRequestProto containerBalancerStatusInfoRequest = 48; optional ReconcileContainerRequestProto reconcileContainerRequest = 49; optional GetDeletedBlocksTxnSummaryRequestProto getDeletedBlocksTxnSummaryRequest = 50; + optional SetAckMissingContainerRequestProto setAckMissingContainerRequest = 51; } message ScmContainerLocationResponse { @@ -145,6 +146,7 @@ message ScmContainerLocationResponse { optional ContainerBalancerStatusInfoResponseProto containerBalancerStatusInfoResponse = 48; optional ReconcileContainerResponseProto reconcileContainerResponse = 49; optional GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummaryResponse = 50; + optional SetAckMissingContainerResponseProto setAckMissingContainerResponse = 51; enum Status { OK = 1; @@ -202,6 +204,7 @@ enum Type { GetContainerBalancerStatusInfo = 44; ReconcileContainer = 45; GetDeletedBlocksTransactionSummary = 46; + SetAckMissingContainer = 47; } /** @@ -695,6 +698,14 @@ message ReconcileContainerRequestProto { message ReconcileContainerResponseProto { } +message SetAckMissingContainerRequestProto 
{ + optional int64 containerID = 1; + optional bool acknowledge = 2; +} + +message SetAckMissingContainerResponseProto { +} + /** * Protocol used from an HDFS node to StorageContainerManager. See the request * and response messages for details of the RPC calls. diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index eb819b80a3e8..b95569e3bac2 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -271,6 +271,7 @@ message ContainerInfoProto { optional ReplicationFactor replicationFactor = 10; required ReplicationType replicationType = 11; optional ECReplicationConfig ecReplicationConfig = 12; + optional bool ackMissing = 13; } message ContainerWithPipeline { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index 370c219ac601..295f2e152814 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -222,4 +223,14 @@ void deleteContainer(ContainerID containerID) * @return containerStateManger */ ContainerStateManager getContainerStateManager(); + + /** + * Update container info in the container manager. + * This is used for updating container metadata like ackMissing flag. 
+ * + * @param containerInfo Updated container info proto + * @throws IOException + */ + void updateContainerInfo(ContainerID containerID, ContainerInfoProto containerInfo) + throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index dc701a0be661..64ac028609a1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -294,6 +294,21 @@ public void updateContainerState(final ContainerID cid, } } + @Override + public void updateContainerInfo(final ContainerID cid, ContainerInfoProto containerInfo) + throws IOException { + lock.lock(); + try { + if (containerExist(cid)) { + containerStateManager.updateContainerInfo(containerInfo); + } else { + throw new ContainerNotFoundException(cid); + } + } finally { + lock.unlock(); + } + } + @Override public void transitionDeletingOrDeletedToClosedState(ContainerID containerID) throws IOException { HddsProtos.ContainerID proto = containerID.getProtobuf(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index f5a2334b7cd2..4ce28fa40fb2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -210,4 +210,14 @@ void removeContainer(HddsProtos.ContainerID containerInfo) */ void reinitialize(Table containerStore) throws IOException; + + /** + * Update container info. 
+ * + * @param containerInfo Updated container info proto + * @throws IOException + */ + @Replicate + void updateContainerInfo(HddsProtos.ContainerInfoProto containerInfo) + throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java index d971b19c406c..e7c216b8f422 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -551,6 +551,34 @@ public void reinitialize( } } + @Override + public void updateContainerInfo(HddsProtos.ContainerInfoProto updatedInfoProto) + throws IOException { + ContainerInfo updatedInfo = ContainerInfo.fromProtobuf(updatedInfoProto); + ContainerID containerID = updatedInfo.containerID(); + + try (AutoCloseableLock ignored = writeLock(containerID)) { + final ContainerInfo currentInfo = containers.getContainerInfo(containerID); + if (currentInfo == null) { + throw new ContainerNotFoundException(containerID); + } + + // Only persist ACK_MISSING health state changes + // Other health states are dynamic and computed by ReplicationManager + ContainerHealthState newHealthState = updatedInfo.getHealthState(); + if (newHealthState == ContainerHealthState.ACK_MISSING) { + currentInfo.setHealthState(ContainerHealthState.ACK_MISSING); + LOG.debug("Persisting ACK_MISSING state for container: {}", containerID); + } else if (currentInfo.getHealthState() == ContainerHealthState.ACK_MISSING) { + currentInfo.setHealthState(null); + LOG.debug("Clearing ACK_MISSING state for container: {}, new state: {}", + containerID, newHealthState); + } + transactionBuffer.addToBuffer(containerStore, containerID, currentInfo); + LOG.debug("Updated container info for container: {}, healthState={}", containerID, 
currentInfo.getHealthState()); + } + } + private AutoCloseableLock readLock() { return AutoCloseableLock.acquire(lock.readLock()); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 83d3825b66c0..d1d317108c89 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -854,6 +855,14 @@ protected boolean processContainer(ContainerInfo containerInfo, ReplicationQueue repQueue, ReplicationManagerReport report, boolean readOnly) throws ContainerNotFoundException { synchronized (containerInfo) { + // Skip containers that are acknowledged as missing + // These containers are persisted with ACK_MISSING state and should not be + // processed by ReplicationManager until unacknowledged + if (containerInfo.getHealthState() == ContainerHealthState.ACK_MISSING) { + LOG.debug("Skipping ACK_MISSING container: {}", containerInfo.getContainerID()); + return false; + } + // Reset health state to HEALTHY before processing this container report.resetContainerHealthState(); diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosingContainerHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosingContainerHandler.java index a09f5079ffe5..2f1e5bdf897d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosingContainerHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosingContainerHandler.java @@ -68,8 +68,8 @@ public boolean handle(ContainerCheckRequest request) { boolean forceClose = containerInfo.getReplicationConfig() .getReplicationType() != ReplicationType.RATIS; - // TODO - review this logic - may need an empty check here - if (request.getContainerReplicas().isEmpty()) { + // Report MISSING only for containers with no replicas and keys > 0 + if (request.getContainerReplicas().isEmpty() && containerInfo.getNumberOfKeys() > 0) { request.getReport().incrementAndSample(ContainerHealthState.MISSING, containerInfo); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 62f06079bf0b..e9eca075a183 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -122,6 +122,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse.Status; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SetAckMissingContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SetAckMissingContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto; @@ -748,6 +750,13 @@ public ScmContainerLocationResponse processRequest( .setStatus(Status.OK) .setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest())) .build(); + case SetAckMissingContainer: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setSetAckMissingContainerResponse( + setAckMissingContainer(request.getSetAckMissingContainerRequest())) + .build(); default: throw new IllegalArgumentException( "Unknown command type: " + request.getCmdType()); @@ -1387,4 +1396,9 @@ public ReconcileContainerResponseProto reconcileContainer(ReconcileContainerRequ return ReconcileContainerResponseProto.getDefaultInstance(); } + public SetAckMissingContainerResponseProto setAckMissingContainer( + SetAckMissingContainerRequestProto request) throws IOException { + impl.setAckMissingContainer(request.getContainerID(), request.getAcknowledge()); + return SetAckMissingContainerResponseProto.getDefaultInstance(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 5726d3449a7d..bc25344d9786 100644 --- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hdds.scm.DatanodeAdminError; import org.apache.hadoop.hdds.scm.FetchMetrics; import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerListResult; @@ -1680,4 +1681,41 @@ public void reconcileContainer(long longContainerID) throws IOException { throw ex; } } + + @Override + public void setAckMissingContainer(long longContainerID, boolean acknowledge) throws IOException { + ContainerID containerID = ContainerID.valueOf(longContainerID); + final Map auditMap = new HashMap<>(); + auditMap.put("containerID", containerID.toString()); + auditMap.put("acknowledge", String.valueOf(acknowledge)); + + try { + getScm().checkAdminAccess(getRemoteUser(), false); + ContainerInfo containerInfo = scm.getContainerManager().getContainer(containerID); + + if (acknowledge) { + // Validation for setting ACK_MISSING + Set replicas = scm.getContainerManager().getContainerReplicas(containerID); + if (replicas != null && !replicas.isEmpty()) { + throw new IOException("Container " + longContainerID + " has " + replicas.size() + + " replicas and cannot be acknowledged as missing"); + } + if (containerInfo.getNumberOfKeys() == 0) { + throw new IOException("Container " + longContainerID + " is empty (0 keys) and cannot be acknowledged."); + } + // Set to ACK_MISSING + containerInfo.setHealthState(ContainerHealthState.ACK_MISSING); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess(SCMAction.ACKNOWLEDGE_MISSING_CONTAINER, auditMap)); + } else { + containerInfo.setHealthState(null); + 
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(SCMAction.UNACKNOWLEDGE_MISSING_CONTAINER, auditMap)); + } + scm.getContainerManager().updateContainerInfo(containerID, containerInfo.getProtobuf()); + } catch (IOException ex) { + SCMAction action = acknowledge ? + SCMAction.ACKNOWLEDGE_MISSING_CONTAINER : SCMAction.UNACKNOWLEDGE_MISSING_CONTAINER; + AUDIT.logWriteFailure(buildAuditMessageForFailure(action, auditMap, ex)); + throw ex; + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index 52cd943c4dbb..e3f0be24a4f6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -68,7 +68,9 @@ public enum SCMAction implements AuditAction { QUERY_NODE, GET_PIPELINE, RECONCILE_CONTAINER, - GET_DELETED_BLOCK_SUMMARY; + GET_DELETED_BLOCK_SUMMARY, + ACKNOWLEDGE_MISSING_CONTAINER, + UNACKNOWLEDGE_MISSING_CONTAINER; @Override public String getAction() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosingContainerHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosingContainerHandler.java index dca89171f0e3..d75b2d982514 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosingContainerHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosingContainerHandler.java @@ -213,7 +213,7 @@ public void testClosingContainerStateIsNotUpdatedWhenThereAreReplicas() { @Test public void testClosingContainerStateIsUpdatedWhenThereAreNotReplicas() { ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( - RATIS_REPLICATION_CONFIG, 1, CLOSING); + 
RATIS_REPLICATION_CONFIG, 1, CLOSING, 1, 10); Set containerReplicas = new HashSet<>(); ReplicationManagerReport report = new ReplicationManagerReport(rmConf.getContainerSampleLimit()); ContainerCheckRequest request = new ContainerCheckRequest.Builder() diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index 61c0f4150c34..0774fb62df31 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -598,4 +598,9 @@ public String getMetrics(String query) throws IOException { public void reconcileContainer(long id) throws IOException { storageContainerLocationClient.reconcileContainer(id); } + + @Override + public void setAckMissingContainer(long containerId, boolean acknowledge) throws IOException { + storageContainerLocationClient.setAckMissingContainer(containerId, acknowledge); + } } diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/AckMissingSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/AckMissingSubcommand.java new file mode 100644 index 000000000000..49cb86b9c9e3 --- /dev/null +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/AckMissingSubcommand.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.container; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.scm.cli.ScmSubcommand; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerListResult; +import picocli.CommandLine; + +/** + * Acknowledge missing container(s) to suppress them from Replication Manager Report. 
+ */ +@CommandLine.Command( + name = "ack", + description = "Acknowledge missing container(s) to suppress them from reports", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class AckMissingSubcommand extends ScmSubcommand { + + @CommandLine.Parameters(description = "Container IDs to acknowledge (comma-separated)", + arity = "0..1") + private String containers; + + @CommandLine.Option(names = {"--list"}, + description = "List all acknowledged missing containers") + private boolean list; + + @Override + public void execute(ScmClient scmClient) throws IOException { + if (list) { + // List acknowledged containers + ContainerListResult result = scmClient.listContainer(1, Integer.MAX_VALUE); + for (ContainerInfo info : result.getContainerInfoList()) { + if (info.getHealthState() == ContainerHealthState.ACK_MISSING) { + out().println(info.getContainerID()); + } + } + } else if (containers != null && !containers.isEmpty()) { + // Acknowledge containers + Set ids = parseContainerIds(containers); + for (Long id : ids) { + try { + int replicaCount = scmClient.getContainerReplicas(id).size(); + if (replicaCount > 0) { + err().println("Cannot acknowledge container " + id + ": has " + replicaCount + " replica(s). " + + "Only containers with 0 replicas can be acknowledged as missing."); + continue; + } + + ContainerInfo containerInfo = scmClient.getContainer(id); + if (containerInfo.getNumberOfKeys() == 0) { + err().println("Cannot acknowledge container " + id + ": container is empty (0 keys). 
" + + "Empty containers are auto-deleted and don't need acknowledgement."); + continue; + } + + scmClient.setAckMissingContainer(id, true); + out().println("Acknowledged container: " + id); + } catch (IOException e) { + err().println("Failed to acknowledge container " + id + ": " + e.getMessage()); + } + } + } else { + throw new IllegalArgumentException( + "Either provide container IDs or use --list option"); + } + } + + private Set parseContainerIds(String input) { + return Arrays.stream(input.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .map(Long::parseLong) + .collect(Collectors.toSet()); + } +} diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java index b340c4077f44..aafe88415951 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java @@ -37,7 +37,9 @@ CloseSubcommand.class, ReportSubcommand.class, UpgradeSubcommand.class, - ReconcileSubcommand.class + ReconcileSubcommand.class, + AckMissingSubcommand.class, + UnackMissingSubcommand.class }) @MetaInfServices(AdminSubcommand.class) public class ContainerCommands implements AdminSubcommand { diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/UnackMissingSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/UnackMissingSubcommand.java new file mode 100644 index 000000000000..bafaedcdb184 --- /dev/null +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/UnackMissingSubcommand.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.container; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.scm.cli.ScmSubcommand; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import picocli.CommandLine; + +/** + * Unacknowledge missing container(s) to report them again. 
+ */ +@CommandLine.Command( + name = "unack", + description = "Unacknowledge missing container(s) to report them again", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class UnackMissingSubcommand extends ScmSubcommand { + + @CommandLine.Parameters(description = "Container IDs to unacknowledge (comma-separated)") + private String containers; + + @Override + public void execute(ScmClient scmClient) throws IOException { + if (containers == null || containers.isEmpty()) { + throw new IllegalArgumentException( + "Container IDs must be provided"); + } + + Set ids = parseContainerIds(containers); + for (Long id : ids) { + try { + ContainerInfo containerInfo = scmClient.getContainer(id); + if (containerInfo.getHealthState() != ContainerHealthState.ACK_MISSING) { + err().println("Cannot unacknowledge container " + id + ": " + + "Only acknowledged missing containers can be unacknowledged."); + continue; + } + scmClient.setAckMissingContainer(id, false); + out().println("Unacknowledged container: " + id); + } catch (IOException e) { + err().println("Failed to unacknowledge container " + id + ": " + e.getMessage()); + } + } + } + + private Set parseContainerIds(String input) { + return Arrays.stream(input.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .map(Long::parseLong) + .collect(Collectors.toSet()); + } +} diff --git a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestAckMissingContainerSubcommand.java b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestAckMissingContainerSubcommand.java new file mode 100644 index 000000000000..dc75850a58a4 --- /dev/null +++ b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestAckMissingContainerSubcommand.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.container; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import org.apache.hadoop.hdds.scm.container.ContainerHealthState; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerListResult; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import picocli.CommandLine; + +/** + * Tests for AckMissingContainerSubcommand. 
+ */ +public class TestAckMissingContainerSubcommand { + + private ScmClient scmClient; + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name(); + + @BeforeEach + public void setup() throws IOException { + scmClient = mock(ScmClient.class); + + System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING)); + System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING)); + } + + @Test + public void testAckMissingContainer() throws Exception { + ContainerInfo container = mockContainer(1, false); + when(scmClient.getContainer(1L)).thenReturn(container); + + AckMissingSubcommand cmd = new AckMissingSubcommand(); + new CommandLine(cmd).parseArgs("1"); + cmd.execute(scmClient); + verify(scmClient, times(1)).setAckMissingContainer(1L, true); + + String output = outContent.toString(DEFAULT_ENCODING); + assertThat(output).contains("Acknowledged container: 1"); + } + + @Test + public void testListAcknowledgedContainers() throws Exception { + ContainerInfo container1 = mockContainer(1, true); + ContainerInfo container2 = mockContainer(2, false); + + List allContainers = Arrays.asList(container1, container2); + ContainerListResult result = new ContainerListResult(allContainers, 2); + when(scmClient.listContainer(anyLong(), anyInt())).thenReturn(result); + + AckMissingSubcommand cmd = new AckMissingSubcommand(); + new CommandLine(cmd).parseArgs("--list"); + cmd.execute(scmClient); + + String output = outContent.toString(DEFAULT_ENCODING); + assertThat(output).contains("1"); + assertThat(output).doesNotContain("2"); + } + + @Test + public void testUnacknowledgeMissingContainer() throws Exception { + ContainerInfo container = mockContainer(1, true); + when(scmClient.getContainer(1L)).thenReturn(container); + + UnackMissingSubcommand cmd = new UnackMissingSubcommand(); + new 
CommandLine(cmd).parseArgs("1"); + cmd.execute(scmClient); + verify(scmClient, times(1)).setAckMissingContainer(1L, false); + + String output = outContent.toString(DEFAULT_ENCODING); + assertThat(output).contains("Unacknowledged container: 1"); + } + + private ContainerInfo mockContainer(long containerID, boolean ackMissing) { + ContainerHealthState healthState = ackMissing ? + ContainerHealthState.ACK_MISSING : ContainerHealthState.MISSING; + return new ContainerInfo.Builder() + .setContainerID(containerID) + .setState(OPEN) + .setHealthState(healthState) + .setReplicationConfig(RatisReplicationConfig.getInstance(ONE)) + .setNumberOfKeys(1) + .build(); + } +}