Skip to content

Commit

Permalink
PipeConsensus: complete consensus procedure and pipe components with …
Browse files Browse the repository at this point in the history
…new thrift service (apache#12355)

Co-authored-by: yschengzi <[email protected]>
Co-authored-by: Caideyipi <[email protected]>
Co-authored-by: Steve Yurong Su <[email protected]>
Co-authored-by: OneSizeFitQuorum <[email protected]>
  • Loading branch information
5 people authored and SzyWilliam committed Nov 22, 2024
1 parent 5526ec3 commit c790c03
Show file tree
Hide file tree
Showing 94 changed files with 8,635 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ public class ClusterConstant {
public static final String SIMPLE_CONSENSUS_STR = "Simple";
public static final String RATIS_CONSENSUS_STR = "Ratis";
public static final String IOT_CONSENSUS_STR = "IoT";
public static final String PIPE_CONSENSUS_STR = "Pipe";
public static final String STREAM_CONSENSUS_STR = "Stream";
public static final String BATCH_CONSENSUS_STR = "Batch";

public static final String JAVA_CMD =
System.getProperty("java.home")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.iotdb.consensus.ConsensusFactory.FAST_IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOTV2_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.REAL_PIPE_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
import static org.apache.iotdb.db.utils.DateTimeUtils.convertLongToDate;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.BATCH_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.CLUSTER_CONFIGURATIONS;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_NUM;
Expand All @@ -47,11 +51,13 @@
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_DATA_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LOCK_FILE_PATH;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.RATIS_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_DATA_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SIMPLE_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAM_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_DATA_NODE_NUM;
Expand Down Expand Up @@ -216,6 +222,12 @@ public static String fromConsensusFullNameToAbbr(String consensus) {
return RATIS_CONSENSUS_STR;
case IOT_CONSENSUS:
return IOT_CONSENSUS_STR;
case REAL_PIPE_CONSENSUS:
return PIPE_CONSENSUS_STR;
case IOTV2_CONSENSUS:
return STREAM_CONSENSUS_STR;
case FAST_IOT_CONSENSUS:
return BATCH_CONSENSUS_STR;
default:
throw new IllegalArgumentException("Unknown consensus type: " + consensus);
}
Expand All @@ -229,6 +241,12 @@ public static String fromConsensusAbbrToFullName(String consensus) {
return RATIS_CONSENSUS;
case IOT_CONSENSUS_STR:
return IOT_CONSENSUS;
case PIPE_CONSENSUS_STR:
return REAL_PIPE_CONSENSUS;
case STREAM_CONSENSUS_STR:
return IOTV2_CONSENSUS;
case BATCH_CONSENSUS_STR:
return FAST_IOT_CONSENSUS;
default:
throw new IllegalArgumentException("Unknown consensus type: " + consensus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand All @@ -42,6 +45,36 @@
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeAutoConflictIT extends AbstractPipeDualAutoIT {
/**
 * Builds a sender/receiver environment pair before each test.
 *
 * <p>Both environments enable automatic schema creation, use Ratis as the consensus protocol for
 * the ConfigNode and the SchemaRegion, and use IoTConsensus for the DataRegion. The CN connection
 * timeout is raised to 10 minutes on both sides before the cluster environments are initialized.
 */
@Before
public void setUp() {
// Create two environments: index 0 acts as the sender, index 1 as the receiver.
MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);

// TODO: delete ratis configurations
// Sender side: auto-create schema on, Ratis for ConfigNode/SchemaRegion, IoT for DataRegion.
senderEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
// Receiver side: configured identically to the sender.
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);

// 10 min (600000 ms), assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

// Configuration is applied above, before the cluster environments are initialized here.
senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
}

@Test
public void testDoubleLivingAutoConflict() throws Exception {
// Double living is two clusters each with a pipe connecting to the other.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ public enum TSStatusCode {
DROP_CONSUMER_ERROR(2101),
ALTER_CONSUMER_ERROR(2102),
CONSUMER_PUSH_META_ERROR(2103),

// Pipe Consensus
PIPE_CONSENSUS_CONNECTOR_RESTART_ERROR(2200),
PIPE_CONSENSUS_VERSION_ERROR(2201),
PIPE_CONSENSUS_DEPRECATED_REQUEST(2202),
PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET(2203),
PIPE_CONSENSUS_TRANSFER_FILE_ERROR(2204),
PIPE_CONSENSUS_TYPE_ERROR(2205),
;

private final int statusCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ private void checkGlobalConfig() throws ConfigurationException {
"the SchemaRegion doesn't support org.apache.iotdb.consensus.iot.IoTConsensus");
}

// When the schema region consensus protocol is set to one of the PipeConsensus variants
// (FastIoTConsensus or IoTV2Consensus), we should report an error.
if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS)
    || CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOTV2_CONSENSUS)) {
  throw new ConfigurationException(
      // Name of the offending configuration item.
      "schema_region_consensus_protocol_class",
      // The value that was actually configured.
      String.valueOf(CONF.getSchemaRegionConsensusProtocolClass()),
      // The values that are accepted for the SchemaRegion.
      String.format(
          "%s or %s", ConsensusFactory.SIMPLE_CONSENSUS, ConsensusFactory.RATIS_CONSENSUS),
      // Report the protocol that was really configured: this branch is reached for both
      // FastIoTConsensus and IoTV2Consensus, so a hard-coded class name would be wrong for
      // one of them.
      "the SchemaRegion doesn't support " + CONF.getSchemaRegionConsensusProtocolClass());
}

