Skip to content

Commit

Permalink
[IOTDB-3455] Make data_region_num takes effect in new standalone (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
THUMarkLau authored Sep 14, 2022
1 parent f776aa5 commit 5cc3581
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 124 deletions.
20 changes: 13 additions & 7 deletions server/src/assembly/resources/conf/iotdb-datanode.properties
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,6 @@ timestamp_precision=ms
# Datatype: boolean
# enable_partial_insert=true

# number of data regions per user-defined storage group
# a data region is the unit of parallelism in memory as all ingestions in one data region are serialized
# recommended value is [data region number] = [CPU core number] / [user-defined storage group number]
# Datatype: int
# data_region_num = 1

# the interval to log recover progress of each vsg when starting iotdb
# Datatype: int
# recovery_log_interval_in_ms=5000
Expand Down Expand Up @@ -1110,6 +1104,17 @@ trigger_forward_http_pool_max_per_route=20
trigger_forward_mqtt_pool_size=4



#######################
### LocalConfigNode ###
#######################

# number of data regions per user-defined storage group
# a data region is the unit of parallelism in memory as all ingestions in one data region are serialized
# recommended value is [data region number] = [CPU core number] / [user-defined storage group number]
# Datatype: int
# data_region_num=1

####################
### External Lib Configuration
####################
Expand All @@ -1130,4 +1135,5 @@ trigger_forward_mqtt_pool_size=4
# external_limiter_dir=ext\\limiter
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# external_limiter_dir=ext/limiter
# external_limiter_dir=ext/limiter

Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public class LocalConfigNode {
private static final Logger logger = LoggerFactory.getLogger(LocalConfigNode.class);

private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long STANDALONE_MOCK_TIME_SLOT_START_TIME = 0L;
public static final long STANDALONE_MOCK_TIME_SLOT_START_TIME = 0L;
private volatile boolean initialized = false;

private ScheduledExecutorService timedForceMLogThread;
Expand All @@ -141,7 +141,7 @@ public class LocalConfigNode {

private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();

private final LocalDataPartitionTable dataPartitionTable = LocalDataPartitionTable.getInstance();
private final LocalDataPartitionInfo dataPartitionInfo = LocalDataPartitionInfo.getInstance();

private final SeriesPartitionExecutor executor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
Expand Down Expand Up @@ -212,7 +212,7 @@ public synchronized void init() {
if (config.isMppMode() && !config.isClusterMode()) {
Map<String, List<DataRegionId>> recoveredLocalDataRegionInfo =
storageEngine.getLocalDataRegionInfo();
dataPartitionTable.init(recoveredLocalDataRegionInfo);
dataPartitionInfo.init(recoveredLocalDataRegionInfo);
}
} catch (MetadataException | IOException e) {
logger.error(
Expand All @@ -239,7 +239,7 @@ public synchronized void clear() {
storageGroupSchemaManager.clear();
templateManager.clear();

dataPartitionTable.clear();
dataPartitionInfo.clear();

} catch (IOException e) {
logger.error("Error occurred when clearing LocalConfigNode:", e);
Expand Down Expand Up @@ -283,8 +283,8 @@ public void deleteStorageGroup(PartialPath storageGroup) throws MetadataExceptio

if (config.isMppMode() && !config.isClusterMode()) {
deleteDataRegionsInStorageGroup(
dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup));
dataPartitionTable.deleteStorageGroup(storageGroup);
dataPartitionInfo.getDataRegionIdsByStorageGroup(storageGroup));
dataPartitionInfo.deleteStorageGroup(storageGroup);
}

deleteSchemaRegionsInStorageGroup(
Expand Down Expand Up @@ -367,8 +367,7 @@ private PartialPath ensureStorageGroup(PartialPath path) throws MetadataExceptio

public void setTTL(PartialPath storageGroup, long dataTTL) throws MetadataException, IOException {
if (config.isMppMode() && !config.isClusterMode()) {
storageEngine.setTTL(
dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup), dataTTL);
storageEngine.setTTL(dataPartitionInfo.getDataRegionIdsByStorageGroup(storageGroup), dataTTL);
}
storageGroupSchemaManager.setTTL(storageGroup, dataTTL);
}
Expand Down Expand Up @@ -855,7 +854,7 @@ public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataExc
public DataRegionId getBelongedDataRegionId(PartialPath path)
throws MetadataException, DataRegionException {
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
DataRegionId dataRegionId = dataPartitionInfo.getDataRegionId(storageGroup, path);
if (dataRegionId == null) {
return null;
}
Expand All @@ -870,13 +869,13 @@ public DataRegionId getBelongedDataRegionId(PartialPath path)
}

// This interface involves storage group and data region auto creation
public DataRegionId getBelongedDataRegionIdWithAutoCreate(PartialPath path)
public DataRegionId getBelongedDataRegionIdWithAutoCreate(PartialPath devicePath)
throws MetadataException, DataRegionException {
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(devicePath);
DataRegionId dataRegionId = dataPartitionInfo.getDataRegionId(storageGroup, devicePath);
if (dataRegionId == null) {
dataPartitionTable.setDataPartitionInfo(storageGroup);
dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
dataPartitionInfo.registerStorageGroup(storageGroup);
dataRegionId = dataPartitionInfo.allocateDataRegionForNewSlot(storageGroup, devicePath);
}
DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
if (dataRegion == null) {
Expand All @@ -885,14 +884,6 @@ public DataRegionId getBelongedDataRegionIdWithAutoCreate(PartialPath path)
return dataRegionId;
}

public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
}

// endregion

// region Interfaces for StandaloneSchemaFetcher

public Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> getSchemaPartition(
PathPatternTree patternTree) {

Expand Down Expand Up @@ -950,8 +941,6 @@ public Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> getOrCreateSche
return partitionSlotsMap;
}

// endregion

// region Interfaces for StandalonePartitionFetcher
public DataPartition getDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap)
Expand Down Expand Up @@ -1023,22 +1012,22 @@ public DataPartition getOrCreateDataPartition(
for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
// for each device
String deviceId = dataPartitionQueryParam.getDevicePath();
DataRegionId dataRegionId =
getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionToRegionsMap =
deviceToRegionsMap.getOrDefault(
executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
for (TTimePartitionSlot timePartitionSlot :
dataPartitionQueryParam.getTimePartitionSlotList()) {
// for each time partition
List<TTimePartitionSlot> timePartitionSlotList =
dataPartitionQueryParam.getTimePartitionSlotList();
for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
DataRegionId dataRegionId =
getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionToRegionsMap =
deviceToRegionsMap.getOrDefault(
executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
timePartitionToRegionsMap.put(
timePartitionSlot,
Collections.singletonList(
genStandaloneRegionReplicaSet(
TConsensusGroupType.DataRegion, dataRegionId.getId())));
deviceToRegionsMap.put(
executor.getSeriesPartitionSlot(deviceId), timePartitionToRegionsMap);
}
deviceToRegionsMap.put(
executor.getSeriesPartitionSlot(deviceId), timePartitionToRegionsMap);
}
dataPartitionMap.put(storageGroupName, deviceToRegionsMap);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.localconfignode;

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// This class is used for data partition maintaining the map between storage group and
// dataRegionIds.
public class LocalDataPartitionInfo {

// storageGroup -> LocalDataPartitionTable
private Map<PartialPath, LocalDataPartitionTable> partitionTableMap;

private static class LocalDataPartitionTableHolder {
private static final LocalDataPartitionInfo INSTANCE = new LocalDataPartitionInfo();

private LocalDataPartitionTableHolder() {}
}

private LocalDataPartitionInfo() {}

public static LocalDataPartitionInfo getInstance() {
return LocalDataPartitionTableHolder.INSTANCE;
}

public synchronized void init(Map<String, List<DataRegionId>> regionInfos)
throws IllegalPathException {
partitionTableMap = new ConcurrentHashMap<>();
for (Map.Entry<String, List<DataRegionId>> entry : regionInfos.entrySet()) {
String storageGroupName = entry.getKey();
List<DataRegionId> regionIds = entry.getValue();
LocalDataPartitionTable table = new LocalDataPartitionTable(storageGroupName, regionIds);
partitionTableMap.put(new PartialPath(storageGroupName), table);
}
}

public synchronized void clear() {
if (partitionTableMap != null) {
partitionTableMap.clear();
partitionTableMap = null;
}
}

public DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath path) {
if (!partitionTableMap.containsKey(storageGroup)) {
return null;
}
LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
return table.getDataRegionId(path);
}

/**
* Try to allocate a data region for the new time partition slot. This function will try to create
* new data region to make expansion if the existing data regions meet some condition.
*
* @param storageGroup The path for the storage group.
* @param path The full path for the series.
* @return The data region id for the time partition slot.
*/
public DataRegionId allocateDataRegionForNewSlot(PartialPath storageGroup, PartialPath path) {
LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
return table.getDataRegionWithAutoExtension(path);
}

public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
if (partitionTableMap.containsKey(storageGroup)) {
LocalDataPartitionTable partitionTable = partitionTableMap.get(storageGroup);
return partitionTable.getAllDataRegionId();
}
return Collections.emptyList();
}

public synchronized void registerStorageGroup(PartialPath storageGroup) {
if (partitionTableMap.containsKey(storageGroup)) {
return;
}
partitionTableMap.put(storageGroup, new LocalDataPartitionTable(storageGroup.getFullPath()));
}

public synchronized void deleteStorageGroup(PartialPath storageGroup) {
LocalDataPartitionTable partitionTable = partitionTableMap.remove(storageGroup);
if (partitionTable != null) {
partitionTable.clear();
}
}
}
Loading

0 comments on commit 5cc3581

Please sign in to comment.