Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PipeConsensus: complete consensus prodedure and pipe components with new thrift service #12355

Merged
merged 121 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 120 commits
Commits
Show all changes
121 commits
Select commit Hold shift + click to select a range
d7906a7
docs: add some comment
Pengzna Apr 11, 2024
2aa6a3d
Merge branch 'master' into pipe-consensus
Pengzna Apr 11, 2024
6877a92
feat: initialize PipeConsensus thrift service
Pengzna Apr 16, 2024
39ab10a
feat: add PipeConsensus config
Pengzna Apr 16, 2024
0171bb4
feat: add PipeConsensus config
Pengzna Apr 16, 2024
ccd134c
revert comment
Pengzna Apr 16, 2024
c7fc9ab
feat: add receiver logic
Pengzna Apr 16, 2024
cc70e6b
feat: add connector logic
Pengzna Apr 16, 2024
2f91090
feat: add receiver max waiting time
Pengzna Apr 16, 2024
6541ac7
fix review
Pengzna Apr 17, 2024
b901cc7
fix review
Pengzna Apr 17, 2024
e8d183c
add comment and temporarily complete receiver
Pengzna Apr 18, 2024
bae4893
feat: add clientManager and handshake logic
Pengzna Apr 20, 2024
a6493b7
feat: construct pipeConsensus payload
Pengzna Apr 20, 2024
463085b
fix: client manager code style
Pengzna Apr 20, 2024
87561fa
fix: manage event.referenceCount as review and add retry logic
Pengzna Apr 20, 2024
c09f027
fix: transfer request
Pengzna Apr 21, 2024
13ce8e6
feat: sync connector
Pengzna Apr 21, 2024
dbf9c61
feat: add new RPC service and client manager
Pengzna Apr 22, 2024
73dc7c9
feat: initialize handler
Pengzna Apr 22, 2024
d26acfb
refactor: adjust new RPC service
Pengzna Apr 22, 2024
0461eac
Merge remote-tracking branch 'base/master' into pipe-consensus
Pengzna Apr 22, 2024
2fcc786
refactor: adopt tsFile dependency and remove RPC to new consensus int…
Pengzna Apr 22, 2024
8953c3c
fix: redesign batch interface
Pengzna Apr 23, 2024
c0bcf50
wip: build bug-free
Pengzna Apr 23, 2024
b3f8842
feat: support tablet batch
Pengzna Apr 24, 2024
b020a46
feat: async connector handler
Pengzna Apr 24, 2024
93e3877
feat: complete async connector
Pengzna Apr 24, 2024
378c2b2
fix: tsFile pieces transfer
Pengzna Apr 25, 2024
90ffd12
fix: remove threadlocal in receiver
Pengzna Apr 26, 2024
32f95b5
fix: remove unnecessary handshake between RPC client and server
Pengzna Apr 26, 2024
95ba9ee
feat: integration mock connector with pipe
Pengzna May 7, 2024
fadaec4
refactor: add config dir
Pengzna May 7, 2024
35e32e7
fix: make RPC interface in iotdb-consensus and impl it in iotdb-core
Pengzna May 7, 2024
0ed9cba
test: add ut and fix some bugs
Pengzna May 7, 2024
c9d7d6f
test: fix ut fail
Pengzna May 7, 2024
5bc3a6a
finish pipe consensus
yschengzi May 11, 2024
2274c18
finish progress index
yschengzi May 14, 2024
c8519bb
finish procedure
yschengzi May 14, 2024
1623ab4
Merge branch 'pr/12355' into IOTDB-6321
yschengzi May 14, 2024
ac33113
working on merge
yschengzi May 15, 2024
a85ba1d
Merge pull request #1 from yschengzi/IOTDB-6321
Pengzna May 15, 2024
da0a0f1
internal thrift finish
yschengzi May 20, 2024
cc0594f
dependency revert and init for RPC
Pengzna May 20, 2024
9bedc4b
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 20, 2024
d22251a
add ConsensusGroupId to pipeConsensus components
Pengzna May 20, 2024
4fa8dcb
add ConsensusGroupId to pipeConsensus connector
Pengzna May 20, 2024
1201491
finish pipe consensus thrift interface
yschengzi May 20, 2024
ec624ec
complete receive
Pengzna May 20, 2024
7a26e95
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 20, 2024
a103e29
complete tsfile load framework
Pengzna May 20, 2024
3201dd1
Merge remote-tracking branch 'apache/master' into IOTDB-6321
yschengzi May 20, 2024
b058dc0
add exit logic for receiver
Pengzna May 20, 2024
3a7da12
fix
Pengzna May 20, 2024
b36a70e
finish merge master
yschengzi May 20, 2024
4acd059
fix
Pengzna May 20, 2024
d65abe7
Merge branch 'pipe-consensus' of https://github.com/Pengzna/iotdb int…
yschengzi May 20, 2024
f31aaa1
fix receiver start error
Pengzna May 20, 2024
f105e3f
optimize: asynchronizedly load tsFilePiece
Pengzna May 21, 2024
8f0da5a
precheck for read-only; null impl; inactive impl, etc.
Pengzna May 21, 2024
0adf91a
finish progress index
yschengzi May 21, 2024
f4d5982
Merge branch 'pipe-consensus' of https://github.com/Pengzna/iotdb int…
yschengzi May 21, 2024
2854d36
fix pipe_receiver_file_dirs path error in .bat and add consensus file…
Pengzna May 21, 2024
3c3252d
add consensus receiver file dirs config
Pengzna May 21, 2024
64ae178
remove useless todo
Pengzna May 21, 2024
5414ac9
fix review
Pengzna May 21, 2024
c400327
finish progress index for loading
yschengzi May 21, 2024
5364278
finish pipe consensus processor
yschengzi May 21, 2024
3b1285a
license
yschengzi May 21, 2024
0b2fb35
Merge branch 'pipe-consensus' of https://github.com/Pengzna/iotdb int…
yschengzi May 21, 2024
5764edb
Merge remote-tracking branch 'apache/master' into IOTDB-6321
yschengzi May 21, 2024
4683940
fix tsfile resouroce
yschengzi May 22, 2024
c90c54e
add dataNode route for receiver
Pengzna May 23, 2024
4c9a2cb
add replicate test
Pengzna May 23, 2024
e9e4037
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 23, 2024
82d71c3
sync connector with mainstream pipe
Pengzna May 23, 2024
642318b
fix review
Pengzna May 23, 2024
61c2e6c
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 23, 2024
5c9a3d6
fix review and give each receiver separate base file dirs
Pengzna May 23, 2024
d4b63ee
expose streamConsensus and batchConsensus to user
Pengzna May 24, 2024
d80650e
fix starting bug found in test
Pengzna May 24, 2024
0ce025b
remove fsync when transfer TsFile
Pengzna May 24, 2024
c919efc
add test
Pengzna May 24, 2024
b6a0dd0
add timeout
Pengzna May 24, 2024
c240abe
fix executor
Pengzna May 26, 2024
f92ceab
fix rebootTime
Pengzna May 26, 2024
c1d5c9d
fix validation
Pengzna May 26, 2024
927d6f1
fix create pipe
Pengzna May 26, 2024
4b18454
license
Pengzna May 26, 2024
55bfc27
connector parallel task set to 1 and add logs
Pengzna May 26, 2024
7b5f3d3
use pipe Consensus Processor
Pengzna May 26, 2024
4e5db8d
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 26, 2024
c54896c
remove useless and fix pom
Pengzna May 26, 2024
3d050c0
delete ut temporarily
Pengzna May 26, 2024
e0e3c2b
fix review
Pengzna May 26, 2024
18aaf8a
remove unnecessary clean path
Pengzna May 26, 2024
8448fa1
First fix
Caideyipi May 28, 2024
d4d7348
Restore shells
Caideyipi May 28, 2024
f168839
delete raw
Caideyipi May 28, 2024
1ef6c77
Added IT
Caideyipi May 28, 2024
8cbf3f5
Change IT
Caideyipi May 28, 2024
85b9e00
fix
Pengzna May 29, 2024
15f29fe
fix review
Pengzna May 29, 2024
54d3d8a
Merge pull request #2 from Caideyipi/consensus-apply-comment
Pengzna May 29, 2024
80bc979
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 29, 2024
821395d
fix all review
Pengzna May 29, 2024
c130490
Merge branch 'master' of https://github.com/apache/iotdb into pr/12355
SteveYurongSu May 29, 2024
7ab7798
revert IT and fix review
Pengzna May 29, 2024
41c7794
Merge remote-tracking branch 'refs/remotes/origin/pipe-consensus' int…
Pengzna May 29, 2024
a7f0b22
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 29, 2024
3295635
fix: build success
Pengzna May 29, 2024
47bb893
fix review of ysc's part
Pengzna May 30, 2024
514d124
fix assignProgressIndexForTsFileRecovery
Pengzna May 30, 2024
7341cbc
fix dependency
Pengzna May 30, 2024
3592e77
fix dependency
Pengzna May 30, 2024
2982404
fix dependency
Pengzna May 30, 2024
465da4d
fix review
Pengzna May 30, 2024
5ce827b
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 30, 2024
2c42b21
fix merge conflict
Pengzna May 31, 2024
d7362e3
Merge remote-tracking branch 'refs/remotes/base/master' into pipe-con…
Pengzna May 31, 2024
c22ea49
fix review and delete useless exit func
Pengzna May 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ public class ClusterConstant {
public static final String SIMPLE_CONSENSUS_STR = "Simple";
public static final String RATIS_CONSENSUS_STR = "Ratis";
public static final String IOT_CONSENSUS_STR = "IoT";
public static final String PIPE_CONSENSUS_STR = "Pipe";
public static final String STREAM_CONSENSUS_STR = "Stream";
public static final String BATCH_CONSENSUS_STR = "Batch";

public static final String JAVA_CMD =
System.getProperty("java.home")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.iotdb.consensus.ConsensusFactory.FAST_IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOTV2_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.REAL_PIPE_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
import static org.apache.iotdb.db.utils.DateTimeUtils.convertLongToDate;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.BATCH_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.CLUSTER_CONFIGURATIONS;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_NUM;
Expand All @@ -47,11 +51,13 @@
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_DATA_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LOCK_FILE_PATH;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.RATIS_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_DATA_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SIMPLE_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAM_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_DATA_NODE_NUM;
Expand Down Expand Up @@ -216,6 +222,12 @@ public static String fromConsensusFullNameToAbbr(String consensus) {
return RATIS_CONSENSUS_STR;
case IOT_CONSENSUS:
return IOT_CONSENSUS_STR;
case REAL_PIPE_CONSENSUS:
return PIPE_CONSENSUS_STR;
case IOTV2_CONSENSUS:
return STREAM_CONSENSUS_STR;
case FAST_IOT_CONSENSUS:
return BATCH_CONSENSUS_STR;
default:
throw new IllegalArgumentException("Unknown consensus type: " + consensus);
}
Expand All @@ -229,6 +241,12 @@ public static String fromConsensusAbbrToFullName(String consensus) {
return RATIS_CONSENSUS;
case IOT_CONSENSUS_STR:
return IOT_CONSENSUS;
case PIPE_CONSENSUS_STR:
return REAL_PIPE_CONSENSUS;
case STREAM_CONSENSUS_STR:
return IOTV2_CONSENSUS;
case BATCH_CONSENSUS_STR:
return FAST_IOT_CONSENSUS;
default:
throw new IllegalArgumentException("Unknown consensus type: " + consensus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand All @@ -42,6 +45,36 @@
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeAutoConflictIT extends AbstractPipeDualAutoIT {
@Before
public void setUp() {
MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);

// TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
}

@Test
public void testDoubleLivingAutoConflict() throws Exception {
// Double living is two clusters each with a pipe connecting to the other.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ public enum TSStatusCode {
DROP_CONSUMER_ERROR(2101),
ALTER_CONSUMER_ERROR(2102),
CONSUMER_PUSH_META_ERROR(2103),

// Pipe Consensus
PIPE_CONSENSUS_CONNECTOR_RESTART_ERROR(2200),
PIPE_CONSENSUS_VERSION_ERROR(2201),
PIPE_CONSENSUS_DEPRECATED_REQUEST(2202),
PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET(2203),
PIPE_CONSENSUS_TRANSFER_FILE_ERROR(2204),
PIPE_CONSENSUS_TYPE_ERROR(2205),
;

private final int statusCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ private void checkGlobalConfig() throws ConfigurationException {
"the SchemaRegion doesn't support org.apache.iotdb.consensus.iot.IoTConsensus");
}

// When the schemaengine region consensus protocol is set to PipeConsensus,
// we should report an error
if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS)
|| CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOTV2_CONSENSUS)) {
throw new ConfigurationException(
"schema_region_consensus_protocol_class",
String.valueOf(CONF.getSchemaRegionConsensusProtocolClass()),
String.format(
"%s or %s", ConsensusFactory.SIMPLE_CONSENSUS, ConsensusFactory.RATIS_CONSENSUS),
"the SchemaRegion doesn't support org.apache.iotdb.consensus.iot.FastIoTConsensus");
}

