Skip to content

Commit

Permalink
[ISSUE #7300] jRaft-Controller Implemention (#7301)
Browse files Browse the repository at this point in the history
* jRaft-Controller Implemention

* fix bazel build

* reformat code

* remove fury dependence

* fix bazel build

* resolve conflict

* clear code

* Optimize code style

* Optimize code style

* Optimize code style

* revert style only change
fix code style
rollback an unexpected modification.

* Update Producer.java

* chore: move raft startup to start method

* chore: use jraftconfig to collect all configs about jraft

* fix: fix wrong store path because init use string constant

* fix: fix CONTROLLER_NOT_LEADER error in follower

* chore: seperate jraft and controller log

* chore: fix conflict with develop

* chore: add comment to clear the filter logic

* feat: triggerElectMaster will retry when failed

* feat: when controller all restart, we use a timestamp to trace the first heartbeat, avoid to elect again

* fix: implements Serializable to enable snapshot serialize

* fix: use for loop to simple the elect retry

* chore: update jraft version

* chore: opt import

---------

Co-authored-by: leizhiyuan <[email protected]>
  • Loading branch information
yulangz and leizhiyuan authored Jan 29, 2024
1 parent 1784213 commit 8df53df
Show file tree
Hide file tree
Showing 57 changed files with 2,484 additions and 191 deletions.
2 changes: 2 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ maven_install(
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"org.apache.rocketmq:rocketmq-rocksdb:1.0.2",
"com.alipay.sofa:jraft-core:1.3.11",
"com.alipay.sofa:hessian:3.3.6",
],
fetch_sources = True,
repositories = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import org.apache.rocketmq.common.metrics.MetricsExporterType;

public class ControllerConfig {

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String configStorePath = System.getProperty("user.home") + File.separator + "controller" + File.separator + "controller.properties";
public static final String DLEDGER_CONTROLLER = "DLedger";
public static final String JRAFT_CONTROLLER = "jRaft";

private JraftConfig jraftConfig = new JraftConfig();

private String controllerType = DLEDGER_CONTROLLER;
/**
* Interval of periodic scanning for non-active broker;
* Unit: millisecond
Expand All @@ -45,7 +49,13 @@ public class ControllerConfig {
private String controllerDLegerPeers;
private String controllerDLegerSelfId;
private int mappedFileSize = 1024 * 1024 * 1024;
private String controllerStorePath = System.getProperty("user.home") + File.separator + "DledgerController";
private String controllerStorePath = "";

/**
* Max retry count for electing master when failed because of network or system error.
*/
private int electMasterMaxRetryCount = 3;


/**
* Whether the controller can elect a master which is not in the syncStateSet.
Expand Down Expand Up @@ -171,6 +181,9 @@ public void setMappedFileSize(int mappedFileSize) {
}

public String getControllerStorePath() {
if (controllerStorePath.isEmpty()) {
controllerStorePath = System.getProperty("user.home") + File.separator + controllerType + "Controller";
}
return controllerStorePath;
}

Expand Down Expand Up @@ -303,4 +316,28 @@ public boolean isMetricsInDelta() {
public void setMetricsInDelta(boolean metricsInDelta) {
this.metricsInDelta = metricsInDelta;
}

public String getControllerType() {
return controllerType;
}

public void setControllerType(String controllerType) {
this.controllerType = controllerType;
}

public JraftConfig getJraftConfig() {
return jraftConfig;
}

public void setJraftConfig(JraftConfig jraftConfig) {
this.jraftConfig = jraftConfig;
}

public int getElectMasterMaxRetryCount() {
return this.electMasterMaxRetryCount;
}

public void setElectMasterMaxRetryCount(int electMasterMaxRetryCount) {
this.electMasterMaxRetryCount = electMasterMaxRetryCount;
}
}
88 changes: 88 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/JraftConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;

public class JraftConfig {
int jRaftElectionTimeoutMs = 1000;

int jRaftScanWaitTimeoutMs = 1000;
int jRaftSnapshotIntervalSecs = 3600;
String jRaftGroupId = "jRaft-Controller";
String jRaftServerId = "localhost:9880";
String jRaftInitConf = "localhost:9880,localhost:9881,localhost:9882";
String jRaftControllerRPCAddr = "localhost:9770,localhost:9771,localhost:9772";

public int getjRaftElectionTimeoutMs() {
return jRaftElectionTimeoutMs;
}

public void setjRaftElectionTimeoutMs(int jRaftElectionTimeoutMs) {
this.jRaftElectionTimeoutMs = jRaftElectionTimeoutMs;
}

public int getjRaftSnapshotIntervalSecs() {
return jRaftSnapshotIntervalSecs;
}

public void setjRaftSnapshotIntervalSecs(int jRaftSnapshotIntervalSecs) {
this.jRaftSnapshotIntervalSecs = jRaftSnapshotIntervalSecs;
}

public String getjRaftGroupId() {
return jRaftGroupId;
}

public void setjRaftGroupId(String jRaftGroupId) {
this.jRaftGroupId = jRaftGroupId;
}

public String getjRaftServerId() {
return jRaftServerId;
}

public void setjRaftServerId(String jRaftServerId) {
this.jRaftServerId = jRaftServerId;
}

public String getjRaftInitConf() {
return jRaftInitConf;
}

public void setjRaftInitConf(String jRaftInitConf) {
this.jRaftInitConf = jRaftInitConf;
}

public String getjRaftControllerRPCAddr() {
return jRaftControllerRPCAddr;
}

public void setjRaftControllerRPCAddr(String jRaftControllerRPCAddr) {
this.jRaftControllerRPCAddr = jRaftControllerRPCAddr;
}

public String getjRaftAddress() {
return this.jRaftServerId;
}

public int getjRaftScanWaitTimeoutMs() {
return jRaftScanWaitTimeoutMs;
}

public void setjRaftScanWaitTimeoutMs(int jRaftScanWaitTimeoutMs) {
this.jRaftScanWaitTimeoutMs = jRaftScanWaitTimeoutMs;
}
}
4 changes: 3 additions & 1 deletion common/src/main/java/org/apache/rocketmq/common/Pair.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.common;

public class Pair<T1, T2> {
import java.io.Serializable;

public class Pair<T1, T2> implements Serializable {
private T1 object1;
private T2 object2;

Expand Down
4 changes: 3 additions & 1 deletion controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_exporter_logging",
"@maven//:io_opentelemetry_opentelemetry_exporter_logging_otlp",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:com_alipay_sofa_jraft_core",
"@maven//:com_alipay_sofa_hessian",
],
)

Expand All @@ -69,7 +71,7 @@ java_library(
"@maven//:org_apache_commons_commons_lang3",
"@maven//:io_netty_netty_all",
"@maven//:com_google_guava_guava",
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson",
],
resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"])
)
Expand Down
11 changes: 10 additions & 1 deletion controller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
Expand Down Expand Up @@ -62,5 +63,13 @@
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>jraft-core</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,32 @@
package org.apache.rocketmq.controller;

import io.netty.channel.Channel;
import java.util.Map;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
import org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager;

import java.util.Map;

public interface BrokerHeartbeatManager {
public static final long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 10;

public static BrokerHeartbeatManager newBrokerHeartbeatManager(ControllerConfig controllerConfig) {
if (controllerConfig.getControllerType().equals(ControllerConfig.JRAFT_CONTROLLER)) {
return new RaftBrokerHeartBeatManager(controllerConfig);
} else {
return new DefaultBrokerHeartbeatManager(controllerConfig);
}
}

/**
* initialize the resources
*
* @return
*/
void initialize();

/**
* Broker new heartbeat.
*/
Expand Down Expand Up @@ -67,6 +82,7 @@ void onBrokerHeartbeat(final String clusterName, final String brokerName, final

/**
* Count the number of active brokers in each broker-set of each cluster
*
* @return active brokers count
*/
Map<String/*cluster*/, Map<String/*broker-set*/, Integer/*active broker num*/>> getActiveBrokersNum();
Expand Down
Loading

0 comments on commit 8df53df

Please sign in to comment.