Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ public enum ContainerHealthState {
"Containers in OPEN state without any healthy Pipeline",
"OpenContainersWithoutPipeline"),

/**
* Acknowledge missing containers which are not problematic.
*/
ACK_MISSING((short) 10,
"Acknowledge missing containers which are not problematic",
"AcknowledgeMissingContainers"),

// ========== Actual Combinations Found in Code (100+) ==========

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) {
.setReplicationConfig(config)
.setSequenceId(info.getSequenceId());

if (info.hasAckMissing() && info.getAckMissing()) {
builder.setHealthState(ContainerHealthState.ACK_MISSING);
}

if (info.hasPipelineID()) {
builder.setPipelineID(PipelineID.getFromProtobuf(info.getPipelineID()));
}
Expand Down Expand Up @@ -291,6 +295,11 @@ public HddsProtos.ContainerInfoProto getProtobuf() {
builder.setPipelineID(getPipelineID().getProtobuf());
}

// Only persist ACK_MISSING health state, others are dynamic
if (healthState == ContainerHealthState.ACK_MISSING) {
builder.setAckMissing(true);
}

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm.container;

import static org.apache.hadoop.hdds.scm.container.ContainerHealthState.ACK_MISSING;
import static org.apache.hadoop.hdds.scm.container.ContainerHealthState.EMPTY;
import static org.apache.hadoop.hdds.scm.container.ContainerHealthState.HEALTHY;
import static org.apache.hadoop.hdds.scm.container.ContainerHealthState.MISSING;
Expand All @@ -37,6 +38,9 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -59,6 +63,7 @@ public void testIndividualStateValues() {
assertEquals(7, OPEN_UNHEALTHY.getValue());
assertEquals(8, QUASI_CLOSED_STUCK.getValue());
assertEquals(9, OPEN_WITHOUT_PIPELINE.getValue());
assertEquals(10, ACK_MISSING.getValue());
}

@Test
Expand All @@ -69,7 +74,7 @@ public void testCombinationStateValues() {
assertEquals(102, MISSING_UNDER_REPLICATED.getValue());
assertEquals(103, QUASI_CLOSED_STUCK_UNDER_REPLICATED.getValue());
assertEquals(104, QUASI_CLOSED_STUCK_OVER_REPLICATED.getValue());
assertEquals(105, ContainerHealthState.QUASI_CLOSED_STUCK_MISSING.getValue());
assertEquals(105, QUASI_CLOSED_STUCK_MISSING.getValue());
}

@Test
Expand Down Expand Up @@ -101,6 +106,7 @@ public void testFromValueIndividualStates() {
assertEquals(OPEN_UNHEALTHY, ContainerHealthState.fromValue((short) 7));
assertEquals(QUASI_CLOSED_STUCK, ContainerHealthState.fromValue((short) 8));
assertEquals(OPEN_WITHOUT_PIPELINE, ContainerHealthState.fromValue((short) 9));
assertEquals(ACK_MISSING, ContainerHealthState.fromValue((short) 10));
}

@Test
Expand All @@ -126,7 +132,7 @@ public void testFromValueUnknownReturnsHealthy() {
@Test
public void testAllEnumValuesAreUnique() {
// Verify all enum constants have unique values
java.util.Set<Short> values = new java.util.HashSet<>();
Set<Short> values = new HashSet<>();
for (ContainerHealthState state : ContainerHealthState.values()) {
assertFalse(values.contains(state.getValue()),
"Duplicate value found: " + state.getValue());
Expand All @@ -137,16 +143,16 @@ public void testAllEnumValuesAreUnique() {
@Test
public void testIndividualStateCount() {
// Should have 10 individual states (0-9)
long individualCount = java.util.Arrays.stream(ContainerHealthState.values())
long individualCount = Arrays.stream(ContainerHealthState.values())
.filter(s -> s.getValue() >= 0 && s.getValue() <= 99)
.count();
assertEquals(10, individualCount, "Expected 10 individual states");
assertEquals(11, individualCount, "Expected 10 individual states");
}

@Test
public void testCombinationStateCount() {
// Should have 6 combination states (100-105)
long combinationCount = java.util.Arrays.stream(ContainerHealthState.values())
long combinationCount = Arrays.stream(ContainerHealthState.values())
.filter(s -> s.getValue() >= 100)
.count();
assertEquals(6, combinationCount, "Expected 6 combination states");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,4 +463,12 @@ DecommissionScmResponseProto decommissionScm(
*/
void reconcileContainer(long containerID) throws IOException;

/**
* Set or unset the ACK_MISSING state for a container.
*
* @param containerId The ID of the container.
* @param acknowledge true to set ACK_MISSING, false to unset to MISSING.
* @throws IOException
*/
void setAckMissingContainer(long containerId, boolean acknowledge) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -511,4 +511,13 @@ DecommissionScmResponseProto decommissionScm(
* @throws IOException On error
*/
void reconcileContainer(long containerID) throws IOException;

/**
* Set or unset the ACK_MISSING state for a container.
*
* @param containerId The ID of the container.
* @param acknowledge true to set ACK_MISSING, false to unset to MISSING.
* @throws IOException
*/
void setAckMissingContainer(long containerId, boolean acknowledge) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest.Builder;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SetAckMissingContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto;
Expand Down Expand Up @@ -1240,4 +1241,14 @@ public void reconcileContainer(long containerID) throws IOException {
// TODO check error handling.
submitRequest(Type.ReconcileContainer, builder -> builder.setReconcileContainerRequest(request));
}

@Override
public void setAckMissingContainer(long containerID, boolean acknowledge)
throws IOException {
SetAckMissingContainerRequestProto request = SetAckMissingContainerRequestProto.newBuilder()
.setContainerID(containerID)
.setAcknowledge(acknowledge)
.build();
submitRequest(Type.SetAckMissingContainer, builder -> builder.setSetAckMissingContainerRequest(request));
}
}
11 changes: 11 additions & 0 deletions hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ message ScmContainerLocationRequest {
optional ContainerBalancerStatusInfoRequestProto containerBalancerStatusInfoRequest = 48;
optional ReconcileContainerRequestProto reconcileContainerRequest = 49;
optional GetDeletedBlocksTxnSummaryRequestProto getDeletedBlocksTxnSummaryRequest = 50;
optional SetAckMissingContainerRequestProto setAckMissingContainerRequest = 51;
}

message ScmContainerLocationResponse {
Expand Down Expand Up @@ -145,6 +146,7 @@ message ScmContainerLocationResponse {
optional ContainerBalancerStatusInfoResponseProto containerBalancerStatusInfoResponse = 48;
optional ReconcileContainerResponseProto reconcileContainerResponse = 49;
optional GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummaryResponse = 50;
optional SetAckMissingContainerResponseProto setAckMissingContainerResponse = 51;

enum Status {
OK = 1;
Expand Down Expand Up @@ -202,6 +204,7 @@ enum Type {
GetContainerBalancerStatusInfo = 44;
ReconcileContainer = 45;
GetDeletedBlocksTransactionSummary = 46;
SetAckMissingContainer = 47;
}

/**
Expand Down Expand Up @@ -695,6 +698,14 @@ message ReconcileContainerRequestProto {
message ReconcileContainerResponseProto {
}

message SetAckMissingContainerRequestProto {
optional int64 containerID = 1;
optional bool acknowledge = 2;
}

message SetAckMissingContainerResponseProto {
}

/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
Expand Down
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ message ContainerInfoProto {
optional ReplicationFactor replicationFactor = 10;
required ReplicationType replicationType = 11;
optional ECReplicationConfig ecReplicationConfig = 12;
optional bool ackMissing = 13;
}

message ContainerWithPipeline {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
Expand Down Expand Up @@ -222,4 +223,14 @@ void deleteContainer(ContainerID containerID)
* @return containerStateManger
*/
ContainerStateManager getContainerStateManager();

/**
* Update container info in the container manager.
* This is used for updating container metadata like ackMissing flag.
*
* @param containerInfo Updated container info proto
* @throws IOException
*/
void updateContainerInfo(ContainerID containerID, ContainerInfoProto containerInfo)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,21 @@ public void updateContainerState(final ContainerID cid,
}
}

@Override
public void updateContainerInfo(final ContainerID cid, ContainerInfoProto containerInfo)
throws IOException {
lock.lock();
try {
if (containerExist(cid)) {
containerStateManager.updateContainerInfo(containerInfo);
} else {
throw new ContainerNotFoundException(cid);
}
} finally {
lock.unlock();
}
}

@Override
public void transitionDeletingOrDeletedToClosedState(ContainerID containerID) throws IOException {
HddsProtos.ContainerID proto = containerID.getProtobuf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,14 @@ void removeContainer(HddsProtos.ContainerID containerInfo)
*/
void reinitialize(Table<ContainerID, ContainerInfo> containerStore)
throws IOException;

/**
* Update container info.
*
* @param containerInfo Updated container info proto
* @throws IOException
*/
@Replicate
void updateContainerInfo(HddsProtos.ContainerInfoProto containerInfo)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,34 @@ public void reinitialize(
}
}

@Override
public void updateContainerInfo(HddsProtos.ContainerInfoProto updatedInfoProto)
throws IOException {
ContainerInfo updatedInfo = ContainerInfo.fromProtobuf(updatedInfoProto);
ContainerID containerID = updatedInfo.containerID();

try (AutoCloseableLock ignored = writeLock(containerID)) {
final ContainerInfo currentInfo = containers.getContainerInfo(containerID);
if (currentInfo == null) {
throw new ContainerNotFoundException(containerID);
}

// Only persist ACK_MISSING health state changes
// Other health states are dynamic and computed by ReplicationManager
ContainerHealthState newHealthState = updatedInfo.getHealthState();
if (newHealthState == ContainerHealthState.ACK_MISSING) {
currentInfo.setHealthState(ContainerHealthState.ACK_MISSING);
LOG.debug("Persisting ACK_MISSING state for container: {}", containerID);
} else if (currentInfo.getHealthState() == ContainerHealthState.ACK_MISSING) {
currentInfo.setHealthState(null);
LOG.debug("Clearing ACK_MISSING state for container: {}, new state: {}",
containerID, newHealthState);
}
transactionBuffer.addToBuffer(containerStore, containerID, currentInfo);
LOG.debug("Updated container info for container: {}, healthState={}", containerID, currentInfo.getHealthState());
}
}

private AutoCloseableLock readLock() {
return AutoCloseableLock.acquire(lock.readLock());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerHealthState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
Expand Down Expand Up @@ -854,6 +855,14 @@ protected boolean processContainer(ContainerInfo containerInfo,
ReplicationQueue repQueue, ReplicationManagerReport report,
boolean readOnly) throws ContainerNotFoundException {
synchronized (containerInfo) {
// Skip containers that are acknowledged as missing
// These containers are persisted with ACK_MISSING state and should not be
// processed by ReplicationManager until unacknowledged
if (containerInfo.getHealthState() == ContainerHealthState.ACK_MISSING) {
LOG.debug("Skipping ACK_MISSING container: {}", containerInfo.getContainerID());
return false;
}

// Reset health state to HEALTHY before processing this container
report.resetContainerHealthState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public boolean handle(ContainerCheckRequest request) {
boolean forceClose = containerInfo.getReplicationConfig()
.getReplicationType() != ReplicationType.RATIS;

// TODO - review this logic - may need an empty check here
if (request.getContainerReplicas().isEmpty()) {
// Report MISSING only for containers with no replicas and keys > 0
if (request.getContainerReplicas().isEmpty() && containerInfo.getNumberOfKeys() > 0) {
request.getReport().incrementAndSample(ContainerHealthState.MISSING, containerInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SetAckMissingContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SetAckMissingContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SingleNodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerRequestProto;
Expand Down Expand Up @@ -748,6 +750,13 @@ public ScmContainerLocationResponse processRequest(
.setStatus(Status.OK)
.setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest()))
.build();
case SetAckMissingContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setSetAckMissingContainerResponse(
setAckMissingContainer(request.getSetAckMissingContainerRequest()))
.build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
Expand Down Expand Up @@ -1387,4 +1396,9 @@ public ReconcileContainerResponseProto reconcileContainer(ReconcileContainerRequ
return ReconcileContainerResponseProto.getDefaultInstance();
}

public SetAckMissingContainerResponseProto setAckMissingContainer(
SetAckMissingContainerRequestProto request) throws IOException {
impl.setAckMissingContainer(request.getContainerID(), request.getAcknowledge());
return SetAckMissingContainerResponseProto.getDefaultInstance();
}
}
Loading