// The leader distribution policy is limited
if (!AbstractLeaderBalancer.GREEDY_POLICY.equals(CONF.getLeaderDistributionPolicy())
&& !AbstractLeaderBalancer.CFD_POLICY.equals(CONF.getLeaderDistributionPolicy())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public class RouteBalancer implements IClusterStatusSubscriber {
&& ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.FAST_IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.IOTV2_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
// The simple consensus protocol will always automatically designate itself as the leader
|| ConsensusFactory.SIMPLE_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION =
Expand Down Expand Up @@ -180,9 +184,12 @@ private void balanceRegionLeader(
regionGroupId,
newLeaderId);
switch (consensusProtocolClass) {
case ConsensusFactory.FAST_IOT_CONSENSUS:
case ConsensusFactory.IOTV2_CONSENSUS:
case ConsensusFactory.IOT_CONSENSUS:
case ConsensusFactory.SIMPLE_CONSENSUS:
// For IoTConsensus or SimpleConsensus protocol, change RegionRouteMap is enough
// For IoTConsensus or SimpleConsensus or PipeConsensus protocol, change
// RegionRouteMap is enough
successTransferMap.put(
regionGroupId, new ConsensusGroupHeartbeatSample(currentTime, newLeaderId));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeType;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
Expand Down Expand Up @@ -498,6 +499,10 @@ private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan pla
.getPipeMetaList()
.forEach(
pipeMeta -> {
if (PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType())) {
return; // pipe consensus pipe task will not change
}

final Map<Integer, PipeTaskMeta> consensusGroupIdToTaskMetaMap =
pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@

package org.apache.iotdb.confignode.procedure.impl.pipe.task;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeType;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
Expand Down Expand Up @@ -54,6 +59,9 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;

public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {

private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipeProcedureV2.class);
Expand Down Expand Up @@ -137,31 +145,52 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
final ConcurrentMap<Integer, PipeTaskMeta> consensusGroupIdToTaskMetaMap =
new ConcurrentHashMap<>();

// data regions & schema regions
env.getConfigManager()
.getLoadManager()
.getRegionLeaderMap()
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
final String databaseName =
env.getConfigManager().getPartitionManager().getRegionStorageGroup(regionGroupId);
if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
// Pipe only collect user's data, filter out metric database here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, regionLeaderNodeId));
}
});

// config region
consensusGroupIdToTaskMetaMap.put(
// 0 is the consensus group id of the config region, but data region id and schema region id
// also start from 0, so we use Integer.MIN_VALUE to represent the config region
Integer.MIN_VALUE,
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE,
// The leader of the config region is the config node itself
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
if (PipeType.CONSENSUS.equals(pipeStaticMeta.getPipeType())) {
final TConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromString(
createPipeRequest.getExtractorAttributes().get(EXTRACTOR_CONSENSUS_GROUP_ID_KEY))
.convertToTConsensusGroupId();

final int senderDataNodeId =
Integer.parseInt(
createPipeRequest
.getExtractorAttributes()
.get(EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY));
consensusGroupIdToTaskMetaMap.put(
groupId.getId(),
new PipeTaskMeta(
new RecoverProgressIndex(senderDataNodeId, new SimpleProgressIndex(0, 0)),
senderDataNodeId));
} else {
// data regions & schema regions
env.getConfigManager()
.getLoadManager()
.getRegionLeaderMap()
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
final String databaseName =
env.getConfigManager()
.getPartitionManager()
.getRegionStorageGroup(regionGroupId);
if (databaseName != null && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)) {
// Pipe only collect user's data, filter out metric database here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, regionLeaderNodeId));
}
});