// The leader distribution policy is limited
if (!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
&& !AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,12 @@ private void balanceRegionLeader(
regionGroupId,
newLeaderId);
switch (consensusProtocolClass) {
case ConsensusFactory.FAST_IOT_CONSENSUS:
case ConsensusFactory.IOTV2_CONSENSUS:
case ConsensusFactory.IOT_CONSENSUS:
case ConsensusFactory.SIMPLE_CONSENSUS:
// For IoTConsensus or SimpleConsensus protocol, change RegionRouteMap is enough
// For IoTConsensus or SimpleConsensus or PipeConsensus protocol, change
// RegionRouteMap is enough
successTransferMap.put(
regionGroupId, new ConsensusGroupHeartbeatSample(currentTime, newLeaderId));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeType;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
Expand Down Expand Up @@ -498,6 +499,10 @@ private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan pla
.getPipeMetaList()
.forEach(
pipeMeta -> {
if (PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType())) {
return; // pipe consensus pipe task will not change
}

final Map<Integer, PipeTaskMeta> consensusGroupIdToTaskMetaMap =
pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@

package org.apache.iotdb.confignode.procedure.impl.pipe.task;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeType;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
Expand Down Expand Up @@ -54,6 +59,9 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;

public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {

private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipeProcedureV2.class);
Expand Down Expand Up @@ -137,31 +145,52 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
final ConcurrentMap<Integer, PipeTaskMeta> consensusGroupIdToTaskMetaMap =
new ConcurrentHashMap<>();

// data regions & schema regions
env.getConfigManager()
.getLoadManager()
.getRegionLeaderMap()
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
final String databaseName =
env.getConfigManager().getPartitionManager().getRegionStorageGroup(regionGroupId);
if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
// Pipe only collect user's data, filter out metric database here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, regionLeaderNodeId));
}
});

