
[IOTDB-4366] Fix Ratis 1 replica migrate error; terminate the RegionMigrationProcedure process when datanode report failure result (apache#7269)
Beyyes authored Sep 15, 2022
1 parent 856c31f commit 4bebeee
Showing 14 changed files with 258 additions and 184 deletions.
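
The second half of the title — terminating the migration procedure when a DataNode reports a failed result — is only partly visible in the hunks below (the ConfigNode-side reportRegionMigrateResult changes). The following is a minimal, self-contained sketch of that notify-and-terminate pattern; it is an illustration of the idea, not code from this commit, and none of the names are IoTDB APIs.

// Illustrative sketch only: a procedure that waits for a DataNode's migration
// report and terminates on a failed result. Class and method names are
// hypothetical stand-ins, not IoTDB classes.
import java.util.concurrent.CountDownLatch;

class RegionMigrateReportSketch {

    static final class MigrateResult {
        final int regionId;
        final boolean success;

        MigrateResult(int regionId, boolean success) {
            this.regionId = regionId;
            this.success = success;
        }
    }

    private final int regionId;
    private final CountDownLatch reported = new CountDownLatch(1);
    private volatile boolean failed;

    RegionMigrateReportSketch(int regionId) {
        this.regionId = regionId;
    }

    // Called from the RPC layer when a DataNode reports its migration result.
    void notifyRegionMigrateFinished(MigrateResult result) {
        if (result.regionId != regionId) {
            return; // the report belongs to a different procedure
        }
        failed = !result.success;
        reported.countDown(); // wake up the waiting procedure thread
    }

    // The procedure thread blocks here; a failure report terminates the procedure
    // instead of letting it continue with the remaining migration steps.
    boolean awaitSuccess() throws InterruptedException {
        reported.await();
        return !failed;
    }

    public static void main(String[] args) throws InterruptedException {
        RegionMigrateReportSketch procedure = new RegionMigrateReportSketch(1);
        procedure.notifyRegionMigrateFinished(new MigrateResult(1, false));
        System.out.println(procedure.awaitSuccess() ? "continue" : "terminate procedure");
    }
}
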
@@ -104,6 +104,7 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
if (typeNum >= ConfigPhysicalPlanType.values().length) {
throw new IOException("unrecognized log type " + typeNum);
}

ConfigPhysicalPlanType type = ConfigPhysicalPlanType.values()[typeNum];
ConfigPhysicalPlan req;
switch (type) {
@@ -691,8 +691,8 @@ private TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
}

@Override
public TSStatus addConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
consensusManager.addConsensusGroup(configNodeLocations);
public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
consensusManager.createPeerForConsensusGroup(configNodeLocations);
return StatusUtils.OK;
}

@@ -52,12 +52,13 @@
public class ConsensusManager {

private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

private final IManager configManager;

private ConsensusGroupId consensusGroupId;
private IConsensus consensusImpl;
private final int seedConfigNodeId = 0;

public ConsensusManager(IManager configManager, PartitionRegionStateMachine stateMachine)
throws IOException {
@@ -72,57 +73,58 @@ public void close() throws IOException {
/** ConsensusLayer local implementation */
private void setConsensusLayer(PartitionRegionStateMachine stateMachine) throws IOException {
// There is only one ConfigNodeGroup
consensusGroupId = new PartitionRegionId(conf.getPartitionRegionId());
consensusGroupId = new PartitionRegionId(CONF.getPartitionRegionId());

// Implement local ConsensusLayer by ConfigNodeConfig
consensusImpl =
ConsensusFactory.getConsensusImpl(
conf.getConfigNodeConsensusProtocolClass(),
CONF.getConfigNodeConsensusProtocolClass(),
ConsensusConfig.newBuilder()
.setThisNode(new TEndPoint(conf.getInternalAddress(), conf.getConsensusPort()))
.setStorageDir(conf.getConsensusDir())
.setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))
.setStorageDir(CONF.getConsensusDir())
.build(),
gid -> stateMachine)
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
ConsensusFactory.CONSTRUCT_FAILED_MSG,
conf.getConfigNodeConsensusProtocolClass())));
CONF.getConfigNodeConsensusProtocolClass())));
consensusImpl.start();

if (SystemPropertiesUtils.isRestarted()) {
try {
// Create ConsensusGroup from confignode-system.properties file when restart
// TODO: Check and notify if current ConfigNode's ip or port has changed
addConsensusGroup(SystemPropertiesUtils.loadConfigNodeList());
createPeerForConsensusGroup(SystemPropertiesUtils.loadConfigNodeList());
} catch (BadNodeUrlException e) {
throw new IOException(e);
}
} else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
// Create ConsensusGroup that contains only itself
// if the current ConfigNode is Seed-ConfigNode
addConsensusGroup(
createPeerForConsensusGroup(
Collections.singletonList(
new TConfigNodeLocation(
0,
new TEndPoint(conf.getInternalAddress(), conf.getInternalPort()),
new TEndPoint(conf.getInternalAddress(), conf.getConsensusPort()))));
seedConfigNodeId,
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))));
}
}

/**
* Add the current ConfigNode to the ConsensusGroup
* Create peer in new node to build consensus group
*
* @param configNodeLocations All registered ConfigNodes
*/
public void addConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
public void createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
if (configNodeLocations.size() == 0) {
LOGGER.warn("configNodeLocations is null, addConsensusGroup failed.");
LOGGER.warn("configNodeLocations is empty, createPeerForConsensusGroup failed.");
return;
}

LOGGER.info("Set ConfigNode consensus group {}...", configNodeLocations);
LOGGER.info("createPeerForConsensusGroup {}...", configNodeLocations);

List<Peer> peerList = new ArrayList<>();
for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
peerList.add(new Peer(consensusGroupId, configNodeLocation.getConsensusEndPoint()));
@@ -254,11 +254,11 @@ TDataPartitionTableResp getOrCreateDataPartition(
TSStatus registerConfigNode(TConfigNodeRegisterReq req);

/**
* Add Consensus Group in new node.
* Create peer in new node to build consensus group.
*
* @return status
*/
TSStatus addConsensusGroup(List<TConfigNodeLocation> configNodeLocations);
TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations);

/**
* Remove ConfigNode
@@ -177,7 +177,7 @@ public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
* DATANODE_NOT_EXIST when some datanode not exist.
*/
public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
LOGGER.info("Node manager start to remove DataNode {}", removeDataNodePlan);
LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);

DataNodeRemoveHandler dataNodeRemoveHandler =
new DataNodeRemoveHandler((ConfigManager) configManager);
@@ -204,7 +204,7 @@ public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
}
dataSet.setStatus(status);

LOGGER.info("Node manager finished to remove DataNode {}", removeDataNodePlan);
LOGGER.info("NodeManager finished to remove DataNode {}", removeDataNodePlan);
return dataSet;
}

@@ -260,7 +260,7 @@ public void setEnv(ConfigNodeProcedureEnv env) {
}

public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
LOGGER.info("receive DataNode region:{} migrate result:{}", req.getRegionId(), req);

this.executor
.getProcedures()
.values()
@@ -270,11 +270,6 @@ public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
RegionMigrateProcedure regionMigrateProcedure = (RegionMigrateProcedure) procedure;
if (regionMigrateProcedure.getConsensusGroupId().equals(req.getRegionId())) {
regionMigrateProcedure.notifyTheRegionMigrateFinished(req);
} else {
LOGGER.warn(
"DataNode report region:{} is not equals ConfigNode send region:{}",
req.getRegionId(),
regionMigrateProcedure.getConsensusGroupId());
}
}
});
@@ -22,7 +22,6 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -90,32 +89,36 @@ public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLo
*
* @param disabledDataNode TDataNodeLocation
*/
public TSStatus broadcastDisableDataNode(TDataNodeLocation disabledDataNode) {
public void broadcastDisableDataNode(TDataNodeLocation disabledDataNode) {
LOGGER.info(
"DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<TEndPoint> otherOnlineDataNodes =
"DataNodeRemoveService start broadcastDisableDataNode to cluster, disabledDataNode: {}",
getIdWithRpcEndpoint(disabledDataNode));

List<TDataNodeConfiguration> otherOnlineDataNodes =
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.map(TDataNodeConfiguration::getLocation)
.filter(loc -> !loc.equals(disabledDataNode))
.map(TDataNodeLocation::getInternalEndPoint)
.filter(node -> !node.getLocation().equals(disabledDataNode))
.collect(Collectors.toList());

for (TEndPoint server : otherOnlineDataNodes) {
for (TDataNodeConfiguration node : otherOnlineDataNodes) {
TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
status =
TSStatus status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
server, disableReq, DataNodeRequestType.DISABLE_DATA_NODE);
node.getLocation().getInternalEndPoint(),
disableReq,
DataNodeRequestType.DISABLE_DATA_NODE);
if (!isSucceed(status)) {
return status;
LOGGER.error(
"broadcastDisableDataNode meets error, disabledDataNode: {}, error: {}",
getIdWithRpcEndpoint(disabledDataNode),
status);
return;
}
}

LOGGER.info(
"DataNodeRemoveService finished send disable the Data Node to cluster, {}",
disabledDataNode);
status.setMessage("Succeed disable the Data Node from cluster");
return status;
"DataNodeRemoveService finished broadcastDisableDataNode to cluster, disabledDataNode: {}",
getIdWithRpcEndpoint(disabledDataNode));
}

/**
@@ -137,6 +140,7 @@ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
if (!newNode.isPresent()) {
LOGGER.warn("No enough Data node to migrate region: {}", regionId);
return null;
}
return newNode.get();
}
@@ -161,15 +165,18 @@ public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId
filterDataNodeWithOtherRegionReplica(regionId, destDataNode);
if (!selectedDataNode.isPresent()) {
LOGGER.warn(
"There are no other DataNodes could be selected to perform the add peer process, please check RegionGroup: {} by SQL: show regions",
"There are no other DataNodes could be selected to perform the add peer process, "
+ "please check RegionGroup: {} by SQL: show regions",
regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(
"There are no other DataNodes could be selected to perform the add peer process, please check by SQL: show regions");
"There are no other DataNodes could be selected to perform the add peer process, "
+ "please check by SQL: show regions");
return status;
}

// Send addRegionPeer request to the selected DataNode
// Send addRegionPeer request to the selected DataNode,
// destDataNode is where the new RegionReplica is created
TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode);
status =
SyncDataNodeClientPool.getInstance()
Expand All @@ -178,9 +185,9 @@ public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId
maintainPeerReq,
DataNodeRequestType.ADD_REGION_PEER);
LOGGER.info(
"Send region {} add peer action to {}, wait it finished",
"Send action addRegionPeer, wait it finished, regionId: {}, dataNode: {}",
regionId,
selectedDataNode.get().getInternalEndPoint());
getIdWithRpcEndpoint(selectedDataNode.get()));
return status;
}

@@ -194,36 +201,34 @@ public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId
* @param regionId region id
* @return TSStatus
*/
public TSStatus removeRegionPeer(TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
public TSStatus removeRegionPeer(
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode,
TConsensusGroupId regionId) {
TSStatus status;

TDataNodeLocation rpcClientDataNode = null;

// Here we pick the DataNode who contains one of the RegionReplica of the specified
// ConsensusGroup except the origin one
// in order to notify the new ConsensusGroup that the origin peer should secede now
// if the selectedDataNode equals null, we choose the destDataNode to execute the method
Optional<TDataNodeLocation> selectedDataNode =
filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
if (!selectedDataNode.isPresent()) {
LOGGER.warn(
"There are no other DataNodes could be selected to perform the remove peer process, please check RegionGroup: {} by SQL: show regions",
regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(
"There are no other DataNodes could be selected to perform the remove peer process, please check by SQL: show regions");
return status;
}
rpcClientDataNode = selectedDataNode.orElse(destDataNode);

// Send addRegionPeer request to the selected DataNode
// Send removeRegionPeer request to the rpcClientDataNode
TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
selectedDataNode.get().getInternalEndPoint(),
rpcClientDataNode.getInternalEndPoint(),
maintainPeerReq,
DataNodeRequestType.REMOVE_REGION_PEER);
LOGGER.info(
"Send region {} remove peer to {}, wait it finished",
"Send action removeRegionPeer, wait it finished, regionId: {}, dataNode: {}",
regionId,
selectedDataNode.get().getInternalEndPoint());
rpcClientDataNode.getInternalEndPoint());
return status;
}

@@ -239,6 +244,34 @@ public TSStatus removeRegionPeer(TDataNodeLocation originalDataNode, TConsensusG
*/
public TSStatus deleteOldRegionPeer(
TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {

// when SchemaReplicationFactor==1, execute deleteOldRegionPeer method will cause error
// user must delete the related data manually
if (CONF.getSchemaReplicationFactor() == 1
&& TConsensusGroupType.SchemaRegion.equals(regionId.getType())) {
String errorMessage =
"deleteOldRegionPeer is not supported for schemaRegion when SchemaReplicationFactor equals 1, "
+ "you are supposed to delete the region data of datanode manually";
LOGGER.info(errorMessage);
TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(errorMessage);
return status;
}

// when DataReplicationFactor==1, execute deleteOldRegionPeer method will cause error
// user must delete the related data manually
// TODO if multi-leader supports deleteOldRegionPeer when DataReplicationFactor==1?
if (CONF.getDataReplicationFactor() == 1
&& TConsensusGroupType.DataRegion.equals(regionId.getType())) {
String errorMessage =
"deleteOldRegionPeer is not supported for dataRegion when DataReplicationFactor equals 1, "
+ "you are supposed to delete the region data of datanode manually";
LOGGER.info(errorMessage);
TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(errorMessage);
return status;
}

TSStatus status;
TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode);
status =
@@ -248,7 +281,7 @@ public TSStatus deleteOldRegionPeer(
maintainPeerReq,
DataNodeRequestType.DELETE_OLD_REGION_PEER);
LOGGER.info(
"Send region {} delete peer action to {}, wait it finished",
"Send action deleteOldRegionPeer to regionId {} on dataNodeId {}, wait it finished",
regionId,
originalDataNode.getInternalEndPoint());
return status;
@@ -345,10 +378,11 @@ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocatio
req,
DataNodeRequestType.CREATE_NEW_REGION_PEER);

LOGGER.info("Send create peer for regionId {} on data node {}", regionId, destDataNode);
LOGGER.info(
"Send action createNewRegionPeer, regionId: {}, dataNode: {}", regionId, destDataNode);
if (isFailed(status)) {
LOGGER.error(
"Send create peer for regionId {} on data node {}, result: {}",
"Send action createNewRegionPeer, regionId: {}, dataNode: {}, result: {}",
regionId,
destDataNode,
status);
Expand All @@ -372,7 +406,7 @@ private boolean isFailed(TSStatus status) {
* @throws ProcedureException procedure exception
*/
public TSStatus stopDataNode(TDataNodeLocation dataNode) throws ProcedureException {
LOGGER.info("begin to stop Data Node {}", dataNode);
LOGGER.info("Begin to stop Data Node {}", dataNode);
AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
TSStatus status =
SyncDataNodeClientPool.getInstance()
@@ -528,6 +562,12 @@ private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
return regionReplicaNodes.stream().filter(e -> !e.equals(filterLocation)).findAny();
}

private String getIdWithRpcEndpoint(TDataNodeLocation location) {
return String.format(
"dataNodeId: %s, clientRpcEndPoint: %s",
location.getDataNodeId(), location.getClientRpcEndPoint());
}

/**
* Check the protocol of the cluster, standalone is not supported to remove data node currently
*
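
Taken together, the DataNodeRemoveHandler methods touched above (createNewRegionPeer, addRegionPeer, removeRegionPeer, deleteOldRegionPeer) are the individual steps of a region migration; the RegionMigrateProcedure that orders them is not part of this excerpt. Below is a hypothetical outline of that ordering, assuming each step returns a status and any failure stops the sequence — the orchestration shown is an assumption drawn from the method names, not code from this commit.

// Hypothetical outline of the migration sequence implied by the handler methods
// in the diff above. The orchestration, statuses, and names are illustrative
// assumptions, not IoTDB code.
import java.util.List;
import java.util.function.Supplier;

class RegionMigrationOutlineSketch {

    enum Status { SUCCESS, FAILED }

    // Runs the steps in order and stops at the first failure, mirroring the idea
    // that a failed step (or a failure reported by a DataNode) ends the procedure.
    static Status migrate(List<Supplier<Status>> steps) {
        for (Supplier<Status> step : steps) {
            if (step.get() == Status.FAILED) {
                return Status.FAILED;
            }
        }
        return Status.SUCCESS;
    }

    public static void main(String[] args) {
        Status result =
            migrate(
                List.of(
                    () -> Status.SUCCESS,   // createNewRegionPeer on the destination DataNode
                    () -> Status.SUCCESS,   // addRegionPeer, sent via another replica holder
                    () -> Status.SUCCESS,   // removeRegionPeer for the original DataNode
                    () -> Status.FAILED))); // deleteOldRegionPeer, e.g. refused when replication factor is 1
        System.out.println("migration result: " + result);
    }
}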