Skip to content

Commit

Permalink
[ISSUE #7064] [RIP-66-2] Support KV(RocksDB) Storage for ConsumeQueue (
Browse files Browse the repository at this point in the history
…#7120)

* typo int readme[ecosystem]

* consumequue support rocksdb

* fix rocksdb test

* fix rocksdb test

* remove unused method

* split into two tables

* CqUnit in Rocksdb [phyOffset, bodySize, tagHashCode, msgStoreTime]

* fix build.baze in tieredMessageStore

* skip RocksDBMessageStoreTest bazel

* skip RocksDBMessageStoreTest bazel

* Rocksdb TimerMessageStore

* fix unit test bazel

* fix store build.bazel

* fix store build.bazel

* optimize

* optimize

* polish

* build bytebuffer pair inner

* remove unused code

* DataConverter.CHARSET_UTF8

* fix comment

* fix comment

* rebuild test

* rebuild test

* optimize

* rebuild test

* polish

* polish

* rebuild test

* merge develop

* rebuild test

* rebuild test

* fix getConsumeQueue not find cq

* fix test

* rocksdb new version

* rebuild test

* fix bug

* Resolve conflicts after merging develop

* fix updateCqOffset

* fix recoverAbnormally

* fix recoverAbnormally

* polish

* polish

* polish

* remove exception in cleanunusedTopic

* remove exception in cleanunusedTopic

* remove exception in cleanunusedTopic

---------

Co-authored-by: RongtongJin <[email protected]>
  • Loading branch information
fujian-zfj and RongtongJin authored Oct 14, 2023
1 parent 2113fa3 commit f565654
Show file tree
Hide file tree
Showing 64 changed files with 4,793 additions and 670 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ maven_install(
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
"io.github.aliyunmq:rocketmq-grpc-netty-codec-haproxy:1.0.0",
"io.github.aliyunmq:rocketmq-rocksdb:1.0.3",
"org.apache.rocketmq:rocketmq-rocksdb:1.0.2",
],
fetch_sources = True,
repositories = [
Expand Down
2 changes: 1 addition & 1 deletion broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ java_library(
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_slf4j_jul_to_slf4j",
"@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge",
"@maven//:io_github_aliyunmq_rocketmq_rocksdb",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
"@maven//:net_java_dev_jna_jna",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,33 @@
*/
package org.apache.rocketmq.broker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
Expand Down Expand Up @@ -126,7 +152,7 @@
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
Expand All @@ -141,31 +167,6 @@
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BrokerController {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
Expand Down Expand Up @@ -308,7 +309,7 @@ public BrokerController(
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
if (isEnableRocksDBStore()) {
if (this.messageStoreConfig.isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
Expand Down Expand Up @@ -747,7 +748,12 @@ public boolean initializeMetadata() {
public boolean initializeMessageStore() {
boolean result = true;
try {
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
DefaultMessageStore defaultMessageStore;
if (this.messageStoreConfig.isEnableRocksDBStore()) {
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
} else {
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
}

if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler =
Expand Down Expand Up @@ -944,16 +950,16 @@ private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(
new TransactionalMessageBridge(this, this.getMessageStore()));
new TransactionalMessageBridge(this, this.getMessageStore()));
LOG.warn("Load default transaction message hook service: {}",
TransactionalMessageServiceImpl.class.getSimpleName());
TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(
AbstractTransactionalMessageCheckListener.class);
AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
LOG.warn("Load default discard message hook service: {}",
DefaultTransactionalMessageCheckListener.class.getSimpleName());
DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
Expand Down Expand Up @@ -2412,8 +2418,4 @@ public ColdDataCgCtrService getColdDataCgCtrService() {
public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}

public boolean isEnableRocksDBStore() {
return StoreType.DEFAULT_ROCKSDB.getStoreType().equalsIgnoreCase(this.messageStoreConfig.getStoreType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void shutdown() {

public synchronized void changeBrokerRole(final Long newMasterBrokerId, final String newMasterAddress,
final Integer newMasterEpoch,
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) {
final Integer syncStateSetEpoch, final Set<Long> syncStateSet) throws Exception {
if (newMasterBrokerId != null && newMasterEpoch > this.masterEpoch) {
if (newMasterBrokerId.equals(this.brokerControllerId)) {
changeToMaster(newMasterEpoch, syncStateSetEpoch, syncStateSet);
Expand All @@ -234,7 +234,7 @@ public synchronized void changeBrokerRole(final Long newMasterBrokerId, final St
}
}

public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) {
public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch, final Set<Long> syncStateSet) throws Exception {
synchronized (this) {
if (newMasterEpoch > this.masterEpoch) {
LOGGER.info("Begin to change to master, brokerName:{}, replicas:{}, new Epoch:{}", this.brokerConfig.getBrokerName(), this.brokerAddress, newMasterEpoch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval());
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
Expand All @@ -49,7 +49,7 @@ public boolean stop() {
@Override
protected void removeConsumerOffset(String topicAtGroup) {
try {
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.charset);
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.CHARSET_UTF8);
this.rocksDBConfigManager.delete(keyBytes);
} catch (Exception e) {
LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
Expand All @@ -58,7 +58,7 @@ protected void removeConsumerOffset(String topicAtGroup) {

@Override
protected void decode0(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.charset);
String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
Expand Down Expand Up @@ -93,7 +93,7 @@ public synchronized void persist() {
}

private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
byte[] keyBytes = topicGroupName.getBytes(DataConverter.charset);
byte[] keyBytes = topicGroupName.getBytes(DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
wrapper.setOffsetTable(offsetMap);
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(rqId);
if (ackMsg instanceof BatchAckMsg) {
msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,18 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,

final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
// delete pop retry topics first
for (String group : groups) {
final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
deleteTopicInBroker(popRetryTopic);
try {
for (String group : groups) {
final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
deleteTopicInBroker(popRetryTopic);
}
}
// delete topic
deleteTopicInBroker(topic);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
// delete topic
deleteTopicInBroker(topic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
Expand Down Expand Up @@ -2081,7 +2085,11 @@ private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx,
public RemotingCommand cleanExpiredConsumeQueue() {
LOGGER.info("AdminBrokerProcessor#cleanExpiredConsumeQueue: start.");
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
brokerController.getMessageStore().cleanExpiredConsumerQueue();
try {
brokerController.getMessageStore().cleanExpiredConsumerQueue();
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
LOGGER.info("AdminBrokerProcessor#cleanExpiredConsumeQueue: end.");
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Expand Down Expand Up @@ -2781,7 +2789,11 @@ private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,

final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
if (replicasManager != null) {
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
try {
replicasManager.changeBrokerRole(requestHeader.getMasterBrokerId(), requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), syncStateSetInfo.getSyncStateSet());
} catch (Exception e) {
throw new RemotingCommandException(e.getMessage());
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private void ackOrigin(final ChangeInvisibleTimeRequestHeader requestHeader, Str
}

msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(rqId);
msgInner.setTags(PopAckConstants.ACK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down Expand Up @@ -216,7 +216,7 @@ private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader
ck.addDiff(0);
ck.setBrokerName(brokerName);

msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(reviveQid);
msgInner.setTags(PopAckConstants.CK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ private boolean putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgI
ackMsg.setQueueId(point.getQueueId());
ackMsg.setPopTime(point.getPopTime());
msgInner.setTopic(popMessageProcessor.reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(pointWrapper.getReviveQueueId());
msgInner.setTags(PopAckConstants.ACK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down Expand Up @@ -673,7 +673,7 @@ private boolean putBatchAckToStore(final PopCheckPointWrapper pointWrapper, fina
batchAckMsg.setQueueId(point.getQueueId());
batchAckMsg.setPopTime(point.getPopTime());
msgInner.setTopic(popMessageProcessor.reviveTopic);
msgInner.setBody(JSON.toJSONString(batchAckMsg).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(batchAckMsg).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(pointWrapper.getReviveQueueId());
msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.charset));
msgInner.setBody(JSON.toJSONString(ck).getBytes(DataConverter.CHARSET_UTF8));
msgInner.setQueueId(reviveQid);
msgInner.setTags(PopAckConstants.CK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
}
for (MessageExt messageExt : messageExts) {
if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.charset);
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ck, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
Expand All @@ -371,7 +371,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
firstRt = point.getReviveTime();
}
} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.charset);
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
Expand All @@ -395,7 +395,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
}
}
} else if (PopAckConstants.BATCH_ACK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.charset);
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={}, find batch ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
Expand Down
Loading

0 comments on commit f565654

Please sign in to comment.