Skip to content

Commit

Permalink
[IOTDB-4377] Fix TTimePartitionSlot count and DataPartition inherit p…
Browse files Browse the repository at this point in the history
…olicy bug (apache#7287)
  • Loading branch information
CRZbulabula authored Sep 13, 2022
1 parent ba7bd5d commit 2e93bc6
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan
if (!allLeadership.isEmpty()) {
String regionType =
regionInfo.getDataNodeId()
== allLeadership.get(regionInfo.getConsensusGroupId())
== allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
? RegionRoleType.Leader.toString()
: RegionRoleType.Follower.toString();
regionInfo.setRoleType(regionType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/** The PartitionManager Manages cluster PartitionTable read and write requests. */
Expand Down Expand Up @@ -237,16 +236,9 @@ public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) {
// Map<StorageGroup, unassigned SeriesPartitionSlot count>
Map<String, Integer> unassignedDataPartitionSlotsCountMap = new ConcurrentHashMap<>();
unassignedDataPartitionSlotsMap.forEach(
(storageGroup, unassignedDataPartitionSlots) -> {
AtomicInteger unassignedDataPartitionSlotsCount = new AtomicInteger(0);
unassignedDataPartitionSlots
.values()
.forEach(
timePartitionSlots ->
unassignedDataPartitionSlotsCount.getAndAdd(timePartitionSlots.size()));
unassignedDataPartitionSlotsCountMap.put(
storageGroup, unassignedDataPartitionSlotsCount.get());
});
(storageGroup, unassignedDataPartitionSlots) ->
unassignedDataPartitionSlotsCountMap.put(
storageGroup, unassignedDataPartitionSlots.size()));
TSStatus status =
extendRegionsIfNecessary(
unassignedDataPartitionSlotsCountMap, TConsensusGroupType.DataRegion);
Expand Down Expand Up @@ -302,7 +294,8 @@ private TSStatus extendRegionsIfNecessary(
float allocatedRegionCount =
partitionInfo.getRegionCount(entry.getKey(), consensusGroupType);
// The slotCount equals to the sum of assigned slot count and unassigned slot count
float slotCount = partitionInfo.getSlotCount(entry.getKey()) + entry.getValue();
float slotCount =
partitionInfo.getAssignedSeriesPartitionSlotsCount(entry.getKey()) + entry.getValue();
float maxRegionCount =
getClusterSchemaManager().getMaxRegionGroupCount(entry.getKey(), consensusGroupType);
float maxSlotCount =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@
import org.apache.iotdb.tsfile.utils.Pair;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Allocating new Partitions by greedy algorithm */
public class GreedyPartitionAllocator implements IPartitionAllocator {

private static final long TIME_PARTITION_INTERVAL =
ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval();

private final IManager configManager;

public GreedyPartitionAllocator(IManager configManager) {
Expand All @@ -63,7 +67,7 @@ public Map<String, SchemaPartitionTable> allocateSchemaPartition(
// Greedy allocation
schemaPartitionMap.put(seriesPartitionSlot, regionSlotsCounter.get(0).getRight());
// Bubble sort
bubbleSort(regionSlotsCounter);
bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
}
result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap));
});
Expand All @@ -84,46 +88,91 @@ public Map<String, DataPartitionTable> allocateDataPartition(
getPartitionManager()
.getSortedRegionSlotsCounter(storageGroup, TConsensusGroupType.DataRegion);

DataPartitionTable dataPartitionTable = new DataPartitionTable();

// Enumerate SeriesPartitionSlot
Map<TSeriesPartitionSlot, SeriesPartitionTable> dataPartitionMap =
new ConcurrentHashMap<>();
for (Map.Entry<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesPartitionEntry :
unassignedPartitionSlotsMap.entrySet()) {
// Enumerate TimePartitionSlot
Map<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap =
new ConcurrentHashMap<>();
for (TTimePartitionSlot timePartitionSlot : seriesPartitionEntry.getValue()) {
SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();

// Enumerate TimePartitionSlot in ascending order
List<TTimePartitionSlot> timePartitionSlots = seriesPartitionEntry.getValue();
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {

/* Check if the current DataPartition has predecessor firstly, and inherit it if exists */

// Check if the current Partition's predecessor is allocated
// in the same batch of Partition creation
TConsensusGroupId predecessor =
seriesPartitionTable.getPrecededDataPartition(
timePartitionSlot, TIME_PARTITION_INTERVAL);
if (predecessor != null) {
seriesPartitionTable
.getSeriesPartitionMap()
.put(timePartitionSlot, Collections.singletonList(predecessor));
bubbleSort(predecessor, regionSlotsCounter);
continue;
}

// Check if the current Partition's predecessor was allocated
// in the former Partition creation
predecessor =
getPartitionManager()
.getPrecededDataPartition(
storageGroup,
seriesPartitionEntry.getKey(),
timePartitionSlot,
ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval());
TIME_PARTITION_INTERVAL);
if (predecessor != null) {
// For DataPartition allocation, we consider predecessor first
seriesPartitionMap.put(timePartitionSlot, Collections.singletonList(predecessor));
} else {
// Greedy allocation
seriesPartitionMap.put(
timePartitionSlot,
Collections.singletonList(regionSlotsCounter.get(0).getRight()));
// Bubble sort
bubbleSort(regionSlotsCounter);
seriesPartitionTable
.getSeriesPartitionMap()
.put(timePartitionSlot, Collections.singletonList(predecessor));
bubbleSort(predecessor, regionSlotsCounter);
continue;
}

/* Greedy allocation */
seriesPartitionTable
.getSeriesPartitionMap()
.put(
timePartitionSlot,
Collections.singletonList(regionSlotsCounter.get(0).getRight()));
bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
}
dataPartitionMap.put(
seriesPartitionEntry.getKey(), new SeriesPartitionTable(seriesPartitionMap));
dataPartitionTable
.getDataPartitionMap()
.put(seriesPartitionEntry.getKey(), seriesPartitionTable);
}
result.put(storageGroup, new DataPartitionTable(dataPartitionMap));
result.put(storageGroup, dataPartitionTable);
});