// config region
consensusGroupIdToTaskMetaMap.put(
// 0 is the consensus group id of the config region, but data region id and schema region
// id also start from 0, so we use Integer.MIN_VALUE to represent the config region
Integer.MIN_VALUE,
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE,
// The leader of the config region is the config node itself
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
}

pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
Expand Down
9 changes: 7 additions & 2 deletions iotdb-core/consensus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,24 @@
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift-consensus</artifactId>
<artifactId>iotdb-thrift-commons</artifactId>
<version>1.3.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift-commons</artifactId>
<artifactId>iotdb-thrift-consensus</artifactId>
<version>1.3.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>service-rpc</artifactId>
<version>1.3.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>pipe-api</artifactId>
<version>1.3.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

package org.apache.iotdb.consensus;

import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ConsensusFactory {
Expand All @@ -35,6 +39,17 @@ public class ConsensusFactory {
public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus";
public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus";
public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus";
public static final String REAL_PIPE_CONSENSUS = "org.apache.iotdb.consensus.pipe.PipeConsensus";
// Corresponding to streamConsensus
public static final String IOTV2_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTV2Consensus";
// Corresponding to batchConsensus
public static final String FAST_IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.FastIoTConsensus";
// Maps each PipeConsensus-backed protocol class name to the replicate mode it selects.
private static final Map<String, ReplicateMode> PIPE_CONSENSUS_MODE_MAP = new HashMap<>();

// Populate the mode map once at class-load time: IoTV2 -> STREAM, FastIoT -> BATCH.
static {
PIPE_CONSENSUS_MODE_MAP.put(IOTV2_CONSENSUS, ReplicateMode.STREAM);
PIPE_CONSENSUS_MODE_MAP.put(FAST_IOT_CONSENSUS, ReplicateMode.BATCH);
}

private static final Logger logger = LoggerFactory.getLogger(ConsensusFactory.class);

Expand All @@ -45,6 +60,13 @@ private ConsensusFactory() {
public static Optional<IConsensus> getConsensusImpl(
String className, ConsensusConfig config, IStateMachine.Registry registry) {
try {
// special judge for PipeConsensus
if (className.equals(IOTV2_CONSENSUS) || className.equals(FAST_IOT_CONSENSUS)) {
config.getPipeConsensusConfig().setReplicateMode(PIPE_CONSENSUS_MODE_MAP.get(className));
className = REAL_PIPE_CONSENSUS;
// initialize pipeConsensus' thrift component
PipeConsensusClientMgrContainer.build();
}
Class<?> executor = Class.forName(className);
Constructor<?> executorConstructor =
executor.getDeclaredConstructor(ConsensusConfig.class, IStateMachine.Registry.class);
Expand Down
Loading

0 comments on commit c790c03

Please sign in to comment.