Skip to content

Commit

Permalink
[IOTDB-4419] Maintain RegionStatus through cluster heartbeat (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
CRZbulabula authored Sep 18, 2022
1 parent f140313 commit 3348a06
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
Expand All @@ -36,12 +35,12 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
// Update DataNodeHeartbeatCache when success
private final TDataNodeLocation dataNodeLocation;
private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;

public DataNodeHeartbeatHandler(
TDataNodeLocation dataNodeLocation,
DataNodeHeartbeatCache dataNodeHeartbeatCache,
Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap) {
Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap) {
this.dataNodeLocation = dataNodeLocation;
this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
this.regionGroupCacheMap = regionGroupCacheMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@

import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
Expand Down Expand Up @@ -80,14 +78,12 @@
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
Expand All @@ -102,7 +98,6 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -117,7 +112,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
Expand Down Expand Up @@ -792,24 +786,7 @@ public UDFManager getUDFManager() {
public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
RegionInfoListResp regionInfoListResp =
(RegionInfoListResp) partitionManager.getRegionInfoList(getRegionInfoListPlan);
regionInfoListResp
.getRegionInfoList()
.forEach(
regionInfo -> {
Map<TConsensusGroupId, Integer> allLeadership =
getPartitionManager().getAllLeadership();
if (!allLeadership.isEmpty()) {
String regionType =
regionInfo.getDataNodeId()
== allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
? RegionRoleType.Leader.toString()
: RegionRoleType.Follower.toString();
regionInfo.setRoleType(regionType);
}
});
return regionInfoListResp;
return (RegionInfoListResp) partitionManager.getRegionInfoList(getRegionInfoListPlan);
} else {
RegionInfoListResp regionResp = new RegionInfoListResp();
regionResp.setStatus(status);
Expand All @@ -822,48 +799,8 @@ public TShowDataNodesResp showDataNodes() {
TSStatus status = confirmLeader();
TShowDataNodesResp resp = new TShowDataNodesResp();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<TDataNodeInfo> registeredDataNodesInfoList = nodeManager.getRegisteredDataNodeInfoList();

// Map<DataNodeId, DataRegionNum>
Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
// Map<DataNodeId, SchemaRegionNum>
Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();

List<TRegionInfo> regionInfoList =
((RegionInfoListResp) partitionManager.getRegionInfoList(new GetRegionInfoListPlan()))
.getRegionInfoList();
if (CollectionUtils.isNotEmpty(regionInfoList)) {

regionInfoList.forEach(
(regionInfo) -> {
int dataNodeId = regionInfo.getDataNodeId();
int regionTypeValue = regionInfo.getConsensusGroupId().getType().getValue();
int dataRegionNum =
regionTypeValue == TConsensusGroupType.DataRegion.getValue() ? 1 : 0;
int schemaRegionNum =
regionTypeValue == TConsensusGroupType.SchemaRegion.getValue() ? 1 : 0;
dataRegionNumMap
.computeIfAbsent(dataNodeId, key -> new AtomicInteger())
.addAndGet(dataRegionNum);
schemaRegionNumMap
.computeIfAbsent(dataNodeId, key -> new AtomicInteger())
.addAndGet(schemaRegionNum);
});

registeredDataNodesInfoList.forEach(
(dataNodesInfo -> {
if (dataRegionNumMap.containsKey(dataNodesInfo.getDataNodeId())) {
dataNodesInfo.setDataRegionNum(
dataRegionNumMap.get(dataNodesInfo.getDataNodeId()).get());
}
if (schemaRegionNumMap.containsKey(dataNodesInfo.getDataNodeId())) {
dataNodesInfo.setSchemaRegionNum(
schemaRegionNumMap.get(dataNodesInfo.getDataNodeId()).get());
}
}));
}

return resp.setDataNodesInfoList(registeredDataNodesInfoList).setStatus(StatusUtils.OK);
return resp.setDataNodesInfoList(nodeManager.getRegisteredDataNodeInfoList())
.setStatus(StatusUtils.OK);
} else {
return resp.setStatus(status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
Expand Down Expand Up @@ -70,6 +71,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -275,6 +277,43 @@ public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
dataNodeInfoList.add(info);
});
}

// Map<DataNodeId, DataRegionNum>
Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
// Map<DataNodeId, SchemaRegionNum>
Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
regionReplicaSets.forEach(
regionReplicaSet ->
regionReplicaSet
.getDataNodeLocations()
.forEach(
dataNodeLocation -> {
switch (regionReplicaSet.getRegionId().getType()) {
case SchemaRegion:
schemaRegionNumMap
.computeIfAbsent(
dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
.getAndIncrement();
break;
case DataRegion:
default:
dataRegionNumMap
.computeIfAbsent(
dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
.getAndIncrement();
}
}));
AtomicInteger zero = new AtomicInteger(0);
dataNodeInfoList.forEach(
(dataNodesInfo -> {
dataNodesInfo.setSchemaRegionNum(
schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
dataNodesInfo.setDataRegionNum(
dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
}));

dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
return dataNodeInfoList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
Expand All @@ -45,12 +46,13 @@
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.consensus.ConsensusFactory;
Expand Down Expand Up @@ -93,7 +95,7 @@ public class PartitionManager {
private Future<?> currentRegionCleanerFuture;

// Map<RegionId, RegionGroupCache>
private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;

public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
this.configManager = configManager;
Expand Down Expand Up @@ -460,7 +462,31 @@ public TSeriesPartitionSlot getSeriesPartitionSlot(String devicePath) {
}

public DataSet getRegionInfoList(GetRegionInfoListPlan req) {
return getConsensusManager().read(req).getDataset();
// Get static result
RegionInfoListResp regionInfoListResp =
(RegionInfoListResp) getConsensusManager().read(req).getDataset();
Map<TConsensusGroupId, Integer> allLeadership = getAllLeadership();

// Get cached result
regionInfoListResp
.getRegionInfoList()
.forEach(
regionInfo -> {
regionInfo.setStatus(
regionGroupCacheMap
.get(regionInfo.getConsensusGroupId())
.getRegionStatus(regionInfo.getDataNodeId())
.getStatus());

String regionType =
regionInfo.getDataNodeId()
== allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
? RegionRoleType.Leader.toString()
: RegionRoleType.Follower.toString();
regionInfo.setRoleType(regionType);
});

return regionInfoListResp;
}

/**
Expand Down Expand Up @@ -540,7 +566,7 @@ public void stopRegionCleaner() {
}
}

public Map<TConsensusGroupId, IRegionGroupCache> getRegionGroupCacheMap() {
public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() {
return regionGroupCacheMap;
}

Expand All @@ -549,12 +575,16 @@ public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
}

/**
* Get the leadership of each RegionGroup If a node is in unknown or removing status, this node
* can't be leader
* Get the leadership of each RegionGroup.
*
* @return Map<RegionGroupId, leader location>
* @return Map<RegionGroupId, DataNodeId where the leader located>
* <p>Some RegionGroups that supposed to be occurred in the result map might be nonexistent
* and some leaderId might be -1(leader unknown yet) due to heartbeat latency
*/
public Map<TConsensusGroupId, Integer> getAllLeadership() {

// TODO: Will be optimized by IOTDB-4341

Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
if (ConfigNodeDescriptor.getInstance()
.getConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private void updateNodeLoadStatistic() {
.values()
.forEach(
regionGroupCache -> {
boolean updateResult = regionGroupCache.updateLoadStatistic();
boolean updateResult = regionGroupCache.updateRegionStatistics();
switch (regionGroupCache.getConsensusGroupId().getType()) {
// Check if some RegionGroups change their leader
case SchemaRegion:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
public abstract class BaseNodeCache {

/** When the response time of heartbeat is more than 20s, the node is considered as down */
static final int HEARTBEAT_TIMEOUT_TIME = 20_000;
public static final int HEARTBEAT_TIMEOUT_TIME = 20_000;

/** Max heartbeat cache samples store size */
static final int MAXIMUM_WINDOW_SIZE = 100;
public static final int MAXIMUM_WINDOW_SIZE = 100;

/** SlidingWindow stores the heartbeat sample data */
final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
Expand Down

This file was deleted.

Loading

0 comments on commit 3348a06

Please sign in to comment.