Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache_master/develop' into weihubeats/…
Browse files Browse the repository at this point in the history
…bugfix_grpcClient_null
  • Loading branch information
weihubeats committed Oct 18, 2023
2 parents 381824b + 82b2f8e commit dae83ec
Show file tree
Hide file tree
Showing 302 changed files with 10,259 additions and 2,730 deletions.
1 change: 1 addition & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ build --enable_platform_specific_config
test --action_env=TEST_TMPDIR=/tmp

test --experimental_strict_java_deps=warn
test --experimental_ui_max_stdouterr_bytes=10485760
build --experimental_strict_java_deps=warn

test --test_output=errors
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ $ java -version
java version "1.8.0_121"
```

For Windows users, click [here](https://dist.apache.org/repos/dist/release/rocketmq/5.1.3/rocketmq-all-5.1.3-bin-release.zip) to download the 5.1.3 RocketMQ binary release,
For Windows users, click [here](https://dist.apache.org/repos/dist/release/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip) to download the 5.1.4 RocketMQ binary release,
unpack it to your local disk, such as `D:\rocketmq`.
For macOS and Linux users, execute following commands:

```shell
# Download release from the Apache mirror
$ wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.3/rocketmq-all-5.1.3-bin-release.zip
$ wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip

# Unpack the release
$ unzip rocketmq-all-5.1.3-bin-release.zip
$ unzip rocketmq-all-5.1.4-bin-release.zip
```

Prepare a terminal and change to the extracted `bin` directory:
```shell
$ cd rocketmq-all-5.1.3-bin-release/bin
$ cd rocketmq-all-5.1.4-bin-release/bin
```

**1) Start NameServer**
Expand Down
3 changes: 2 additions & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ maven_install(
"io.opentelemetry:opentelemetry-exporter-prometheus:1.29.0-alpha",
"io.opentelemetry:opentelemetry-exporter-logging:1.29.0",
"io.opentelemetry:opentelemetry-sdk:1.29.0",
"io.opentelemetry:opentelemetry-exporter-logging-otlp:1.29.0",
"com.squareup.okio:okio-jvm:3.0.0",
"io.opentelemetry:opentelemetry-api:1.29.0",
"io.opentelemetry:opentelemetry-sdk-metrics:1.29.0",
Expand All @@ -105,7 +106,7 @@ maven_install(
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"io.github.aliyunmq:rocketmq-rocksdb:1.0.3",
"org.apache.rocketmq:rocketmq-rocksdb:1.0.2",
],
fetch_sources = True,
repositories = [
Expand Down
2 changes: 1 addition & 1 deletion acl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.4-SNAPSHOT</version>
<version>5.1.5-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
Expand Down
3 changes: 2 additions & 1 deletion broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
"@maven//:io_opentelemetry_opentelemetry_exporter_logging",
"@maven//:io_opentelemetry_opentelemetry_exporter_logging_otlp",
"@maven//:io_opentelemetry_opentelemetry_sdk",
"@maven//:io_opentelemetry_opentelemetry_sdk_common",
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
Expand All @@ -53,7 +54,7 @@ java_library(
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge",
"@maven//:io_github_aliyunmq_rocketmq_rocksdb",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
"@maven//:net_java_dev_jna_jna",
],
)
Expand Down
2 changes: 1 addition & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.4-SNAPSHOT</version>
<version>5.1.5-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
188 changes: 102 additions & 86 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import io.netty.channel.Channel;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
Expand All @@ -35,7 +35,7 @@ public class ClientHousekeepingService implements ChannelEventListener {

public ClientHousekeepingService(final BrokerController brokerController) {
this.brokerController = brokerController;
scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("ClientHousekeepingScheduledThread", brokerController.getBrokerIdentity()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
Expand All @@ -37,7 +36,7 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
private final BrokerController brokerController;
private final int cacheSize = 8096;

private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true));

private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new ConcurrentHashMap<>(cacheSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,6 +40,7 @@
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
Expand Down Expand Up @@ -107,9 +106,9 @@ public class ReplicasManager {
public ReplicasManager(final BrokerController brokerController) {
this.brokerController = brokerController;
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
this.scheduledService = ThreadUtils.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
this.executorService = ThreadUtils.newThreadPoolExecutor(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
Expand Down Expand Up @@ -225,7 +224,7 @@ public void shutdown() {

public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress,
final Integer newMasterEpoch,
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) throws Exception {
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
if (newMasterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet);
Expand All @@ -235,7 +234,7 @@ public synchronized void changeBrokerRole(final Long newMasterBrokerId, final St
}
}

public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) {
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) throws Exception {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch);
Expand Down Expand Up @@ -542,7 +541,7 @@ private boolean createMetadataFileAndDeleteTemp() {
this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
this.tempBrokerMetadata.clear();
this.brokerControllerId = this.brokerMetadata.getBrokerId();
this.haService.setBrokerControllerId(this.brokerControllerId);
this.haService.setLocalBrokerId(this.brokerControllerId);
return true;
} catch (Exception e) {
LOGGER.error("fail to create metadata file", e);
Expand Down Expand Up @@ -594,7 +593,7 @@ private void confirmNowRegisteringState() {
if (this.brokerMetadata.isLoaded()) {
this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
this.brokerControllerId = brokerMetadata.getBrokerId();
this.haService.setBrokerControllerId(this.brokerControllerId);
this.haService.setLocalBrokerId(this.brokerControllerId);
return;
}
// 2. check if temp metadata exist
Expand Down Expand Up @@ -735,23 +734,26 @@ private void schedulingCheckSyncStateSet() {
if (this.checkSyncStateSetTaskFuture != null) {
this.checkSyncStateSetTaskFuture.cancel(false);
}
this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> {
checkSyncStateSetAndDoReport();
}, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS);
this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(this::checkSyncStateSetAndDoReport, 3 * 1000,
this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS);
}

private void checkSyncStateSetAndDoReport() {
final Set<Long> newSyncStateSet = this.haService.maybeShrinkSyncStateSet();
newSyncStateSet.add(this.brokerControllerId);
synchronized (this) {
if (this.syncStateSet != null) {
// Check if syncStateSet changed
if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) {
return;
try {
final Set<Long> newSyncStateSet = this.haService.maybeShrinkSyncStateSet();
newSyncStateSet.add(this.brokerControllerId);
synchronized (this) {
if (this.syncStateSet != null) {
// Check if syncStateSet changed
if (this.syncStateSet.size() == newSyncStateSet.size() && this.syncStateSet.containsAll(newSyncStateSet)) {
return;
}
}
}
doReportSyncStateSetChanged(newSyncStateSet);
} catch (Exception e) {
LOGGER.error("Check syncStateSet error", e);
}
doReportSyncStateSetChanged(newSyncStateSet);
}

private void doReportSyncStateSetChanged(Set<Long> newSyncStateSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
Expand All @@ -49,7 +49,7 @@ public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessag
this.messageStore = messageStore;
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
this.executorService = Executors.newSingleThreadExecutor(
this.executorService = ThreadUtils.newSingleThreadExecutor(
new ThreadFactoryImpl("DLegerRoleChangeHandler_", brokerController.getBrokerIdentity()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
Expand All @@ -43,6 +42,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
Expand Down Expand Up @@ -72,7 +72,7 @@ public EscapeBridge(BrokerController brokerController) {
public void start() throws Exception {
if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() && brokerController.getBrokerConfig().isEnableRemoteEscape()) {
final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new LinkedBlockingQueue<>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
this.defaultAsyncSenderExecutor = ThreadUtils.newThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.netty.RequestTask;
Expand All @@ -43,7 +44,7 @@ public class BrokerFastFailure {

public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
brokerController == null ? null : brokerController.getBrokerConfig()));
}
Expand Down

This file was deleted.

Loading

0 comments on commit dae83ec

Please sign in to comment.