// config region
consensusGroupIdToTaskMetaMap.put(
// 0 is the consensus group id of the config region, but data region id and schema region id
// also start from 0, so we use Integer.MIN_VALUE to represent the config region
Integer.MIN_VALUE,
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE,
// The leader of the config region is the config node itself
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
if (PipeType.CONSENSUS.equals(pipeStaticMeta.getPipeType())) {
Pengzna marked this conversation as resolved.
Show resolved Hide resolved
final TConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromString(
createPipeRequest.getExtractorAttributes().get(EXTRACTOR_CONSENSUS_GROUP_ID_KEY))
.convertToTConsensusGroupId();

final int senderDataNodeId =
Integer.parseInt(
createPipeRequest
.getExtractorAttributes()
.get(EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY));
consensusGroupIdToTaskMetaMap.put(
groupId.getId(),
new PipeTaskMeta(
new RecoverProgressIndex(senderDataNodeId, new SimpleProgressIndex(0, 0)),
senderDataNodeId));
} else {
// data regions & schema regions
env.getConfigManager()
.getLoadManager()
.getRegionLeaderMap()
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
final String databaseName =
env.getConfigManager()
.getPartitionManager()
.getRegionStorageGroup(regionGroupId);
if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
// Pipe only collect user's data, filter out metric database here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, regionLeaderNodeId));
}
});

