diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index 701fcdf67fc2..be411c8e3666 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -104,6 +104,7 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException { if (typeNum >= ConfigPhysicalPlanType.values().length) { throw new IOException("unrecognized log type " + typeNum); } + ConfigPhysicalPlanType type = ConfigPhysicalPlanType.values()[typeNum]; ConfigPhysicalPlan req; switch (type) { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index f6136d558d17..d19b889a7a2c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -691,8 +691,8 @@ private TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) { } @Override - public TSStatus addConsensusGroup(List configNodeLocations) { - consensusManager.addConsensusGroup(configNodeLocations); + public TSStatus createPeerForConsensusGroup(List configNodeLocations) { + consensusManager.createPeerForConsensusGroup(configNodeLocations); return StatusUtils.OK; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java index 3ba78e601d21..75ce264ba574 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java @@ -52,12 +52,13 @@ public class ConsensusManager { private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class); - private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf(); + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private final IManager configManager; private ConsensusGroupId consensusGroupId; private IConsensus consensusImpl; + private final int seedConfigNodeId = 0; public ConsensusManager(IManager configManager, PartitionRegionStateMachine stateMachine) throws IOException { @@ -72,15 +73,15 @@ public void close() throws IOException { /** ConsensusLayer local implementation */ private void setConsensusLayer(PartitionRegionStateMachine stateMachine) throws IOException { // There is only one ConfigNodeGroup - consensusGroupId = new PartitionRegionId(conf.getPartitionRegionId()); + consensusGroupId = new PartitionRegionId(CONF.getPartitionRegionId()); // Implement local ConsensusLayer by ConfigNodeConfig consensusImpl = ConsensusFactory.getConsensusImpl( - conf.getConfigNodeConsensusProtocolClass(), + CONF.getConfigNodeConsensusProtocolClass(), ConsensusConfig.newBuilder() - .setThisNode(new TEndPoint(conf.getInternalAddress(), conf.getConsensusPort())) - .setStorageDir(conf.getConsensusDir()) + .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())) + .setStorageDir(CONF.getConsensusDir()) .build(), gid -> stateMachine) .orElseThrow( @@ -88,41 +89,42 @@ private void setConsensusLayer(PartitionRegionStateMachine stateMachine) throws new IllegalArgumentException( String.format( ConsensusFactory.CONSTRUCT_FAILED_MSG, - conf.getConfigNodeConsensusProtocolClass()))); + CONF.getConfigNodeConsensusProtocolClass()))); consensusImpl.start(); if (SystemPropertiesUtils.isRestarted()) { try { // Create ConsensusGroup from confignode-system.properties file when restart // TODO: Check and notify if current ConfigNode's ip or port has changed - addConsensusGroup(SystemPropertiesUtils.loadConfigNodeList()); + createPeerForConsensusGroup(SystemPropertiesUtils.loadConfigNodeList()); } catch (BadNodeUrlException e) { throw new IOException(e); } } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { // Create ConsensusGroup that contains only itself // if the current ConfigNode is Seed-ConfigNode - addConsensusGroup( + createPeerForConsensusGroup( Collections.singletonList( new TConfigNodeLocation( - 0, - new TEndPoint(conf.getInternalAddress(), conf.getInternalPort()), - new TEndPoint(conf.getInternalAddress(), conf.getConsensusPort())))); + seedConfigNodeId, + new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), + new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())))); } } /** - * Add the current ConfigNode to the ConsensusGroup + * Create peer in new node to build consensus group * * @param configNodeLocations All registered ConfigNodes */ - public void addConsensusGroup(List configNodeLocations) { + public void createPeerForConsensusGroup(List configNodeLocations) { if (configNodeLocations.size() == 0) { - LOGGER.warn("configNodeLocations is null, addConsensusGroup failed."); + LOGGER.warn("configNodeLocations is empty, createPeerForConsensusGroup failed."); return; } - LOGGER.info("Set ConfigNode consensus group {}...", configNodeLocations); + LOGGER.info("createPeerForConsensusGroup {}...", configNodeLocations); + List peerList = new ArrayList<>(); for (TConfigNodeLocation configNodeLocation : configNodeLocations) { peerList.add(new Peer(consensusGroupId, configNodeLocation.getConsensusEndPoint())); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index db3a802be207..b989356542de 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -254,11 +254,11 @@ TDataPartitionTableResp getOrCreateDataPartition( TSStatus registerConfigNode(TConfigNodeRegisterReq req); /** - * Add Consensus Group in new node. + * Create peer in new node to build consensus group. * * @return status */ - TSStatus addConsensusGroup(List configNodeLocations); + TSStatus createPeerForConsensusGroup(List configNodeLocations); /** * Remove ConfigNode diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java index 3471c1526298..218bbc29e149 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java @@ -177,7 +177,7 @@ public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) { * DATANODE_NOT_EXIST when some datanode not exist. */ public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) { - LOGGER.info("Node manager start to remove DataNode {}", removeDataNodePlan); + LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan); DataNodeRemoveHandler dataNodeRemoveHandler = new DataNodeRemoveHandler((ConfigManager) configManager); @@ -204,7 +204,7 @@ public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) { } dataSet.setStatus(status); - LOGGER.info("Node manager finished to remove DataNode {}", removeDataNodePlan); + LOGGER.info("NodeManager finished to remove DataNode {}", removeDataNodePlan); return dataSet; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 61027c1343fa..a1cc6ad5f205 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -260,7 +260,7 @@ public void setEnv(ConfigNodeProcedureEnv env) { } public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) { - LOGGER.info("receive DataNode region:{} migrate result:{}", req.getRegionId(), req); + this.executor .getProcedures() .values() @@ -270,11 +270,6 @@ public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) { RegionMigrateProcedure regionMigrateProcedure = (RegionMigrateProcedure) procedure; if (regionMigrateProcedure.getConsensusGroupId().equals(req.getRegionId())) { regionMigrateProcedure.notifyTheRegionMigrateFinished(req); - } else { - LOGGER.warn( - "DataNode report region:{} is not equals ConfigNode send region:{}", - req.getRegionId(), - regionMigrateProcedure.getConsensusGroupId()); } } }); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index 4d53e884b7e5..a83f1a8016e7 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; @@ -90,32 +89,36 @@ public List getDataNodeRegionIds(TDataNodeLocation dataNodeLo * * @param disabledDataNode TDataNodeLocation */ - public TSStatus broadcastDisableDataNode(TDataNodeLocation disabledDataNode) { + public void broadcastDisableDataNode(TDataNodeLocation disabledDataNode) { LOGGER.info( - "DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode); - TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - List otherOnlineDataNodes = + "DataNodeRemoveService start broadcastDisableDataNode to cluster, disabledDataNode: {}", + getIdWithRpcEndpoint(disabledDataNode)); + + List otherOnlineDataNodes = configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream() - .map(TDataNodeConfiguration::getLocation) - .filter(loc -> !loc.equals(disabledDataNode)) - .map(TDataNodeLocation::getInternalEndPoint) + .filter(node -> !node.getLocation().equals(disabledDataNode)) .collect(Collectors.toList()); - for (TEndPoint server : otherOnlineDataNodes) { + for (TDataNodeConfiguration node : otherOnlineDataNodes) { TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode); - status = + TSStatus status = SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithRetry( - server, disableReq, DataNodeRequestType.DISABLE_DATA_NODE); + node.getLocation().getInternalEndPoint(), + disableReq, + DataNodeRequestType.DISABLE_DATA_NODE); if (!isSucceed(status)) { - return status; + LOGGER.error( + "broadcastDisableDataNode meets error, disabledDataNode: {}, error: {}", + getIdWithRpcEndpoint(disabledDataNode), + status); + return; } } + LOGGER.info( - "DataNodeRemoveService finished send disable the Data Node to cluster, {}", - disabledDataNode); - status.setMessage("Succeed disable the Data Node from cluster"); - return status; + "DataNodeRemoveService finished broadcastDisableDataNode to cluster, disabledDataNode: {}", + getIdWithRpcEndpoint(disabledDataNode)); } /** @@ -137,6 +140,7 @@ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) { Optional newNode = pickNewReplicaNodeForRegion(regionReplicaNodes); if (!newNode.isPresent()) { LOGGER.warn("No enough Data node to migrate region: {}", regionId); + return null; } return newNode.get(); } @@ -161,15 +165,18 @@ public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId filterDataNodeWithOtherRegionReplica(regionId, destDataNode); if (!selectedDataNode.isPresent()) { LOGGER.warn( - "There are no other DataNodes could be selected to perform the add peer process, please check RegionGroup: {} by SQL: show regions", + "There are no other DataNodes could be selected to perform the add peer process, " + + "please check RegionGroup: {} by SQL: show regions", regionId); status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); status.setMessage( - "There are no other DataNodes could be selected to perform the add peer process, please check by SQL: show regions"); + "There are no other DataNodes could be selected to perform the add peer process, " + + "please check by SQL: show regions"); return status; } - // Send addRegionPeer request to the selected DataNode + // Send addRegionPeer request to the selected DataNode, + // destDataNode is where the new RegionReplica is created TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode); status = SyncDataNodeClientPool.getInstance() @@ -178,9 +185,9 @@ public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId maintainPeerReq, DataNodeRequestType.ADD_REGION_PEER); LOGGER.info( - "Send region {} add peer action to {}, wait it finished", + "Send action addRegionPeer, wait it finished, regionId: {}, dataNode: {}", regionId, - selectedDataNode.get().getInternalEndPoint()); + getIdWithRpcEndpoint(selectedDataNode.get())); return status; } @@ -194,36 +201,34 @@ public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId * @param regionId region id * @return TSStatus */ - public TSStatus removeRegionPeer(TDataNodeLocation originalDataNode, TConsensusGroupId regionId) { + public TSStatus removeRegionPeer( + TDataNodeLocation originalDataNode, + TDataNodeLocation destDataNode, + TConsensusGroupId regionId) { TSStatus status; + TDataNodeLocation rpcClientDataNode = null; + // Here we pick the DataNode who contains one of the RegionReplica of the specified // ConsensusGroup except the origin one // in order to notify the new ConsensusGroup that the origin peer should secede now + // if the selectedDataNode equals null, we choose the destDataNode to execute the method Optional selectedDataNode = filterDataNodeWithOtherRegionReplica(regionId, originalDataNode); - if (!selectedDataNode.isPresent()) { - LOGGER.warn( - "There are no other DataNodes could be selected to perform the remove peer process, please check RegionGroup: {} by SQL: show regions", - regionId); - status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage( - "There are no other DataNodes could be selected to perform the remove peer process, please check by SQL: show regions"); - return status; - } + rpcClientDataNode = selectedDataNode.orElse(destDataNode); - // Send addRegionPeer request to the selected DataNode + // Send removeRegionPeer request to the rpcClientDataNode TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode); status = SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithRetry( - selectedDataNode.get().getInternalEndPoint(), + rpcClientDataNode.getInternalEndPoint(), maintainPeerReq, DataNodeRequestType.REMOVE_REGION_PEER); LOGGER.info( - "Send region {} remove peer to {}, wait it finished", + "Send action removeRegionPeer, wait it finished, regionId: {}, dataNode: {}", regionId, - selectedDataNode.get().getInternalEndPoint()); + rpcClientDataNode.getInternalEndPoint()); return status; } @@ -239,6 +244,34 @@ public TSStatus removeRegionPeer(TDataNodeLocation originalDataNode, TConsensusG */ public TSStatus deleteOldRegionPeer( TDataNodeLocation originalDataNode, TConsensusGroupId regionId) { + + // when SchemaReplicationFactor==1, execute deleteOldRegionPeer method will cause error + // user must delete the related data manually + if (CONF.getSchemaReplicationFactor() == 1 + && TConsensusGroupType.SchemaRegion.equals(regionId.getType())) { + String errorMessage = + "deleteOldRegionPeer is not supported for schemaRegion when SchemaReplicationFactor equals 1, " + + "you are supposed to delete the region data of datanode manually"; + LOGGER.info(errorMessage); + TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + status.setMessage(errorMessage); + return status; + } + + // when DataReplicationFactor==1, execute deleteOldRegionPeer method will cause error + // user must delete the related data manually + // TODO if multi-leader supports deleteOldRegionPeer when DataReplicationFactor==1? + if (CONF.getDataReplicationFactor() == 1 + && TConsensusGroupType.DataRegion.equals(regionId.getType())) { + String errorMessage = + "deleteOldRegionPeer is not supported for dataRegion when DataReplicationFactor equals 1, " + + "you are supposed to delete the region data of datanode manually"; + LOGGER.info(errorMessage); + TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + status.setMessage(errorMessage); + return status; + } + TSStatus status; TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode); status = @@ -248,7 +281,7 @@ public TSStatus deleteOldRegionPeer( maintainPeerReq, DataNodeRequestType.DELETE_OLD_REGION_PEER); LOGGER.info( - "Send region {} delete peer action to {}, wait it finished", + "Send action deleteOldRegionPeer to regionId {} on dataNodeId {}, wait it finished", regionId, originalDataNode.getInternalEndPoint()); return status; @@ -345,10 +378,11 @@ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocatio req, DataNodeRequestType.CREATE_NEW_REGION_PEER); - LOGGER.info("Send create peer for regionId {} on data node {}", regionId, destDataNode); + LOGGER.info( + "Send action createNewRegionPeer, regionId: {}, dataNode: {}", regionId, destDataNode); if (isFailed(status)) { LOGGER.error( - "Send create peer for regionId {} on data node {}, result: {}", + "Send action createNewRegionPeer, regionId: {}, dataNode: {}, result: {}", regionId, destDataNode, status); @@ -372,7 +406,7 @@ private boolean isFailed(TSStatus status) { * @throws ProcedureException procedure exception */ public TSStatus stopDataNode(TDataNodeLocation dataNode) throws ProcedureException { - LOGGER.info("begin to stop Data Node {}", dataNode); + LOGGER.info("Begin to stop Data Node {}", dataNode); AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint()); TSStatus status = SyncDataNodeClientPool.getInstance() @@ -528,6 +562,12 @@ private Optional filterDataNodeWithOtherRegionReplica( return regionReplicaNodes.stream().filter(e -> !e.equals(filterLocation)).findAny(); } + private String getIdWithRpcEndpoint(TDataNodeLocation location) { + return String.format( + "dataNodeId: %s, clientRpcEndPoint: %s", + location.getDataNodeId(), location.getClientRpcEndPoint()); + } + /** * Check the protocol of the cluster, standalone is not supported to remove data node currently * diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java index 15a256143ea3..1bd5565dc4ab 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java @@ -36,11 +36,11 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced configNodeProcedureEnv.getSchedulerLock().lock(); try { if (configNodeProcedureEnv.getNodeLock().tryLock(this)) { - LOG.info("{} acquire lock.", getProcId()); + LOG.info("procedureId {} acquire lock.", getProcId()); return ProcedureLockState.LOCK_ACQUIRED; } configNodeProcedureEnv.getNodeLock().waitProcedure(this); - LOG.info("{} wait for lock.", getProcId()); + LOG.info("procedureId {} wait for lock.", getProcId()); return ProcedureLockState.LOCK_EVENT_WAIT; } finally { configNodeProcedureEnv.getSchedulerLock().unlock(); @@ -51,7 +51,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { configNodeProcedureEnv.getSchedulerLock().lock(); try { - LOG.info("{} release lock.", getProcId()); + LOG.info("procedureId {} release lock.", getProcId()); if (configNodeProcedureEnv.getNodeLock().releaseLock(this)) { configNodeProcedureEnv .getNodeLock() diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java index 1fa944b11353..6d2f87d9725d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java @@ -40,11 +40,13 @@ import java.io.IOException; import java.nio.ByteBuffer; +import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; + /** region migrate procedure */ public class RegionMigrateProcedure extends StateMachineProcedure { private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class); - private static final int retryThreshold = 5; + private static final int RETRY_THRESHOLD = 5; /** Wait region migrate finished */ private final Object regionMigrateLock = new Object(); @@ -55,6 +57,10 @@ public class RegionMigrateProcedure private TDataNodeLocation destDataNode; + private boolean migrateSuccess = true; + + private String migrateResult = ""; + public RegionMigrateProcedure() { super(); } @@ -86,9 +92,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat break; case ADD_REGION_PEER: tsStatus = env.getDataNodeRemoveHandler().addRegionPeer(destDataNode, consensusGroupId); - if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { waitForOneMigrationStepFinished(consensusGroupId); - LOG.info("Wait for region {} add peer finished", consensusGroupId); + LOG.info("Wait for ADD_REGION_PEER finished, regionId: {}", consensusGroupId); } else { throw new ProcedureException("Failed to add region peer"); } @@ -100,10 +106,11 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat break; case REMOVE_REGION_PEER: tsStatus = - env.getDataNodeRemoveHandler().removeRegionPeer(originalDataNode, consensusGroupId); - if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + env.getDataNodeRemoveHandler() + .removeRegionPeer(originalDataNode, destDataNode, consensusGroupId); + if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { waitForOneMigrationStepFinished(consensusGroupId); - LOG.info("Wait for region {} remove peer finished", consensusGroupId); + LOG.info("Wait REMOVE_REGION_PEER finished, regionId: {}", consensusGroupId); } else { throw new ProcedureException("Failed to remove region peer"); } @@ -113,9 +120,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat tsStatus = env.getDataNodeRemoveHandler() .deleteOldRegionPeer(originalDataNode, consensusGroupId); - if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { waitForOneMigrationStepFinished(consensusGroupId); - LOG.info("Wait for region {} remove consensus group finished", consensusGroupId); + LOG.info("Wait for DELETE_OLD_REGION_PEER finished, regionId: {}", consensusGroupId); } // remove consensus group after a node stop, which will be failed, but we will // continuously execute. @@ -127,14 +134,27 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat return Flow.NO_MORE_STATE; } } catch (Exception e) { + LOG.error( + "Meets error in region migrate state, please do the rollback operation yourself manually according to the error message!!! " + + "error state: {}, migrateResult: {}", + state, + migrateResult); if (isRollbackSupported(state)) { - setFailure(new ProcedureException("Region migrate failed " + state)); + setFailure(new ProcedureException("Region migrate failed at state: " + state)); } else { LOG.error( - "Retrievable error trying to region migrate {}, state {}", originalDataNode, state, e); - if (getCycles() > retryThreshold) { - setFailure(new ProcedureException("State stuck at " + state)); + "Failed state is not support rollback, filed state {}, originalDataNode: {}", + state, + originalDataNode); + if (getCycles() > RETRY_THRESHOLD) { + setFailure( + new ProcedureException( + "Procedure retried failed exceed 5 times, state stuck at " + state)); } + + // meets exception in region migrate process + // terminate the process + return Flow.NO_MORE_STATE; } } return Flow.HAS_MORE_STATE; @@ -154,12 +174,12 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced configNodeProcedureEnv.getSchedulerLock().lock(); try { if (configNodeProcedureEnv.getRegionMigrateLock().tryLock(this)) { - LOG.info("{} acquire lock.", getProcId()); + LOG.info("procedureId {} acquire lock.", getProcId()); return ProcedureLockState.LOCK_ACQUIRED; } configNodeProcedureEnv.getRegionMigrateLock().waitProcedure(this); - LOG.info("{} wait for lock.", getProcId()); + LOG.info("procedureId {} wait for lock.", getProcId()); return ProcedureLockState.LOCK_EVENT_WAIT; } finally { configNodeProcedureEnv.getSchedulerLock().unlock(); @@ -170,7 +190,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { configNodeProcedureEnv.getSchedulerLock().lock(); try { - LOG.info("{} release lock.", getProcId()); + LOG.info("procedureId {} release lock.", getProcId()); if (configNodeProcedureEnv.getRegionMigrateLock().releaseLock(this)) { configNodeProcedureEnv .getRegionMigrateLock() @@ -230,12 +250,18 @@ public boolean equals(Object that) { return false; } - public TSStatus waitForOneMigrationStepFinished(TConsensusGroupId consensusGroupId) { - TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + public TSStatus waitForOneMigrationStepFinished(TConsensusGroupId consensusGroupId) + throws Exception { + TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode()); synchronized (regionMigrateLock) { try { // TODO set timeOut? regionMigrateLock.wait(); + + if (!migrateSuccess) { + throw new ProcedureException( + String.format("Region migrate failed, regionId: %s", consensusGroupId)); + } } catch (InterruptedException e) { LOG.error("region migrate {} interrupt", consensusGroupId, e); Thread.currentThread().interrupt(); @@ -248,12 +274,20 @@ public TSStatus waitForOneMigrationStepFinished(TConsensusGroupId consensusGroup /** DataNode report region migrate result to ConfigNode, and continue */ public void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq req) { + + LOG.info("ConfigNode received DataNode reported region migrate result: {} ", req); + // TODO the req is used in roll back synchronized (regionMigrateLock) { + TSStatus migrateStatus = req.getMigrateResult(); + // migrate failed + if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) { + LOG.info("Region migrate executed failed in DataNode, migrateStatus: {}", migrateStatus); + migrateSuccess = false; + migrateResult = migrateStatus.toString(); + } regionMigrateLock.notify(); } - LOG.info( - "notified after DataNode reported region {} migrate result:{} ", req.getRegionId(), req); } public TConsensusGroupId getConsensusGroupId() { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 1da0ac6eedf0..28e22d33ce99 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -394,7 +394,7 @@ public TSStatus registerConfigNode(TConfigNodeRegisterReq req) throws TException @Override public TSStatus addConsensusGroup(TAddConsensusGroupReq registerResp) { - return configManager.addConsensusGroup(registerResp.getConfigNodeList()); + return configManager.createPeerForConsensusGroup(registerResp.getConfigNodeList()); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index 1282b32e8fdb..139233c31056 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -77,7 +77,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; public class DataNode implements DataNodeMBean { private static final Logger logger = LoggerFactory.getLogger(DataNode.class); @@ -415,20 +414,6 @@ public void stop() { } catch (Exception e) { logger.error("stop data node error", e); } - - // kill the datanode process 5 seconds later - // if remove this step, datanode process will still alive - new Thread( - () -> { - try { - TimeUnit.SECONDS.sleep(5); - } catch (InterruptedException e) { - logger.error("Meets InterruptedException in stop method of DataNode"); - } finally { - System.exit(0); - } - }) - .start(); } private void initServiceProvider() throws QueryProcessException { diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java index 15d902041d19..5fc9c2a02246 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java @@ -158,6 +158,11 @@ private void removeNodesFromCluster(String[] args) throw new IoTDBException( removeResp.getStatus().toString(), removeResp.getStatus().getCode()); } + logger.info( + "Submit remove datanode request successfully, " + + "more details are shown in the logs of confignode-leader and removed-datanode, " + + "and after the process of remove-datanode is over, " + + "you are supposed to delete directory and data of the removed-datanode manually"); } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index 9cbe669564a3..09707a4e925a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java @@ -55,6 +55,7 @@ public class RegionMigrateService implements IService { private static final int RETRY = 5; private static final int SLEEP_MILLIS = 5000; + private RegionMigratePool regionMigratePool; private RegionMigrateService() {} @@ -64,10 +65,10 @@ public static RegionMigrateService getInstance() { } /** - * add a region peer + * submit AddRegionPeerTask * - * @param req TMigrateRegionReq - * @return submit task succeed? + * @param req TMaintainPeerReq + * @return if the submit task succeed */ public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) { @@ -76,7 +77,7 @@ public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) { regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getDestNode())); } catch (Exception e) { LOGGER.error( - "Submit add region peer task error for Region: {} on DataNode: {}.", + "Submit addRegionPeer task error for Region: {} on DataNode: {}.", req.getRegionId(), req.getDestNode().getInternalEndPoint().getIp(), e); @@ -86,10 +87,10 @@ public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) { } /** - * remove a region peer + * submit RemoveRegionPeerTask * - * @param req TMigrateRegionReq - * @return submit task succeed? + * @param req TMaintainPeerReq + * @return if the submit task succeed */ public synchronized boolean submitRemoveRegionPeerTask(TMaintainPeerReq req) { @@ -98,7 +99,7 @@ public synchronized boolean submitRemoveRegionPeerTask(TMaintainPeerReq req) { regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(), req.getDestNode())); } catch (Exception e) { LOGGER.error( - "Submit remove region peer task error for Region: {} on DataNode: {}.", + "Submit removeRegionPeer task error for Region: {} on DataNode: {}.", req.getRegionId(), req.getDestNode().getInternalEndPoint().getIp(), e); @@ -113,15 +114,14 @@ public synchronized boolean submitRemoveRegionPeerTask(TMaintainPeerReq req) { * @param req TMigrateRegionReq * @return submit task succeed? */ - public synchronized boolean submitRemoveRegionConsensusGroupTask(TMaintainPeerReq req) { + public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq req) { boolean submitSucceed = true; try { - regionMigratePool.submit( - new RemoveRegionConsensusGroupTask(req.getRegionId(), req.getDestNode())); + regionMigratePool.submit(new DeleteOldRegionPeerTask(req.getRegionId(), req.getDestNode())); } catch (Exception e) { LOGGER.error( - "Submit remove region peer task error for Region: {} on DataNode: {}.", + "Submit deleteOldRegionPeerTask error for Region: {} on DataNode: {}.", req.getRegionId(), req.getDestNode().getInternalEndPoint().getIp(), e); @@ -276,15 +276,12 @@ private TSStatus addPeer() { } catch (Throwable e) { addPeerSucceed = false; taskLogger.error( - "add new peer {} for region {} error, retry times: {}", newPeerNode, regionId, i, e); + "Add new peer {} for region {} error, retry times: {}", newPeerNode, regionId, i, e); status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); status.setMessage( - "add peer " - + newPeerNode - + " for region: " - + regionId - + " error, exception: " - + e.getMessage()); + String.format( + "Add peer for region error, peerId: %s, regionId: %s, errorMessage: %s", + newPeerNode, regionId, e.getMessage())); } if (addPeerSucceed && resp != null && resp.isSuccess()) { break; @@ -292,9 +289,12 @@ private TSStatus addPeer() { } if (!addPeerSucceed || resp == null || !resp.isSuccess()) { taskLogger.error( - "add new peer {} for region {} failed, resp: {}", newPeerNode, regionId, resp); + "Add new peer {} for region {} failed, resp: {}", newPeerNode, regionId, resp); status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage("add new peer " + newPeerNode + " for region " + regionId + "failed"); + status.setMessage( + String.format( + "Add peer for region error, peerId: %s, regionId: %s, resp: %s", + newPeerNode, regionId, resp)); return status; } @@ -334,10 +334,10 @@ private boolean isFailed(TSStatus status) { private static class RemoveRegionPeerTask implements Runnable { private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class); - // The RegionGroup that shall perform the add peer process + // The RegionGroup that shall perform the remove peer process private final TConsensusGroupId tRegionId; - // The DataNode that selected to perform the add peer process + // The DataNode that selected to perform the remove peer process private final TDataNodeLocation selectedDataNode; public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation selectedDataNode) { @@ -370,7 +370,7 @@ private TSStatus removePeer() { ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId); TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); TEndPoint oldPeerNode = getConsensusEndPoint(selectedDataNode, regionId); - taskLogger.info("start to remove peer {} for region {}", oldPeerNode, regionId); + taskLogger.info("Start to remove peer {} for region {}", oldPeerNode, regionId); ConsensusGenericResponse resp = null; boolean removePeerSucceed = true; for (int i = 0; i < RETRY; i++) { @@ -429,9 +429,8 @@ private boolean isFailed(TSStatus status) { } } - private static class RemoveRegionConsensusGroupTask implements Runnable { - private static final Logger taskLogger = - LoggerFactory.getLogger(RemoveRegionConsensusGroupTask.class); + private static class DeleteOldRegionPeerTask implements Runnable { + private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class); // migrate which region private final TConsensusGroupId tRegionId; @@ -439,14 +438,14 @@ private static class RemoveRegionConsensusGroupTask implements Runnable { // migrate from which node private final TDataNodeLocation fromNode; - public RemoveRegionConsensusGroupTask(TConsensusGroupId tRegionId, TDataNodeLocation fromNode) { + public DeleteOldRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation fromNode) { this.tRegionId = tRegionId; this.fromNode = fromNode; } @Override public void run() { - TSStatus runResult = removeConsensusGroup(); + TSStatus runResult = deleteOldRegionPeer(); if (isFailed(runResult)) { reportFailed( tRegionId, fromNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult); @@ -460,30 +459,9 @@ public void run() { reportSucceed(tRegionId); } - private TSStatus deleteRegion() { - TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId); - taskLogger.debug("start to delete region {}", regionId); - try { - if (regionId instanceof DataRegionId) { - StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) regionId); - } else { - SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId); - } - } catch (Throwable e) { - taskLogger.error("delete the region {} failed", regionId, e); - status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode()); - status.setMessage("delete region " + regionId + "failed, " + e.getMessage()); - return status; - } - status.setMessage("delete region " + regionId + " succeed"); - taskLogger.info("Finished to delete region {}", regionId); - return status; - } - - private TSStatus removeConsensusGroup() { + private TSStatus deleteOldRegionPeer() { ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId); - taskLogger.info("start to remove region {} consensus group", regionId); + taskLogger.info("Start to deleteOldRegionPeer: {}", regionId); TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); ConsensusGenericResponse resp; try { @@ -493,23 +471,19 @@ private TSStatus removeConsensusGroup() { resp = SchemaRegionConsensusImpl.getInstance().deletePeer(regionId); } } catch (Throwable e) { - taskLogger.error("remove region {} consensus group error", regionId, e); + taskLogger.error("DeleteOldRegionPeer error, regionId: {}", regionId, e); status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode()); status.setMessage( - "remove consensus group for region: " - + regionId - + " error. exception: " - + e.getMessage()); + "deleteOldRegionPeer for region: " + regionId + " error. exception: " + e.getMessage()); return status; } if (!resp.isSuccess()) { - taskLogger.error("remove region {} consensus group failed", regionId, resp.getException()); + taskLogger.error("deleteOldRegionPeer error, regionId: {}", regionId, resp.getException()); status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode()); status.setMessage( - "remove consensus group for region: " - + regionId - + " failed. exception: " - + resp.getException().getMessage()); + String.format( + "deleteOldRegionPeer error, regionId: %s, errorMessage: %s", + regionId, resp.getException().getMessage())); return status; } taskLogger.info("succeed to remove region {} consensus group", regionId); @@ -517,6 +491,27 @@ private TSStatus removeConsensusGroup() { return status; } + private TSStatus deleteRegion() { + TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId); + taskLogger.debug("start to delete region {}", regionId); + try { + if (regionId instanceof DataRegionId) { + StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) regionId); + } else { + SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId); + } + } catch (Throwable e) { + taskLogger.error("delete the region {} failed", regionId, e); + status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode()); + status.setMessage("delete region " + regionId + "failed, " + e.getMessage()); + return status; + } + status.setMessage("delete region " + regionId + " succeed"); + taskLogger.info("Finished to delete region {}", regionId); + return status; + } + private boolean isSucceed(TSStatus status) { return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java index 1dc70958b705..7b74027cef1a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -128,6 +128,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface { @@ -651,14 +652,13 @@ public TSStatus createNewRegionPeer(TCreatePeerReq req) throws TException { ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId()); List peers = req.getRegionLocations().stream() - .map(n -> getConsensusEndPoint(n, regionId)) - .map(node -> new Peer(regionId, node)) + .map(location -> new Peer(regionId, getConsensusEndPoint(location, regionId))) .collect(Collectors.toList()); TSStatus status = createNewRegion(regionId, req.getStorageGroup(), req.getTtl()); if (!isSucceed(status)) { return status; } - return addConsensusGroup(regionId, peers); + return createNewRegionPeer(regionId, peers); } @Override @@ -669,13 +669,13 @@ public TSStatus addRegionPeer(TMaintainPeerReq req) throws TException { TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); if (submitSucceed) { LOGGER.info( - "Successfully submit a add region peer task for region: {} on DataNode: {}", + "Successfully submit addRegionPeer task for region: {} on DataNode: {}", regionId, selectedDataNodeIP); return status; } status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage("submit add region peer task failed, region: " + regionId); + status.setMessage("Submit addRegionPeer task failed, region: " + regionId); return status; } @@ -687,13 +687,13 @@ public TSStatus removeRegionPeer(TMaintainPeerReq req) throws TException { TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); if (submitSucceed) { LOGGER.info( - "Successfully to submit a remove region peer task for region: {} on DataNode: {}", + "Successfully submit removeRegionPeer task for region: {} on DataNode: {}", regionId, selectedDataNodeIP); return status; } status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage("submit add region peer task failed, region: " + regionId); + status.setMessage("Submit removeRegionPeer task failed, region: " + regionId); return status; } @@ -701,19 +701,17 @@ public TSStatus removeRegionPeer(TMaintainPeerReq req) throws TException { public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) throws TException { TConsensusGroupId regionId = req.getRegionId(); String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp(); - boolean submitSucceed = - RegionMigrateService.getInstance().submitRemoveRegionConsensusGroupTask(req); + boolean submitSucceed = RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req); TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); if (submitSucceed) { LOGGER.info( - "Successfully to submit a remove region consensus group task for region: {} on DataNode: {}", + "Successfully submit deleteOldRegionPeer task for region: {} on DataNode: {}", regionId, selectedDataNodeIP); return status; } status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage( - "submit region remove region consensus group task failed, region: " + regionId); + status.setMessage("Submit deleteOldRegionPeer task failed, region: " + regionId); return status; } @@ -819,8 +817,8 @@ private boolean isSucceed(TSStatus status) { return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); } - private TSStatus addConsensusGroup(ConsensusGroupId regionId, List peers) { - LOGGER.info("Start to add consensus group {} to region {}", peers, regionId); + private TSStatus createNewRegionPeer(ConsensusGroupId regionId, List peers) { + LOGGER.info("Start to createNewRegionPeer {} to region {}", peers, regionId); TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); ConsensusGenericResponse resp; if (regionId instanceof DataRegionId) { @@ -830,13 +828,16 @@ private TSStatus addConsensusGroup(ConsensusGroupId regionId, List peers) } if (!resp.isSuccess()) { LOGGER.error( - "add peers {} to region {} consensus group error", peers, regionId, resp.getException()); + "CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage", + peers, + regionId, + resp.getException()); status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode()); status.setMessage(resp.getException().getMessage()); return status; } - LOGGER.info("succeed to add peers {} to region {} consensus group", peers, regionId); - status.setMessage("add peers to region consensus group " + regionId + "succeed"); + LOGGER.info("Succeed to createNewRegionPeer {} for region {}", peers, regionId); + status.setMessage("createNewRegionPeer succeed, regionId: " + regionId); return status; } @@ -854,7 +855,23 @@ public TSStatus disableDataNode(TDisableDataNodeReq req) throws TException { @Override public TSStatus stopDataNode() { TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - LOGGER.info("stopping Data Node"); + LOGGER.info("Execute stopDataNode RPC method"); + + // kill the datanode process 20 seconds later + // because datanode process cannot exit normally for the reason of InterruptedException + new Thread( + () -> { + try { + TimeUnit.SECONDS.sleep(20); + } catch (InterruptedException e) { + LOGGER.error("Meets InterruptedException in stopDataNode RPC method"); + } finally { + LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds"); + System.exit(0); + } + }) + .start(); + try { DataNode.getInstance().stop(); status.setMessage("stop datanode succeed");