Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into lock
Browse files Browse the repository at this point in the history
# Conflicts:
#	client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
  • Loading branch information
shenjianeng committed Jan 29, 2024
2 parents 1f41101 + a0cb9d4 commit 17259ae
Show file tree
Hide file tree
Showing 213 changed files with 3,809 additions and 1,182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionMetricsFlushService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
Expand Down Expand Up @@ -277,6 +278,7 @@ public class BrokerController {
private BrokerMetricsManager brokerMetricsManager;
private ColdDataPullRequestHoldService coldDataPullRequestHoldService;
private ColdDataCgCtrService coldDataCgCtrService;
private TransactionMetricsFlushService transactionMetricsFlushService;

public BrokerController(
final BrokerConfig brokerConfig,
Expand Down Expand Up @@ -963,6 +965,9 @@ private void initialTransaction() {
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
this.transactionMetricsFlushService = new TransactionMetricsFlushService(this);
this.transactionMetricsFlushService.start();

}

private void initialAcl() {
Expand Down Expand Up @@ -1440,6 +1445,10 @@ protected void shutdownBasicService() {
this.endTransactionExecutor.shutdown();
}

if (this.transactionMetricsFlushService != null) {
this.transactionMetricsFlushService.shutdown();
}

if (this.escapeBridge != null) {
escapeBridge.shutdown();
}
Expand Down Expand Up @@ -2108,6 +2117,7 @@ public synchronized void changeScheduleServiceStatus(boolean shouldStart) {
isScheduleServiceStart = shouldStart;

if (timerMessageStore != null) {
timerMessageStore.syncLastReadTimeMs();
timerMessageStore.setShouldRunningDequeue(shouldStart);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public static String getTimerCheckPath(final String rootDir) {
public static String getTimerMetricsPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "timermetrics";
}
public static String getTransactionMetricsPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "transactionMetrics";
}

public static String getConsumerFilterPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public class BrokerMetricsConstant {
public static final String GAUGE_CONSUMER_READY_MESSAGES = "rocketmq_consumer_ready_messages";
public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = "rocketmq_send_to_dlq_messages_total";

public static final String COUNTER_COMMIT_MESSAGES_TOTAL = "rocketmq_commit_messages_total";
public static final String COUNTER_ROLLBACK_MESSAGES_TOTAL = "rocketmq_rollback_messages_total";
public static final String HISTOGRAM_FINISH_MSG_LATENCY = "rocketmq_finish_message_latency";
public static final String GAUGE_HALF_MESSAGES = "rocketmq_half_messages";

public static final String LABEL_CLUSTER_NAME = "cluster";
public static final String LABEL_NODE_TYPE = "node_type";
public static final String NODE_TYPE_BROKER = "broker";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,6 @@
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerManager;
Expand All @@ -68,12 +61,23 @@
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
import org.slf4j.bridge.SLF4JBridgeHandler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_COMMIT_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_ROLLBACK_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
Expand All @@ -83,8 +87,10 @@
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_HALF_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
Expand Down Expand Up @@ -141,6 +147,10 @@ public class BrokerMetricsManager {
public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
public static LongCounter sendToDlqMessages = new NopLongCounter();
public static ObservableLongGauge halfMessages = new NopObservableLongGauge();
public static LongCounter commitMessagesTotal = new NopLongCounter();
public static LongCounter rollBackMessagesTotal = new NopLongCounter();
public static LongHistogram transactionFinishLatency = new NopLongHistogram();

public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() {
{
Expand Down Expand Up @@ -210,7 +220,8 @@ public static TopicMessageType getMessageType(SendMessageRequestHeader requestHe
} else if (properties.get("__STARTDELIVERTIME") != null
|| properties.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
|| properties.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null
|| properties.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
|| properties.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null
|| properties.get(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) {
topicMessageType = TopicMessageType.DELAY;
}
return topicMessageType;
Expand Down Expand Up @@ -347,6 +358,7 @@ private void init() {
initRequestMetrics();
initConnectionMetrics();
initLagAndDlqMetrics();
initTransactionMetrics();
initOtherMetrics();
}

Expand All @@ -360,6 +372,15 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
2d * 1024 * 1024, //2MB
4d * 1024 * 1024 //4MB
);

List<Double> commitLatencyBuckets = Arrays.asList(
1d * 1 * 1 * 5, //5s
1d * 1 * 1 * 60, //1min
1d * 1 * 10 * 60, //10min
1d * 1 * 60 * 60, //1h
1d * 12 * 60 * 60, //12h
1d * 24 * 60 * 60 //24h
);
InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_MESSAGE_SIZE)
Expand All @@ -370,6 +391,16 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build());

InstrumentSelector commitLatencySelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_FINISH_MSG_LATENCY)
.build();
ViewBuilder commitLatencyViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(commitLatencyBuckets));
// To config the cardinalityLimit for openTelemetry metrics exporting.
SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build());

for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
ViewBuilder viewBuilder = selectorViewPair.getObject2();
SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
Expand Down Expand Up @@ -559,6 +590,34 @@ private void initLagAndDlqMetrics() {
.build();
}

private void initTransactionMetrics() {
commitMessagesTotal = brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
.setDescription("Total number of commit messages")
.build();

rollBackMessagesTotal = brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL)
.setDescription("Total number of rollback messages")
.build();

transactionFinishLatency = brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY)
.setDescription("Transaction finish latency")
.ofLongs()
.setUnit("ms")
.build();

halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES)
.setDescription("Half messages of all topics")
.ofLongs()
.buildWithCallback(measurement -> {
brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
.forEach((topic, metric) -> {
measurement.record(
metric.getCount().get(),
newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
);
});
});
}
private void initOtherMetrics() {
RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void processAllGroup(Consumer<ProcessGroupInfo> consumer) {
}

if (isPop) {
String retryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
String retryTopic = KeyBuilder.buildPopRetryTopic(topic, group, brokerConfig.isEnableRetryTopicV2());
TopicConfig retryTopicConfig = topicConfigManager.selectTopicConfig(retryTopic);
if (retryTopicConfig != null) {
int retryTopicPerm = retryTopicConfig.getPerm() & brokerConfig.getBrokerPermission();
Expand All @@ -185,7 +185,7 @@ private void processAllGroup(Consumer<ProcessGroupInfo> consumer) {
continue;
}
}
if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1);
if (retryTopicConfigV1 != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void update(String attemptId, boolean isRetry, String topic, String group
Set<Long> offsetSet = offsetConsumedCount.keySet();
for (Long offset : offsetSet) {
Integer consumedTimes = offsetConsumedCount.getOrDefault(offset, 0);
ExtraInfoUtil.buildQueueOffsetOrderCountInfo(orderInfoBuilder, isRetry, queueId, offset, consumedTimes);
ExtraInfoUtil.buildQueueOffsetOrderCountInfo(orderInfoBuilder, topic, queueId, offset, consumedTimes);
minConsumedTimes = Math.min(minConsumedTimes, consumedTimes);
}

Expand All @@ -136,7 +136,7 @@ public void update(String attemptId, boolean isRetry, String topic, String group

// for compatibility
// the old pop sdk use queueId to get consumedTimes from orderCountInfo
ExtraInfoUtil.buildQueueIdOrderCountInfo(orderInfoBuilder, isRetry, queueId, minConsumedTimes);
ExtraInfoUtil.buildQueueIdOrderCountInfo(orderInfoBuilder, topic, queueId, minConsumedTimes);
updateLockFreeTimestamp(topic, group, queueId, orderInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
Expand Down Expand Up @@ -906,7 +908,7 @@ public void lockBatchMQAsync(
final LockBatchRequestBody requestBody,
final long timeoutMillis,
final LockCallback callback) throws RemotingException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader());

request.setBody(requestBody.encode());
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
Expand Down Expand Up @@ -945,7 +947,7 @@ public void unlockBatchMQAsync(
final UnlockBatchRequestBody requestBody,
final long timeoutMillis,
final UnlockCallback callback) throws RemotingException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader());

request.setBody(requestBody.encode());

Expand Down Expand Up @@ -1377,7 +1379,7 @@ public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String b
requestHeader.setSubVersion(System.currentTimeMillis());
requestHeader.setMaxMsgBytes(Integer.MAX_VALUE);
requestHeader.setExpressionType(ExpressionType.TAG);
requestHeader.setBname(brokerName);
requestHeader.setBrokerName(brokerName);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
CompletableFuture<PullResult> pullResultFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
} else {
// batch ack
consumeGroup = batchAck.getConsumerGroup();
topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), ExtraInfoUtil.RETRY_TOPIC.equals(batchAck.getRetry()));
topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry());
qId = batchAck.getQueueId();
rqId = batchAck.getReviveQueueId();
startOffset = batchAck.getStartOffset();
Expand Down
Loading

0 comments on commit 17259ae

Please sign in to comment.