// config region
consensusGroupIdToTaskMetaMap.put(
Pengzna marked this conversation as resolved.
Show resolved Hide resolved
// 0 is the consensus group id of the config region, but data region id and schema region
// id
// also start from 0, so we use Integer.MIN_VALUE to represent the config region
Integer.MIN_VALUE,
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE,
// The leader of the config region is the config node itself
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
}

pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
Expand Down
9 changes: 7 additions & 2 deletions iotdb-core/consensus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,24 @@
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift-consensus</artifactId>
<artifactId>iotdb-thrift-commons</artifactId>
<version>1.3.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift-commons</artifactId>
<artifactId>iotdb-thrift-consensus</artifactId>
<version>1.3.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>service-rpc</artifactId>
<version>1.3.2-SNAPSHOT</version>
</dependency>
<dependency>
Pengzna marked this conversation as resolved.
Show resolved Hide resolved
<groupId>org.apache.iotdb</groupId>
<artifactId>pipe-api</artifactId>
<version>1.3.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

package org.apache.iotdb.consensus;

import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ConsensusFactory {
Expand All @@ -35,6 +39,17 @@ public class ConsensusFactory {
public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus";
public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus";
public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus";
public static final String REAL_PIPE_CONSENSUS = "org.apache.iotdb.consensus.pipe.PipeConsensus";
// Corresponding to streamConsensus
public static final String IOTV2_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTV2Consensus";
// Corresponding to batchConsensus
public static final String FAST_IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.FastIoTConsensus";
private static final Map<String, ReplicateMode> PIPE_CONSENSUS_MODE_MAP = new HashMap<>();

static {
PIPE_CONSENSUS_MODE_MAP.put(IOTV2_CONSENSUS, ReplicateMode.STREAM);
PIPE_CONSENSUS_MODE_MAP.put(FAST_IOT_CONSENSUS, ReplicateMode.BATCH);
}

private static final Logger logger = LoggerFactory.getLogger(ConsensusFactory.class);

Expand All @@ -45,6 +60,13 @@ private ConsensusFactory() {
public static Optional<IConsensus> getConsensusImpl(
String className, ConsensusConfig config, IStateMachine.Registry registry) {
try {
// special judge for PipeConsensus
if (className.equals(IOTV2_CONSENSUS) || className.equals(FAST_IOT_CONSENSUS)) {
config.getPipeConsensusConfig().setReplicateMode(PIPE_CONSENSUS_MODE_MAP.get(className));
className = REAL_PIPE_CONSENSUS;
// initialize pipeConsensus' thrift component
PipeConsensusClientMgrContainer.build();
}
Class<?> executor = Class.forName(className);
Constructor<?> executorConstructor =
executor.getDeclaredConstructor(ConsensusConfig.class, IStateMachine.Registry.class);
Expand Down
Loading
Loading