HDFS-17658. HDFS decommissioning does not consider if Under Construction blocks are sufficiently replicated which causes HDFS Data Loss #7179

Open · wants to merge 5 commits into trunk
@@ -1744,6 +1744,39 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean
DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false;

/**
* Determines whether the NameNode tracks under-construction blocks
* when decommissioning DataNodes.
*
* A DataNode should only enter decommissioned state if all blocks on
* the DataNode are sufficiently replicated to other live DataNodes.
*
* When this setting is disabled, the NameNode does not consider
* under-construction blocks in determining if a DataNode can be
* decommissioned. This can result in scenarios where DataNodes
* enter decommissioned state before their blocks are sufficiently
* replicated to other live DataNodes.
*
* This situation can lead to HDFS write failures and data loss if
* all the DataNodes in the block write pipeline are decommissioned
* and terminated at around the same time.
*
* Enable this setting to have the NameNode track and consider
* under-construction blocks when determining whether a DataNode
* can be decommissioned.
*/
public static final String DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS =
"dfs.namenode.decommission.track.underconstructionblocks";
/**
* The "dfs.namenode.decommission.track.underconstructionblocks" feature is
* disabled by default. Enabling it will benefit HDFS clusters that
* decommission DataNodes while HDFS blocks are held open for extended
* periods of time; such clusters will see a reduction in HDFS write
* failures and HDFS data loss.
*/
public static final boolean DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS_DEFAULT =
false;

// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
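For reference, a minimal sketch of how the new key could be toggled programmatically; the usual route would be setting the property in hdfs-site.xml, and the standalone class shown here is only illustrative and not part of this change:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class TrackUcBlocksConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Opt in to tracking under-construction blocks during decommissioning;
    // the feature ships disabled by default.
    conf.setBoolean(
        DFSConfigKeys.DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS, true);

    // Reading the value back uses the same key plus the shipped default.
    boolean trackUcBlocks = conf.getBoolean(
        DFSConfigKeys.DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS,
        DFSConfigKeys.DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS_DEFAULT);
    System.out.println("Track UC blocks during decommission: " + trackUcBlocks);
  }
}

The equivalent hdfs-site.xml property name is the string value of the key, dfs.namenode.decommission.track.underconstructionblocks.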
@@ -190,6 +190,7 @@ public class BlockManager implements BlockStatsMXBean {

private final PendingDataNodeMessages pendingDNMessages =
new PendingDataNodeMessages();
private final UnderConstructionBlocks ucBlocks;

Is this UnderConstruction or UnderReplication?

Contributor Author

Good question. I am definitely open to changing the terminology here.

I don't know if UnderReplication is the correct term, though; I think that term is more applicable to a block which needs to be replicated asynchronously by the Namenode to meet the replication factor (e.g. due to datanode decommissioning or datanode failures).

From the Namenode logs we can see the block replica state is reported as RBW (replica being written) / RECEIVING_BLOCK:

2024-12-05 15:51:09,399 DEBUG blockmanagement.BlockManager: Reported block blk_1073741825_1001 on 172.31.93.89:9866 size 268435456 replicaState = RBW
2024-12-05 15:51:09,399 DEBUG BlockStateChange: BLOCK* block RECEIVING_BLOCK: blk_1073741825_1001 is received from 172.31.93.89:9866

I took the term UnderConstruction from here in the code:

However, upon further inspection it seems this term is related to a BlockCollection (as opposed to a block replica):

Let me know your thoughts. I can refactor UnderConstructionBlocks to something like RbwBlocks or ReplicaBeingWrittenBlocks if that is a more accurate term.

Contributor Author

I am seeing that "UnderConstruction" terminology is already used in the code for both blocks & block replicas:

Therefore, I do think that the name UnderConstructionBlocks aligns with existing terminology already used in the codebase.


Sounds good to me. I appreciate you taking the time to refer to the related code and reply in detail.


private volatile long pendingReconstructionBlocksCount = 0L;
private volatile long corruptReplicaBlocksCount = 0L;
@@ -206,6 +207,25 @@ public class BlockManager implements BlockStatsMXBean {
private final BlockReportLeaseManager blockReportLeaseManager;
private ObjectName mxBeanName;

/**
* When an HDFS client notifies the Namenode a block is closed/committed,
* the block is added to the StorageInfos and therefore needs to be
* removed from UnderConstructionBlocks.
*
* @param dn - datanode storing the Under Construction block.
* @param block - Under Construction block to stop tracking for the datanode.
*/
public void removeUcBlock(DatanodeDescriptor dn, Block block) {
ucBlocks.removeUcBlock(dn, block);
}

/**
* @return - the data structure for tracking Under Construction block replicas.
*/
public UnderConstructionBlocks getUnderConstructionBlocks() {
return ucBlocks;
}

/** Used by metrics */
public long getPendingReconstructionBlocksCount() {
return pendingReconstructionBlocksCount;
@@ -609,6 +629,9 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT,
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));

// Create in-memory data structure to track Under Construction blocks.
this.ucBlocks = new UnderConstructionBlocks(conf);

printInitialConfigs();
}

@@ -1817,6 +1840,8 @@ void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
}
// Remove all pending DN messages referencing this DN.
pendingDNMessages.removeAllMessagesForDatanode(node);
// Remove all Under Construction blocks on this datanode
ucBlocks.removeAllUcBlocksForDatanode(node);

node.resetBlocks();
invalidateBlocks.remove(node);
@@ -1954,6 +1979,9 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
storageInfo.addBlock(b.getStored(), b.getCorrupted());
// Once the block is added to StorageInfos, it no longer needs to be
// tracked in UnderConstructionBlocks data structure.
ucBlocks.removeUcBlock(storageInfo.getDatanodeDescriptor(), b.getCorrupted());
}

// Add this replica to corruptReplicas Map. For striped blocks, we always
@@ -3703,11 +3731,15 @@ void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
block.getUnderConstructionFeature().addReplicaIfNotPresent(
storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);

// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (ucBlock.reportedState == ReplicaState.FINALIZED && (
if (ucBlock.reportedState == ReplicaState.RBW) {
// Non-finalized Under Construction blocks are not added to the StorageInfos.
// They are instead tracked via UnderConstructionBlocks data structure.
ucBlocks.addUcBlock(storageInfo.getDatanodeDescriptor(), ucBlock.reportedBlock);
} else if (ucBlock.reportedState == ReplicaState.FINALIZED && (
block.findStorageInfo(storageInfo) < 0) || corruptReplicas
.isReplicaCorrupt(block, storageInfo.getDatanodeDescriptor())) {
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
}
}
@@ -3735,6 +3767,9 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,

// just add it
AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
// Once the block is added to StorageInfos, it no longer needs to be
// tracked in UnderConstructionBlocks data structure.
ucBlocks.removeUcBlock(storageInfo.getDatanodeDescriptor(), reported);

// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@@ -3781,6 +3816,9 @@ private Block addStoredBlock(final BlockInfo block,

// add block to the datanode
AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
// Once the block is added to StorageInfos, it no longer needs to be
// tracked in UnderConstructionBlocks data structure.
ucBlocks.removeUcBlock(storageInfo.getDatanodeDescriptor(), reportedBlock);

int curReplicaDelta;
if (result == AddBlockResult.ADDED) {
@@ -4430,6 +4468,9 @@ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
assert (namesystem.hasWriteLock());
{
// Once the block is removed from the datanode, it no longer needs to be
// tracked in UnderConstructionBlocks data structure.
ucBlocks.removeUcBlock(node, storedBlock);
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
blockLog.debug("BLOCK* removeStoredBlock: {} has already been removed from node {}",
storedBlock, node);
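To make the new collaborator easier to follow: BlockManager delegates to an UnderConstructionBlocks instance whose surface, as used in this diff, is addUcBlock, removeUcBlock, removeAllUcBlocksForDatanode, getUnderConstructionBlocksByDatanode, and logWarningForLongUnderConstructionBlocks. Below is a minimal, illustrative sketch of that surface; only the method names and the per-datanode map shape come from the diff, while the internals are assumptions (the real class added by this PR also handles logging thresholds and other edge cases).

package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Simplified sketch; not the implementation from this PR.
class UnderConstructionBlocksSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(UnderConstructionBlocksSketch.class);

  private final boolean enabled;
  // Replicas currently being written (RBW), keyed by the datanode storing them.
  private final Map<DatanodeDescriptor, List<Block>> ucBlocksByDatanode = new HashMap<>();

  UnderConstructionBlocksSketch(Configuration conf) {
    this.enabled = conf.getBoolean(
        DFSConfigKeys.DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS,
        DFSConfigKeys.DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS_DEFAULT);
  }

  synchronized void addUcBlock(DatanodeDescriptor dn, Block block) {
    if (!enabled) {
      return;
    }
    ucBlocksByDatanode.computeIfAbsent(dn, d -> new ArrayList<>()).add(block);
  }

  synchronized void removeUcBlock(DatanodeDescriptor dn, Block block) {
    List<Block> blocks = ucBlocksByDatanode.get(dn);
    if (blocks != null) {
      blocks.remove(block);
      if (blocks.isEmpty()) {
        ucBlocksByDatanode.remove(dn);
      }
    }
  }

  synchronized void removeAllUcBlocksForDatanode(DatanodeDescriptor dn) {
    ucBlocksByDatanode.remove(dn);
  }

  // Copy handed to the admin monitors so they can skip datanodes with open replicas.
  synchronized Map<DatanodeDescriptor, List<Block>> getUnderConstructionBlocksByDatanode() {
    Map<DatanodeDescriptor, List<Block>> copy = new HashMap<>();
    ucBlocksByDatanode.forEach((dn, blocks) -> copy.put(dn, new ArrayList<>(blocks)));
    return Collections.unmodifiableMap(copy);
  }

  synchronized void logWarningForLongUnderConstructionBlocks() {
    // The real class warns about replicas that stay under construction for a
    // long time; this sketch just reports how many datanodes are being tracked.
    if (!ucBlocksByDatanode.isEmpty()) {
      LOG.warn("{} datanodes currently have under-construction block replicas",
          ucBlocksByDatanode.size());
    }
  }
}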
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.INode;
@@ -27,6 +28,8 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Map;
@@ -166,6 +169,8 @@ public void run() {
"decommissioning/maintenance checks.");
return;
}
blockManager.getUnderConstructionBlocks()
.logWarningForLongUnderConstructionBlocks();
// Reset the checked count at beginning of each iteration
numBlocksChecked = 0;
// Check decommission or maintenance progress.
@@ -362,38 +367,53 @@ private void processCompletedNodes(List<DatanodeDescriptor> toRemove) {
}
namesystem.writeLock();
try {
// Prepare a lookup map to determine if a datanode
// has any blocks Under Construction.
final Map<DatanodeDescriptor, List<Block>> ucBlocksByDatanode =
blockManager.getUnderConstructionBlocks().getUnderConstructionBlocksByDatanode();
for (DatanodeDescriptor dn : toRemove) {
final boolean isHealthy =
blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (isHealthy) {
if (dn.isDecommissionInProgress()) {
dnAdmin.setDecommissioned(dn);
outOfServiceNodeBlocks.remove(dn);
pendingRep.remove(dn);
} else if (dn.isEnteringMaintenance()) {
// IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
// to track maintenance expiration.
dnAdmin.setInMaintenance(dn);
pendingRep.remove(dn);
} else if (dn.isInService()) {
// Decom / maint was cancelled and the node is yet to be processed
// from cancelledNodes
LOG.info("Node {} completed decommission and maintenance " +
"but has been moved back to in service", dn);
pendingRep.remove(dn);
outOfServiceNodeBlocks.remove(dn);
continue;
// Perform a final check to see if the datanode has any blocks in
// Under Construction state (blocks which are open for write)
if (ucBlocksByDatanode.containsKey(dn)) {
final List<Block> ucBlocks =
ucBlocksByDatanode.getOrDefault(dn, Collections.emptyList());
final String ucBlocksString =
ucBlocks.stream().map(Object::toString).collect(Collectors.joining(","));
LOG.info("Cannot decommission datanode {} with {} UC blocks: [{}]",
dn, ucBlocks.size(), ucBlocksString);
} else {
// Should not happen
LOG.error("Node {} is in an unexpected state {} and has been "+
"removed from tracking for decommission or maintenance",
dn, dn.getAdminState());
pendingRep.remove(dn);
outOfServiceNodeBlocks.remove(dn);
continue;
if (dn.isDecommissionInProgress()) {
dnAdmin.setDecommissioned(dn);
outOfServiceNodeBlocks.remove(dn);
pendingRep.remove(dn);
} else if (dn.isEnteringMaintenance()) {
// IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
// to track maintenance expiration.
dnAdmin.setInMaintenance(dn);
pendingRep.remove(dn);
} else if (dn.isInService()) {
// Decom / maint was cancelled and the node is yet to be processed
// from cancelledNodes
LOG.info("Node {} completed decommission and maintenance " +
"but has been moved back to in service", dn);
pendingRep.remove(dn);
outOfServiceNodeBlocks.remove(dn);
continue;
} else {
// Should not happen
LOG.error("Node {} is in an unexpected state {} and has been "+
"removed from tracking for decommission or maintenance",
dn, dn.getAdminState());
pendingRep.remove(dn);
outOfServiceNodeBlocks.remove(dn);
continue;
}
LOG.info("Node {} is sufficiently replicated and healthy, "
+ "marked as {}.", dn, dn.getAdminState());
}
LOG.info("Node {} is sufficiently replicated and healthy, "
+ "marked as {}.", dn, dn.getAdminState());
} else {
LOG.info("Node {} isn't healthy."
+ " It needs to replicate {} more blocks."
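Both admin monitors gate the DECOMMISSIONED / IN_MAINTENANCE transition on the same lookup. Condensed into a single helper for readability, the added check effectively does the following; this is an illustrative restatement of the pattern, not a class or method from this PR:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative restatement of the gate both monitors now apply before marking
// a datanode DECOMMISSIONED or IN_MAINTENANCE; not part of this change.
public final class UcBlockDecommissionGate {
  private static final Logger LOG =
      LoggerFactory.getLogger(UcBlockDecommissionGate.class);

  private UcBlockDecommissionGate() {
  }

  // Returns true (and logs the replicas) when the datanode still has
  // under-construction blocks, in which case the admin monitor leaves it in
  // DECOMMISSION_INPROGRESS / ENTERING_MAINTENANCE for another cycle.
  public static boolean hasUnderConstructionBlocks(DatanodeDescriptor dn,
      Map<DatanodeDescriptor, List<Block>> ucBlocksByDatanode) {
    final List<Block> ucBlocks =
        ucBlocksByDatanode.getOrDefault(dn, Collections.emptyList());
    if (ucBlocks.isEmpty()) {
      return false;
    }
    final String ucBlocksString =
        ucBlocks.stream().map(Object::toString).collect(Collectors.joining(","));
    LOG.info("Cannot decommission datanode {} with {} UC blocks: [{}]",
        dn, ucBlocks.size(), ucBlocksString);
    return true;
  }
}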
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -27,16 +29,17 @@
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractList;
import java.util.Collections;
import java.util.TreeMap;
import java.util.ArrayList;
import java.util.Map;
import java.util.List;
import java.util.Iterator;
import java.util.stream.Collectors;

/**
* Checks to see if datanodes have finished DECOMMISSION_INPROGRESS or
@@ -177,6 +180,8 @@ public void run() {
"decommissioning/maintenance checks.");
return;
}
blockManager.getUnderConstructionBlocks()
.logWarningForLongUnderConstructionBlocks();
// Reset the checked count at beginning of each iteration
numBlocksChecked = 0;
numBlocksCheckedPerLock = 0;
@@ -235,6 +240,11 @@ private void check() {
final List<DatanodeDescriptor> unhealthyDns = new ArrayList<>();
boolean isValidState = true;

// Prepare a lookup map to determine if a datanode
// has any blocks Under Construction.
final Map<DatanodeDescriptor, List<Block>> ucBlocksByDatanode =
blockManager.getUnderConstructionBlocks()
.getUnderConstructionBlocksByDatanode();
while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
.isRunning()) {
numNodesChecked++;
@@ -290,22 +300,33 @@
// If the full scan is clean AND the node liveness is okay,
// we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
if (blocks.size() == 0 && isHealthy) {
if (dn.isDecommissionInProgress()) {
dnAdmin.setDecommissioned(dn);
toRemove.add(dn);
} else if (dn.isEnteringMaintenance()) {
// IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
// to track maintenance expiration.
dnAdmin.setInMaintenance(dn);
// Perform a final check to see if the datanode has any blocks in
// Under Construction state (blocks which are open for write)
if (ucBlocksByDatanode.containsKey(dn)) {
final List<Block> ucBlocks =
ucBlocksByDatanode.getOrDefault(dn, Collections.emptyList());
final String ucBlocksString =
ucBlocks.stream().map(Object::toString).collect(Collectors.joining(","));
LOG.info("Cannot decommission datanode {} with {} UC blocks: [{}]",
Contributor

Will this info log become noisy if the client takes time to close the stream?

Contributor Author (@KevinWikant, Dec 19, 2024)

This log will be printed at most once per datanode per DatanodeAdminDefaultMonitor cycle. This means that in a 1k-datanode HDFS cluster, there could be up to 1k log lines printed every 30 seconds (if all 1k datanodes have under-construction blocks).

This matches the existing behaviour: if a decommissioning datanode has under-replicated blocks, one log line is printed for that datanode every single DatanodeAdminDefaultMonitor cycle:

dn, ucBlocks.size(), ucBlocksString);
} else {
isValidState = false;
Preconditions.checkState(false,
"Node %s is in an invalid state! "
+ "Invalid state: %s %s blocks are on this dn.",
dn, dn.getAdminState(), blocks.size());
if (dn.isDecommissionInProgress()) {
dnAdmin.setDecommissioned(dn);
toRemove.add(dn);
} else if (dn.isEnteringMaintenance()) {
// IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
// to track maintenance expiration.
dnAdmin.setInMaintenance(dn);
} else {
isValidState = false;
Preconditions.checkState(false,
"Node %s is in an invalid state! "
+ "Invalid state: %s %s blocks are on this dn.",
dn, dn.getAdminState(), blocks.size());
}
LOG.debug("Node {} is sufficiently replicated and healthy, "
+ "marked as {}.", dn, dn.getAdminState());
}
LOG.debug("Node {} is sufficiently replicated and healthy, "
+ "marked as {}.", dn, dn.getAdminState());
} else {
LOG.info("Node {} {} healthy."
+ " It needs to replicate {} more blocks."
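Finally, a hedged end-to-end sketch of the scenario this change addresses, using MiniDFSCluster (from the hadoop-hdfs test artifact). The class name, file path, and buffer size below are made up for illustration, and the decommission trigger itself (hosts exclude file plus refreshNodes) is deliberately omitted:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class DecommissionWithOpenFileExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Enable tracking of under-construction blocks during decommissioning.
    conf.setBoolean(
        DFSConfigKeys.DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS, true);

    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    try {
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();

      // Keep a block open for write: its replicas stay in RBW state on the
      // pipeline datanodes until the stream is closed.
      FSDataOutputStream out = fs.create(new Path("/open-file"));
      out.write(new byte[1024]);
      out.hflush();

      // If one of the pipeline datanodes were decommissioned at this point,
      // it would remain in DECOMMISSION_INPROGRESS until the stream is closed
      // and the finalized block is sufficiently replicated, instead of
      // flipping straight to DECOMMISSIONED while holding the only replicas.
      out.close();
    } finally {
      cluster.shutdown();
    }
  }
}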