return result;
}

private void bubbleSort(List<Pair<Long, TConsensusGroupId>> regionSlotsCounter) {
/**
* Bubble sort the regionSlotsCounter from the specified consensus group
*
* <p>Notice: Here we use bubble sort instead of other sorting algorithm is because that, there is
* only one Partition allocated in each loop. Therefore, only consider one consensus group weight
* change is enough
*
* @param consensusGroupId The consensus group where the new Partition is allocated
* @param regionSlotsCounter List<Pair<Allocated Partition num, TConsensusGroupId>>
*/
private void bubbleSort(
TConsensusGroupId consensusGroupId, List<Pair<Long, TConsensusGroupId>> regionSlotsCounter) {
// Find the corresponding consensus group
int index = 0;
regionSlotsCounter.get(0).setLeft(regionSlotsCounter.get(0).getLeft() + 1);
for (int i = 0; i < regionSlotsCounter.size(); i++) {
if (regionSlotsCounter.get(i).getRight().equals(consensusGroupId)) {
index = i;
break;
}
}

// Do bubble sort
regionSlotsCounter.get(index).setLeft(regionSlotsCounter.get(index).getLeft() + 1);
while (index < regionSlotsCounter.size() - 1
&& regionSlotsCounter.get(index).getLeft() > regionSlotsCounter.get(index + 1).getLeft()) {
Collections.swap(regionSlotsCounter, index, index + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -441,7 +442,7 @@ public DataSet getSchemaNodeManagementPartition(List<String> matchedStorageGroup
/** Get region information */
public DataSet getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) {
RegionInfoListResp regionResp = new RegionInfoListResp();
List<TRegionInfo> regionInfoList = new ArrayList<>();
List<TRegionInfo> regionInfoList = new Vector<>();
if (storageGroupPartitionTables.isEmpty()) {
regionResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
regionResp.setRegionInfoList(new ArrayList<>());
Expand All @@ -455,7 +456,7 @@ public DataSet getRegionInfoList(GetRegionInfoListPlan regionsInfoPlan) {
if (storageGroups != null && !storageGroups.contains(storageGroup)) {
return;
}
storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan, regionInfoList);
regionInfoList.addAll(storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan));
});
regionInfoList.sort(
Comparator.comparingInt(regionId -> regionId.getConsensusGroupId().getId()));
Expand Down Expand Up @@ -588,8 +589,8 @@ public int getRegionCount(String storageGroup, TConsensusGroupType type)
return storageGroupPartitionTables.get(storageGroup).getRegionGroupCount(type);
}

public int getSlotCount(String storageGroup) {
return storageGroupPartitionTables.get(storageGroup).getSlotsCount();
public int getAssignedSeriesPartitionSlotsCount(String storageGroup) {
return storageGroupPartitionTables.get(storageGroup).getAssignedSeriesPartitionSlotsCount();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.apache.thrift.TException;
Expand All @@ -28,27 +29,34 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class RegionGroup {

private final TRegionReplicaSet replicaSet;

// Map<TSeriesPartitionSlot, TTimePartitionSlot Count>
// For SchemaRegion, each SeriesSlot constitute a SchemaPartition.
// For DataRegion, a SeriesSlot and a TimeSlot constitute a DataPartition.
// Eg: A DataRegion contains SeriesSlot-1 which has TimeSlot-1, TimeSlot-2 and Timeslot-3,
// then (SeriesSlot-1 -> TimeSlot-1) constitute a DataPartition.
// For SchemaRegion, each SeriesSlot constitute a SchemaPartition.
private final AtomicLong slotCount;
private final Map<TSeriesPartitionSlot, AtomicLong> slotCountMap;

private final AtomicLong totalTimeSlotCount;

public RegionGroup() {
this.replicaSet = new TRegionReplicaSet();
this.slotCount = new AtomicLong();
this.slotCountMap = new ConcurrentHashMap<>();
this.totalTimeSlotCount = new AtomicLong();
}

public RegionGroup(TRegionReplicaSet replicaSet) {
this.replicaSet = replicaSet;
this.slotCount = new AtomicLong(0);
this.slotCountMap = new ConcurrentHashMap<>();
this.totalTimeSlotCount = new AtomicLong(0);
}

public TConsensusGroupId getId() {
Expand All @@ -59,36 +67,72 @@ public TRegionReplicaSet getReplicaSet() {
return replicaSet;
}

public void addCounter(long delta) {
slotCount.getAndAdd(delta);
/** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count> */
public void updateSlotCountMap(Map<TSeriesPartitionSlot, AtomicLong> deltaMap) {
deltaMap.forEach(
((seriesPartitionSlot, delta) -> {
slotCountMap
.computeIfAbsent(seriesPartitionSlot, empty -> new AtomicLong(0))
.getAndAdd(delta.get());
totalTimeSlotCount.getAndAdd(delta.get());
}));
}

public long getSeriesSlotCount() {
return slotCountMap.size();
}

public long getCounter() {
return slotCount.get();
public long getTimeSlotCount() {
return totalTimeSlotCount.get();
}

public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
replicaSet.write(protocol);
ReadWriteIOUtils.write(slotCount.get(), outputStream);

ReadWriteIOUtils.write(slotCountMap.size(), outputStream);
for (Map.Entry<TSeriesPartitionSlot, AtomicLong> slotCountEntry : slotCountMap.entrySet()) {
slotCountEntry.getKey().write(protocol);
ReadWriteIOUtils.write(slotCountEntry.getValue().get(), outputStream);
}

ReadWriteIOUtils.write(totalTimeSlotCount.get(), outputStream);
}

public void deserialize(InputStream inputStream, TProtocol protocol)
throws IOException, TException {
replicaSet.read(protocol);
slotCount.set(ReadWriteIOUtils.readLong(inputStream));

int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; i++) {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot();
seriesPartitionSlot.read(protocol);
AtomicLong slotCount = new AtomicLong(ReadWriteIOUtils.readLong(inputStream));
slotCountMap.put(seriesPartitionSlot, slotCount);
}

totalTimeSlotCount.set(ReadWriteIOUtils.readLong(inputStream));
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RegionGroup that = (RegionGroup) o;
return replicaSet.equals(that.replicaSet);
for (Map.Entry<TSeriesPartitionSlot, AtomicLong> slotCountEntry : slotCountMap.entrySet()) {
if (!that.slotCountMap.containsKey(slotCountEntry.getKey())) {
return false;
}
if (slotCountEntry.getValue().get() != that.slotCountMap.get(slotCountEntry.getKey()).get()) {
return false;
}
}
return replicaSet.equals(that.replicaSet)
&& totalTimeSlotCount.get() == that.totalTimeSlotCount.get();
}

@Override
public int hashCode() {
return Objects.hash(replicaSet);
return Objects.hash(replicaSet, slotCountMap, totalTimeSlotCount);
}
}
Loading

0 comments on commit 2e93bc6

Please sign in to comment.