From 2e93bc6b361b5e34cb19dc2757ffe3c3b44c56de Mon Sep 17 00:00:00 2001 From: YongzaoDan <33111881+CRZbulabula@users.noreply.github.com> Date: Tue, 13 Sep 2022 15:34:13 +0800 Subject: [PATCH] [IOTDB-4377] Fix TTimePartitionSlot count and DataPartition inherit policy bug (#7287) --- .../confignode/manager/ConfigManager.java | 2 +- .../confignode/manager/PartitionManager.java | 17 +- .../partition/GreedyPartitionAllocator.java | 93 +++++++--- .../persistence/partition/PartitionInfo.java | 9 +- .../persistence/partition/RegionGroup.java | 68 ++++++-- .../partition/StorageGroupPartitionTable.java | 88 +++++----- .../IoTDBClusterPartitionTableTest.java | 160 +++++++++++------- .../{ => confignode}/IoTDBConfigNodeIT.java | 2 +- .../commons/partition/DataPartitionTable.java | 14 +- .../partition/SchemaPartitionTable.java | 18 +- .../partition/SeriesPartitionTable.java | 19 ++- 11 files changed, 312 insertions(+), 178 deletions(-) rename integration-test/src/test/java/org/apache/iotdb/db/it/{ => confignode}/IoTDBClusterPartitionTableTest.java (69%) rename integration-test/src/test/java/org/apache/iotdb/db/it/{ => confignode}/IoTDBConfigNodeIT.java (99%) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index dcf3149530ef..f6136d558d17 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -799,7 +799,7 @@ public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan if (!allLeadership.isEmpty()) { String regionType = regionInfo.getDataNodeId() - == allLeadership.get(regionInfo.getConsensusGroupId()) + == allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1) ? RegionRoleType.Leader.toString() : RegionRoleType.Follower.toString(); regionInfo.setRoleType(regionType); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java index 5808989e011a..cffdc97c8d2e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java @@ -72,7 +72,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** The PartitionManager Manages cluster PartitionTable read and write requests. */ @@ -237,16 +236,9 @@ public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) { // Map Map unassignedDataPartitionSlotsCountMap = new ConcurrentHashMap<>(); unassignedDataPartitionSlotsMap.forEach( - (storageGroup, unassignedDataPartitionSlots) -> { - AtomicInteger unassignedDataPartitionSlotsCount = new AtomicInteger(0); - unassignedDataPartitionSlots - .values() - .forEach( - timePartitionSlots -> - unassignedDataPartitionSlotsCount.getAndAdd(timePartitionSlots.size())); - unassignedDataPartitionSlotsCountMap.put( - storageGroup, unassignedDataPartitionSlotsCount.get()); - }); + (storageGroup, unassignedDataPartitionSlots) -> + unassignedDataPartitionSlotsCountMap.put( + storageGroup, unassignedDataPartitionSlots.size())); TSStatus status = extendRegionsIfNecessary( unassignedDataPartitionSlotsCountMap, TConsensusGroupType.DataRegion); @@ -302,7 +294,8 @@ private TSStatus extendRegionsIfNecessary( float allocatedRegionCount = partitionInfo.getRegionCount(entry.getKey(), consensusGroupType); // The slotCount equals to the sum of assigned slot count and unassigned slot count - float slotCount = partitionInfo.getSlotCount(entry.getKey()) + entry.getValue(); + float slotCount = + partitionInfo.getAssignedSeriesPartitionSlotsCount(entry.getKey()) + entry.getValue(); float maxRegionCount = getClusterSchemaManager().getMaxRegionGroupCount(entry.getKey(), consensusGroupType); float maxSlotCount = diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java index 7feeeb2a8068..3f4e230154e4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java @@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.Pair; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +39,9 @@ /** Allocating new Partitions by greedy algorithm */ public class GreedyPartitionAllocator implements IPartitionAllocator { + private static final long TIME_PARTITION_INTERVAL = + ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval(); + private final IManager configManager; public GreedyPartitionAllocator(IManager configManager) { @@ -63,7 +67,7 @@ public Map allocateSchemaPartition( // Greedy allocation schemaPartitionMap.put(seriesPartitionSlot, regionSlotsCounter.get(0).getRight()); // Bubble sort - bubbleSort(regionSlotsCounter); + bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter); } result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap)); }); @@ -84,46 +88,91 @@ public Map allocateDataPartition( getPartitionManager() .getSortedRegionSlotsCounter(storageGroup, TConsensusGroupType.DataRegion); + DataPartitionTable dataPartitionTable = new DataPartitionTable(); + // Enumerate SeriesPartitionSlot - Map dataPartitionMap = - new ConcurrentHashMap<>(); for (Map.Entry> seriesPartitionEntry : unassignedPartitionSlotsMap.entrySet()) { - // Enumerate TimePartitionSlot - Map> seriesPartitionMap = - new ConcurrentHashMap<>(); - for (TTimePartitionSlot timePartitionSlot : seriesPartitionEntry.getValue()) { + SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable(); + + // Enumerate TimePartitionSlot in ascending order + List timePartitionSlots = seriesPartitionEntry.getValue(); + timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime)); + for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) { + + /* Check if the current DataPartition has predecessor firstly, and inherit it if exists */ + + // Check if the current Partition's predecessor is allocated + // in the same batch of Partition creation TConsensusGroupId predecessor = + seriesPartitionTable.getPrecededDataPartition( + timePartitionSlot, TIME_PARTITION_INTERVAL); + if (predecessor != null) { + seriesPartitionTable + .getSeriesPartitionMap() + .put(timePartitionSlot, Collections.singletonList(predecessor)); + bubbleSort(predecessor, regionSlotsCounter); + continue; + } + + // Check if the current Partition's predecessor was allocated + // in the former Partition creation + predecessor = getPartitionManager() .getPrecededDataPartition( storageGroup, seriesPartitionEntry.getKey(), timePartitionSlot, - ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval()); + TIME_PARTITION_INTERVAL); if (predecessor != null) { - // For DataPartition allocation, we consider predecessor first - seriesPartitionMap.put(timePartitionSlot, Collections.singletonList(predecessor)); - } else { - // Greedy allocation - seriesPartitionMap.put( - timePartitionSlot, - Collections.singletonList(regionSlotsCounter.get(0).getRight())); - // Bubble sort - bubbleSort(regionSlotsCounter); + seriesPartitionTable + .getSeriesPartitionMap() + .put(timePartitionSlot, Collections.singletonList(predecessor)); + bubbleSort(predecessor, regionSlotsCounter); + continue; } + + /* Greedy allocation */ + seriesPartitionTable + .getSeriesPartitionMap() + .put( + timePartitionSlot, + Collections.singletonList(regionSlotsCounter.get(0).getRight())); + bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter); } - dataPartitionMap.put( - seriesPartitionEntry.getKey(), new SeriesPartitionTable(seriesPartitionMap)); + dataPartitionTable + .getDataPartitionMap() + .put(seriesPartitionEntry.getKey(), seriesPartitionTable); } - result.put(storageGroup, new DataPartitionTable(dataPartitionMap)); + result.put(storageGroup, dataPartitionTable); }); return result; } - private void bubbleSort(List> regionSlotsCounter) { + /** + * Bubble sort the regionSlotsCounter from the specified consensus group + * + *

Notice: Here we use bubble sort instead of other sorting algorithm is because that, there is + * only one Partition allocated in each loop. Therefore, only consider one consensus group weight + * change is enough + * + * @param consensusGroupId The consensus group where the new Partition is allocated + * @param regionSlotsCounter List> + */ + private void bubbleSort( + TConsensusGroupId consensusGroupId, List> regionSlotsCounter) { + // Find the corresponding consensus group int index = 0; - regionSlotsCounter.get(0).setLeft(regionSlotsCounter.get(0).getLeft() + 1); + for (int i = 0; i < regionSlotsCounter.size(); i++) { + if (regionSlotsCounter.get(i).getRight().equals(consensusGroupId)) { + index = i; + break; + } + } + + // Do bubble sort + regionSlotsCounter.get(index).setLeft(regionSlotsCounter.get(index).getLeft() + 1); while (index < regionSlotsCounter.size() - 1 && regionSlotsCounter.get(index).getLeft() > regionSlotsCounter.get(index + 1).getLeft()) { Collections.swap(regionSlotsCounter, index, index + 1); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index c757b9f8ba98..7cd539b456ce 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -80,6 +80,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -441,7 +442,7 @@ public DataSet getSchemaNodeManagementPartition(List matchedStorageGroup /** Get region information */ public DataSet getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) { RegionInfoListResp regionResp = new RegionInfoListResp(); - List regionInfoList = new ArrayList<>(); + List regionInfoList = new Vector<>(); if (storageGroupPartitionTables.isEmpty()) { regionResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); regionResp.setRegionInfoList(new ArrayList<>()); @@ -455,7 +456,7 @@ public DataSet getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) { if (storageGroups != null && !storageGroups.contains(storageGroup)) { return; } - storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan, regionInfoList); + regionInfoList.addAll(storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan)); }); regionInfoList.sort( Comparator.comparingInt(regionId -> regionId.getConsensusGroupId().getId())); @@ -588,8 +589,8 @@ public int getRegionCount(String storageGroup, TConsensusGroupType type) return storageGroupPartitionTables.get(storageGroup).getRegionGroupCount(type); } - public int getSlotCount(String storageGroup) { - return storageGroupPartitionTables.get(storageGroup).getSlotsCount(); + public int getAssignedSeriesPartitionSlotsCount(String storageGroup) { + return storageGroupPartitionTables.get(storageGroup).getAssignedSeriesPartitionSlotsCount(); } /** diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java index 4f0c4298b272..b4930f078b5c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java @@ -20,6 +20,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.thrift.TException; @@ -28,27 +29,34 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; public class RegionGroup { private final TRegionReplicaSet replicaSet; + // Map + // For SchemaRegion, each SeriesSlot constitute a SchemaPartition. // For DataRegion, a SeriesSlot and a TimeSlot constitute a DataPartition. // Eg: A DataRegion contains SeriesSlot-1 which has TimeSlot-1, TimeSlot-2 and Timeslot-3, // then (SeriesSlot-1 -> TimeSlot-1) constitute a DataPartition. - // For SchemaRegion, each SeriesSlot constitute a SchemaPartition. - private final AtomicLong slotCount; + private final Map slotCountMap; + + private final AtomicLong totalTimeSlotCount; public RegionGroup() { this.replicaSet = new TRegionReplicaSet(); - this.slotCount = new AtomicLong(); + this.slotCountMap = new ConcurrentHashMap<>(); + this.totalTimeSlotCount = new AtomicLong(); } public RegionGroup(TRegionReplicaSet replicaSet) { this.replicaSet = replicaSet; - this.slotCount = new AtomicLong(0); + this.slotCountMap = new ConcurrentHashMap<>(); + this.totalTimeSlotCount = new AtomicLong(0); } public TConsensusGroupId getId() { @@ -59,24 +67,51 @@ public TRegionReplicaSet getReplicaSet() { return replicaSet; } - public void addCounter(long delta) { - slotCount.getAndAdd(delta); + /** @param deltaMap Map */ + public void updateSlotCountMap(Map deltaMap) { + deltaMap.forEach( + ((seriesPartitionSlot, delta) -> { + slotCountMap + .computeIfAbsent(seriesPartitionSlot, empty -> new AtomicLong(0)) + .getAndAdd(delta.get()); + totalTimeSlotCount.getAndAdd(delta.get()); + })); + } + + public long getSeriesSlotCount() { + return slotCountMap.size(); } - public long getCounter() { - return slotCount.get(); + public long getTimeSlotCount() { + return totalTimeSlotCount.get(); } public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { replicaSet.write(protocol); - ReadWriteIOUtils.write(slotCount.get(), outputStream); + + ReadWriteIOUtils.write(slotCountMap.size(), outputStream); + for (Map.Entry slotCountEntry : slotCountMap.entrySet()) { + slotCountEntry.getKey().write(protocol); + ReadWriteIOUtils.write(slotCountEntry.getValue().get(), outputStream); + } + + ReadWriteIOUtils.write(totalTimeSlotCount.get(), outputStream); } public void deserialize(InputStream inputStream, TProtocol protocol) throws IOException, TException { replicaSet.read(protocol); - slotCount.set(ReadWriteIOUtils.readLong(inputStream)); + + int size = ReadWriteIOUtils.readInt(inputStream); + for (int i = 0; i < size; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(); + seriesPartitionSlot.read(protocol); + AtomicLong slotCount = new AtomicLong(ReadWriteIOUtils.readLong(inputStream)); + slotCountMap.put(seriesPartitionSlot, slotCount); + } + + totalTimeSlotCount.set(ReadWriteIOUtils.readLong(inputStream)); } @Override @@ -84,11 +119,20 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RegionGroup that = (RegionGroup) o; - return replicaSet.equals(that.replicaSet); + for (Map.Entry slotCountEntry : slotCountMap.entrySet()) { + if (!that.slotCountMap.containsKey(slotCountEntry.getKey())) { + return false; + } + if (slotCountEntry.getValue().get() != that.slotCountMap.get(slotCountEntry.getKey()).get()) { + return false; + } + } + return replicaSet.equals(that.replicaSet) + && totalTimeSlotCount.get() == that.totalTimeSlotCount.get(); } @Override public int hashCode() { - return Objects.hash(replicaSet); + return Objects.hash(replicaSet, slotCountMap, totalTimeSlotCount); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java index aadabe9c1d71..f527ea12f9d4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java @@ -51,6 +51,7 @@ import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class StorageGroupPartitionTable { private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupPartitionTable.class); @@ -59,10 +60,6 @@ public class StorageGroupPartitionTable { // The name of storage group private String storageGroupName; - // Total number of SeriesPartitionSlots occupied by schema, - // determines whether a new Region needs to be created - private final AtomicInteger seriesPartitionSlotsCount; - // Region private final Map regionGroupMap; // SchemaPartition @@ -72,7 +69,6 @@ public class StorageGroupPartitionTable { public StorageGroupPartitionTable(String storageGroupName) { this.storageGroupName = storageGroupName; - this.seriesPartitionSlotsCount = new AtomicInteger(0); this.regionGroupMap = new ConcurrentHashMap<>(); @@ -155,8 +151,10 @@ public int getRegionGroupCount(TConsensusGroupType type) { return result.getAndIncrement(); } - public int getSlotsCount() { - return seriesPartitionSlotsCount.get(); + public int getAssignedSeriesPartitionSlotsCount() { + return Math.max( + schemaPartitionTable.getSchemaPartitionMap().size(), + dataPartitionTable.getDataPartitionMap().size()); } /** @@ -207,17 +205,14 @@ public TConsensusGroupId getPrecededDataPartition( */ public void createSchemaPartition(SchemaPartitionTable assignedSchemaPartition) { // Cache assigned result - Map deltaMap = + // Map> + Map> groupDeltaMap = schemaPartitionTable.createSchemaPartition(assignedSchemaPartition); - // Add counter - AtomicInteger total = new AtomicInteger(0); - deltaMap.forEach( - ((consensusGroupId, delta) -> { - total.getAndAdd(delta.get()); - regionGroupMap.get(consensusGroupId).addCounter(delta.get()); - })); - seriesPartitionSlotsCount.getAndAdd(total.get()); + // Update counter + groupDeltaMap.forEach( + ((consensusGroupId, deltaMap) -> + regionGroupMap.get(consensusGroupId).updateSlotCountMap(deltaMap))); } /** @@ -227,16 +222,14 @@ public void createSchemaPartition(SchemaPartitionTable assignedSchemaPartition) */ public void createDataPartition(DataPartitionTable assignedDataPartition) { // Cache assigned result - Map deltaMap = + // Map> + Map> groupDeltaMap = dataPartitionTable.createDataPartition(assignedDataPartition); - // Add counter - AtomicInteger total = new AtomicInteger(0); - deltaMap.forEach( - ((consensusGroupId, delta) -> { - total.getAndAdd(delta.get()); - regionGroupMap.get(consensusGroupId).addCounter(delta.get()); - })); + // Update counter + groupDeltaMap.forEach( + ((consensusGroupId, deltaMap) -> + regionGroupMap.get(consensusGroupId).updateSlotCountMap(deltaMap))); } /** @@ -293,7 +286,7 @@ public List> getSortedRegionGroupSlotsCounter( regionGroupMap.forEach( (consensusGroupId, regionGroup) -> { if (consensusGroupId.getType().equals(type)) { - result.add(new Pair<>(regionGroup.getCounter(), consensusGroupId)); + result.add(new Pair<>(regionGroup.getSeriesSlotCount(), consensusGroupId)); } }); @@ -301,51 +294,51 @@ public List> getSortedRegionGroupSlotsCounter( return result; } - public void getRegionInfoList( - GetRegionInfoListPlan regionsInfoPlan, List regionInfoList) { + public List getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) { + List regionInfoList = new Vector<>(); final TShowRegionReq showRegionReq = regionsInfoPlan.getShowRegionReq(); + regionGroupMap.forEach( (consensusGroupId, regionGroup) -> { - TRegionReplicaSet replicaSet = regionGroup.getReplicaSet(); if (showRegionReq == null || showRegionReq.getConsensusGroupType() == null) { - buildTRegionsInfo(regionInfoList, replicaSet, regionGroup); - } else if (regionsInfoPlan.getShowRegionReq().getConsensusGroupType().ordinal() - == replicaSet.getRegionId().getType().ordinal()) { - buildTRegionsInfo(regionInfoList, replicaSet, regionGroup); + regionInfoList.addAll(buildRegionInfoList(regionGroup)); + } else if (showRegionReq.getConsensusGroupType().equals(regionGroup.getId().getType())) { + regionInfoList.addAll(buildRegionInfoList(regionGroup)); } }); + + return regionInfoList; } - private void buildTRegionsInfo( - List regionInfoList, TRegionReplicaSet replicaSet, RegionGroup regionGroup) { - replicaSet + private List buildRegionInfoList(RegionGroup regionGroup) { + List regionInfoList = new Vector<>(); + final TConsensusGroupId regionId = regionGroup.getId(); + + regionGroup + .getReplicaSet() .getDataNodeLocations() .forEach( (dataNodeLocation) -> { TRegionInfo regionInfo = new TRegionInfo(); - regionInfo.setConsensusGroupId(replicaSet.getRegionId()); + regionInfo.setConsensusGroupId(regionId); regionInfo.setStorageGroup(storageGroupName); - if (replicaSet.getRegionId().getType() == TConsensusGroupType.DataRegion) { - regionInfo.setSeriesSlots(dataPartitionTable.getDataPartitionMap().size()); - regionInfo.setTimeSlots(regionGroup.getCounter()); - } else if (replicaSet.getRegionId().getType() == TConsensusGroupType.SchemaRegion) { - regionInfo.setSeriesSlots(regionGroup.getCounter()); - regionInfo.setTimeSlots(0); - } + regionInfo.setSeriesSlots(regionGroup.getSeriesSlotCount()); + regionInfo.setTimeSlots(regionGroup.getTimeSlotCount()); regionInfo.setDataNodeId(dataNodeLocation.getDataNodeId()); regionInfo.setClientRpcIp(dataNodeLocation.getClientRpcEndPoint().getIp()); regionInfo.setClientRpcPort(dataNodeLocation.getClientRpcEndPoint().getPort()); - // TODO: Wait for data migration. And then add the state + // TODO: Maintain Region status regionInfo.setStatus(RegionStatus.Up.getStatus()); regionInfoList.add(regionInfo); }); + + return regionInfoList; } public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(isPredeleted, outputStream); ReadWriteIOUtils.write(storageGroupName, outputStream); - ReadWriteIOUtils.write(seriesPartitionSlotsCount.get(), outputStream); ReadWriteIOUtils.write(regionGroupMap.size(), outputStream); for (Map.Entry regionInfoEntry : regionGroupMap.entrySet()) { @@ -361,7 +354,6 @@ public void deserialize(InputStream inputStream, TProtocol protocol) throws IOException, TException { isPredeleted = ReadWriteIOUtils.readBool(inputStream); storageGroupName = ReadWriteIOUtils.readString(inputStream); - seriesPartitionSlotsCount.set(ReadWriteIOUtils.readInt(inputStream)); int length = ReadWriteIOUtils.readInt(inputStream); for (int i = 0; i < length; i++) { @@ -445,7 +437,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; StorageGroupPartitionTable that = (StorageGroupPartitionTable) o; - return isPredeleted == that.isPredeleted + return storageGroupName.equals(that.storageGroupName) && regionGroupMap.equals(that.regionGroupMap) && schemaPartitionTable.equals(that.schemaPartitionTable) && dataPartitionTable.equals(that.dataPartitionTable); @@ -453,6 +445,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(isPredeleted, regionGroupMap, schemaPartitionTable, dataPartitionTable); + return Objects.hash(storageGroupName, regionGroupMap, schemaPartitionTable, dataPartitionTable); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java similarity index 69% rename from integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java rename to integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java index 2ce04424272f..63943d2b12ef 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.it; +package org.apache.iotdb.db.it.confignode; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -30,6 +30,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq; import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema; import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; import org.apache.iotdb.it.env.ConfigFactory; @@ -54,7 +56,6 @@ import java.util.List; import java.util.Map; -// TODO: @MiniSho Move this test into org.apache.iotdb.db.it.confignode package @RunWith(IoTDBTestRunner.class) @Category({ClusterIT.class}) public class IoTDBClusterPartitionTableTest { @@ -68,8 +69,10 @@ public class IoTDBClusterPartitionTableTest { private static final long testTimePartitionInterval = 86400; private static final String sg = "root.sg"; private static final int storageGroupNum = 5; - private static final int seriesPartitionSlotsNum = 10; - private static final int timePartitionSlotsNum = 100; + private static final int seriesPartitionSlotsNum = 10000; + private static final int seriesPartitionBatchSize = 1000; + private static final int timePartitionSlotsNum = 10; + private static final int timePartitionBatchSize = 10; @Before public void setUp() throws Exception { @@ -206,21 +209,22 @@ public void testGetAndCreateSchemaPartition() } private Map>> - constructPartitionSlotsMap() { - final String sg = "root.sg"; + constructPartitionSlotsMap( + String storageGroup, + int seriesSlotStart, + int seriesSlotEnd, + long timeSlotStart, + long timeSlotEnd) { Map>> result = new HashMap<>(); - - for (int i = 0; i < storageGroupNum; i++) { - String storageGroup = sg + i; - result.put(storageGroup, new HashMap<>()); - for (int j = 0; j < seriesPartitionSlotsNum; j++) { - TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j); - result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>()); - for (long k = 0; k < timePartitionSlotsNum; k++) { - TTimePartitionSlot timePartitionSlot = - new TTimePartitionSlot(k * testTimePartitionInterval); - result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot); - } + result.put(storageGroup, new HashMap<>()); + + for (int i = seriesSlotStart; i < seriesSlotEnd; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>()); + for (long j = timeSlotStart; j < timeSlotEnd; j++) { + TTimePartitionSlot timePartitionSlot = + new TTimePartitionSlot(j * testTimePartitionInterval); + result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot); } } @@ -228,32 +232,36 @@ public void testGetAndCreateSchemaPartition() } private void checkDataPartitionMap( + String storageGroup, + int seriesSlotStart, + int seriesSlotEnd, + long timeSlotStart, + long timeSlotEnd, Map>>> dataPartitionTable) { - Assert.assertEquals(storageGroupNum, dataPartitionTable.size()); - for (int i = 0; i < storageGroupNum; i++) { - String storageGroup = sg + i; - Assert.assertTrue(dataPartitionTable.containsKey(storageGroup)); - Assert.assertEquals(seriesPartitionSlotsNum, dataPartitionTable.get(storageGroup).size()); - for (int j = 0; j < seriesPartitionSlotsNum; j++) { - TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j); - Assert.assertTrue(dataPartitionTable.get(storageGroup).containsKey(seriesPartitionSlot)); - Assert.assertEquals( - timePartitionSlotsNum, - dataPartitionTable.get(storageGroup).get(seriesPartitionSlot).size()); - - Map> timePartitionSlotMap = - dataPartitionTable.get(storageGroup).get(seriesPartitionSlot); - for (long k = 0; k < timePartitionSlotsNum; k++) { - TTimePartitionSlot timePartitionSlot = - new TTimePartitionSlot(k * testTimePartitionInterval); - Assert.assertTrue(timePartitionSlotMap.containsKey(timePartitionSlot)); - if (k > 0) { - // Check consistency - Assert.assertEquals( - timePartitionSlotMap.get(new TTimePartitionSlot(0)), - timePartitionSlotMap.get(timePartitionSlot)); - } + + Assert.assertTrue(dataPartitionTable.containsKey(storageGroup)); + Map>> + seriesPartitionTable = dataPartitionTable.get(storageGroup); + Assert.assertEquals(seriesPartitionBatchSize, seriesPartitionTable.size()); + + for (int i = seriesSlotStart; i < seriesSlotEnd; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + Assert.assertTrue(seriesPartitionTable.containsKey(seriesPartitionSlot)); + Map> timePartitionTable = + seriesPartitionTable.get(seriesPartitionSlot); + Assert.assertEquals(timePartitionBatchSize, timePartitionTable.size()); + + for (long j = timeSlotStart; j < timeSlotEnd; j++) { + TTimePartitionSlot timePartitionSlot = + new TTimePartitionSlot(j * testTimePartitionInterval); + Assert.assertTrue(timePartitionTable.containsKey(timePartitionSlot)); + if (j > timeSlotStart) { + // Check consistency + Assert.assertEquals( + timePartitionTable.get( + new TTimePartitionSlot(timeSlotStart * testTimePartitionInterval)), + timePartitionTable.get(timePartitionSlot)); } } } @@ -269,7 +277,7 @@ public void testGetAndCreateDataPartition() throws TException, IOException { // Prepare partitionSlotsMap Map>> partitionSlotsMap = - constructPartitionSlotsMap(); + constructPartitionSlotsMap(sg + 0, 0, 10, 0, 10); // Set StorageGroups for (int i = 0; i < storageGroupNum; i++) { @@ -287,22 +295,58 @@ public void testGetAndCreateDataPartition() throws TException, IOException { Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); Assert.assertEquals(0, dataPartitionTableResp.getDataPartitionTableSize()); - // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return - dataPartitionTableResp = client.getOrCreateDataPartitionTable(dataPartitionReq); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - dataPartitionTableResp.getStatus().getCode()); - Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); - checkDataPartitionMap(dataPartitionTableResp.getDataPartitionTable()); + for (int i = 0; i < storageGroupNum; i++) { + String storageGroup = sg + i; + for (int j = 0; j < seriesPartitionSlotsNum; j += seriesPartitionBatchSize) { + for (long k = 0; k < timePartitionSlotsNum; k += timePartitionBatchSize) { + partitionSlotsMap = + constructPartitionSlotsMap( + storageGroup, j, j + seriesPartitionBatchSize, k, k + timePartitionBatchSize); + + // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return + dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap); + dataPartitionTableResp = client.getOrCreateDataPartitionTable(dataPartitionReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); + checkDataPartitionMap( + storageGroup, + j, + j + seriesPartitionBatchSize, + k, + k + timePartitionBatchSize, + dataPartitionTableResp.getDataPartitionTable()); + + // Test getDataPartition, the result should only contain DataPartition created before + dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap); + dataPartitionTableResp = client.getDataPartitionTable(dataPartitionReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); + checkDataPartitionMap( + storageGroup, + j, + j + seriesPartitionBatchSize, + k, + k + timePartitionBatchSize, + dataPartitionTableResp.getDataPartitionTable()); + } + } + } - // Test getDataPartition, the result should only contain DataPartition created before - dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap); - dataPartitionTableResp = client.getDataPartitionTable(dataPartitionReq); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - dataPartitionTableResp.getStatus().getCode()); - Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); - checkDataPartitionMap(dataPartitionTableResp.getDataPartitionTable()); + // Test DataPartition inherit policy + TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq()); + showRegionResp + .getRegionInfoList() + .forEach( + regionInfo -> { + // Normally, all Timeslots belonging to the same SeriesSlot are allocated to the + // same DataRegionGroup + Assert.assertEquals( + regionInfo.getSeriesSlots() * timePartitionSlotsNum, regionInfo.getTimeSlots()); + }); } } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java similarity index 99% rename from integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java rename to integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java index 8bc3f7956e4f..ebdc7f80d4ff 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBConfigNodeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.it; +package org.apache.iotdb.db.it.confignode; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java index 9322474cb415..ff3a75feb221 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java @@ -36,7 +36,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class DataPartitionTable { @@ -119,11 +119,12 @@ public TConsensusGroupId getPrecededDataPartition( * Create DataPartition within the specific StorageGroup * * @param assignedDataPartition Assigned result - * @return Number of DataPartitions added to each Region + * @return Map> */ - public Map createDataPartition( + public Map> createDataPartition( DataPartitionTable assignedDataPartition) { - Map deltaMap = new ConcurrentHashMap<>(); + Map> groupDeltaMap = + new ConcurrentHashMap<>(); assignedDataPartition .getDataPartitionMap() @@ -131,9 +132,10 @@ public Map createDataPartition( ((seriesPartitionSlot, seriesPartitionTable) -> dataPartitionMap .computeIfAbsent(seriesPartitionSlot, empty -> new SeriesPartitionTable()) - .createDataPartition(seriesPartitionTable, deltaMap))); + .createDataPartition( + seriesPartitionTable, seriesPartitionSlot, groupDeltaMap))); - return deltaMap; + return groupDeltaMap; } /** diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java index e13f7bfc953c..61b9189e16f3 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionTable.java @@ -36,7 +36,7 @@ import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class SchemaPartitionTable { @@ -89,23 +89,25 @@ public boolean getSchemaPartition( * Create SchemaPartition within the specific StorageGroup * * @param assignedSchemaPartition assigned result - * @return Number of SchemaPartitions added to each Region + * @return Map> */ - public Map createSchemaPartition( + public Map> createSchemaPartition( SchemaPartitionTable assignedSchemaPartition) { - Map deltaMap = new ConcurrentHashMap<>(); + Map> groupDeltaMap = + new ConcurrentHashMap<>(); assignedSchemaPartition .getSchemaPartitionMap() .forEach( ((seriesPartitionSlot, consensusGroupId) -> { schemaPartitionMap.put(seriesPartitionSlot, consensusGroupId); - deltaMap - .computeIfAbsent(consensusGroupId, empty -> new AtomicInteger(0)) - .getAndIncrement(); + groupDeltaMap + .computeIfAbsent(consensusGroupId, empty -> new ConcurrentHashMap<>()) + .put(seriesPartitionSlot, new AtomicLong(0)); })); - return deltaMap; + return groupDeltaMap; } /** diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index ca2d0697127a..ac599887c1ba 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.partition; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -37,7 +38,7 @@ import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class SeriesPartitionTable { @@ -112,19 +113,25 @@ public TConsensusGroupId getPrecededDataPartition( * Create DataPartition within the specific SeriesPartitionSlot * * @param assignedSeriesPartitionTable Assigned result - * @param deltaMap Number of DataPartitions added to each Region + * @param seriesPartitionSlot Corresponding TSeriesPartitionSlot + * @param groupDeltaMap Map> */ public void createDataPartition( SeriesPartitionTable assignedSeriesPartitionTable, - Map deltaMap) { + TSeriesPartitionSlot seriesPartitionSlot, + Map> groupDeltaMap) { assignedSeriesPartitionTable .getSeriesPartitionMap() .forEach( ((timePartitionSlot, consensusGroupIds) -> { seriesPartitionMap.put(timePartitionSlot, new Vector<>(consensusGroupIds)); - deltaMap - .computeIfAbsent(consensusGroupIds.get(0), empty -> new AtomicInteger(0)) - .getAndIncrement(); + consensusGroupIds.forEach( + consensusGroupId -> + groupDeltaMap + .computeIfAbsent(consensusGroupId, empty -> new ConcurrentHashMap<>()) + .computeIfAbsent(seriesPartitionSlot, empty -> new AtomicLong(0)) + .getAndIncrement()); })); }