diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel index c21f9d114c3..77d456bc16a 100644 --- a/broker/BUILD.bazel +++ b/broker/BUILD.bazel @@ -30,6 +30,7 @@ java_library( "//srvutil", "//store", "//tieredstore", + "@maven//:org_slf4j_slf4j_api", "@maven//:ch_qos_logback_logback_classic", "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 744aba19118..006695c6bc8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -79,6 +79,7 @@ import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; +import org.apache.rocketmq.broker.pop.PopConsumerService; import org.apache.rocketmq.broker.processor.AckMessageProcessor; import org.apache.rocketmq.broker.processor.AdminBrokerProcessor; import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor; @@ -198,6 +199,7 @@ public class BrokerController { protected final ConsumerFilterManager consumerFilterManager; protected final ConsumerOrderInfoManager consumerOrderInfoManager; protected final PopInflightMessageCounter popInflightMessageCounter; + protected final PopConsumerService popConsumerService; protected final ProducerManager producerManager; protected final ScheduleMessageService scheduleMessageService; protected final ClientHousekeepingService clientHousekeepingService; @@ -380,6 +382,7 @@ public BrokerController( this.consumerFilterManager = new ConsumerFilterManager(this); this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this); this.popInflightMessageCounter = new PopInflightMessageCounter(this); + this.popConsumerService = brokerConfig.isPopConsumerKVServiceInit() ? new PopConsumerService(this) : null; this.clientHousekeepingService = new ClientHousekeepingService(this); this.broker2Client = new Broker2Client(this); this.scheduleMessageService = new ScheduleMessageService(this); @@ -1314,6 +1317,10 @@ public PopInflightMessageCounter getPopInflightMessageCounter() { return popInflightMessageCounter; } + public PopConsumerService getPopConsumerService() { + return popConsumerService; + } + public ConsumerOffsetManager getConsumerOffsetManager() { return consumerOffsetManager; } @@ -1417,12 +1424,13 @@ protected void shutdownBasicService() { this.pullRequestHoldService.shutdown(); } - { - this.popMessageProcessor.getPopLongPollingService().shutdown(); - this.popMessageProcessor.getQueueLockManager().shutdown(); + if (this.popConsumerService != null) { + this.popConsumerService.shutdown(); } { + this.popMessageProcessor.getPopLongPollingService().shutdown(); + this.popMessageProcessor.getQueueLockManager().shutdown(); this.popMessageProcessor.getPopBufferMergeService().shutdown(); this.ackMessageProcessor.shutdownPopReviveService(); } @@ -1673,18 +1681,26 @@ protected void startBasicService() throws Exception { if (this.popMessageProcessor != null) { this.popMessageProcessor.getPopLongPollingService().start(); - this.popMessageProcessor.getPopBufferMergeService().start(); + if (brokerConfig.isPopConsumerFSServiceInit()) { + this.popMessageProcessor.getPopBufferMergeService().start(); + } this.popMessageProcessor.getQueueLockManager().start(); } if (this.ackMessageProcessor != null) { - this.ackMessageProcessor.startPopReviveService(); + if (brokerConfig.isPopConsumerFSServiceInit()) { + this.ackMessageProcessor.startPopReviveService(); + } } if (this.notificationProcessor != null) { this.notificationProcessor.getPopLongPollingService().start(); } + if (this.popConsumerService != null) { + this.popConsumerService.start(); + } + if (this.topicQueueMappingCleanService != null) { this.topicQueueMappingCleanService.start(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java new file mode 100644 index 00000000000..e7ce68e0193 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PopConsumerCache extends ServiceThread { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + private static final long OFFSET_NOT_EXIST = -1L; + + private final BrokerController brokerController; + private final PopConsumerKVStore consumerRecordStore; + private final PopConsumerLockService consumerLockService; + private final Consumer reviveConsumer; + + private final AtomicInteger estimateCacheSize; + private final ConcurrentMap consumerRecordTable; + + public PopConsumerCache(BrokerController brokerController, PopConsumerKVStore consumerRecordStore, + PopConsumerLockService popConsumerLockService, Consumer reviveConsumer) { + + this.reviveConsumer = reviveConsumer; + this.brokerController = brokerController; + this.consumerRecordStore = consumerRecordStore; + this.consumerLockService = popConsumerLockService; + this.estimateCacheSize = new AtomicInteger(); + this.consumerRecordTable = new ConcurrentHashMap<>(); + } + + public String getKey(String groupId, String topicId, int queueId) { + return groupId + "@" + topicId + "@" + queueId; + } + + public String getKey(PopConsumerRecord consumerRecord) { + return consumerRecord.getGroupId() + "@" + consumerRecord.getTopicId() + "@" + consumerRecord.getQueueId(); + } + + public int getCacheKeySize() { + return this.consumerRecordTable.size(); + } + + public int getCacheSize() { + return this.estimateCacheSize.intValue(); + } + + public boolean isCacheFull() { + return this.estimateCacheSize.intValue() > brokerController.getBrokerConfig().getPopCkMaxBufferSize(); + } + + public long getMinOffsetInCache(String groupId, String topicId, int queueId) { + ConsumerRecords consumerRecords = consumerRecordTable.get(this.getKey(groupId, topicId, queueId)); + return consumerRecords != null ? consumerRecords.getMinOffsetInBuffer() : OFFSET_NOT_EXIST; + } + + public long getPopInFlightMessageCount(String groupId, String topicId, int queueId) { + ConsumerRecords consumerRecords = consumerRecordTable.get(this.getKey(groupId, topicId, queueId)); + return consumerRecords != null ? consumerRecords.getInFlightRecordCount() : 0L; + } + + public void writeRecords(List consumerRecordList) { + this.estimateCacheSize.addAndGet(consumerRecordList.size()); + consumerRecordList.forEach(consumerRecord -> { + ConsumerRecords consumerRecords = ConcurrentHashMapUtils.computeIfAbsent(consumerRecordTable, + this.getKey(consumerRecord), k -> new ConsumerRecords(brokerController.getBrokerConfig(), + consumerRecord.getGroupId(), consumerRecord.getTopicId(), consumerRecord.getQueueId())); + assert consumerRecords != null; + consumerRecords.write(consumerRecord); + }); + } + + /** + * Remove the record from the input list then return the content that has not been deleted + */ + public List deleteRecords(List consumerRecordList) { + int total = consumerRecordList.size(); + List remain = new ArrayList<>(); + consumerRecordList.forEach(consumerRecord -> { + ConsumerRecords consumerRecords = consumerRecordTable.get(this.getKey(consumerRecord)); + if (consumerRecords == null || !consumerRecords.delete(consumerRecord)) { + remain.add(consumerRecord); + } + }); + this.estimateCacheSize.addAndGet(remain.size() - total); + return remain; + } + + public int cleanupRecords(Consumer consumer) { + int remain = 0; + Iterator> iterator = consumerRecordTable.entrySet().iterator(); + while (iterator.hasNext()) { + // revive or write record to store + ConsumerRecords records = iterator.next().getValue(); + boolean timeout = consumerLockService.isLockTimeout( + records.getGroupId(), records.getTopicId()); + + if (timeout) { + List removeExpiredRecords = + records.removeExpiredRecords(Long.MAX_VALUE); + if (removeExpiredRecords != null) { + consumerRecordStore.writeRecords(removeExpiredRecords); + } + log.info("PopConsumerOffline, so clean expire records, groupId={}, topic={}, queueId={}, records={}", + records.getGroupId(), records.getTopicId(), records.getQueueId(), + removeExpiredRecords != null ? removeExpiredRecords.size() : 0); + iterator.remove(); + continue; + } + + long currentTime = System.currentTimeMillis(); + List writeConsumerRecords = new ArrayList<>(); + List consumerRecords = records.removeExpiredRecords(currentTime); + if (consumerRecords != null) { + consumerRecords.forEach(consumerRecord -> { + if (consumerRecord.getVisibilityTimeout() <= currentTime) { + consumer.accept(consumerRecord); + } else { + writeConsumerRecords.add(consumerRecord); + } + }); + } + + // write to store and handle it later + consumerRecordStore.writeRecords(writeConsumerRecords); + + // commit min offset in buffer to offset store + long offset = records.getMinOffsetInBuffer(); + if (offset > OFFSET_NOT_EXIST) { + this.commitOffset("PopConsumerCache", + records.getGroupId(), records.getTopicId(), records.getQueueId(), offset); + } + + remain += records.getInFlightRecordCount(); + } + return remain; + } + + public void commitOffset(String clientHost, String groupId, String topicId, int queueId, long offset) { + if (!consumerLockService.tryLock(groupId, topicId)) { + return; + } + try { + ConsumerOffsetManager consumerOffsetManager = brokerController.getConsumerOffsetManager(); + long commit = consumerOffsetManager.queryOffset(groupId, topicId, queueId); + if (commit != OFFSET_NOT_EXIST && offset < commit) { + log.info("PopConsumerCache, consumer offset less than store, " + + "groupId={}, topicId={}, queueId={}, offset={}", groupId, topicId, queueId, offset); + } + consumerOffsetManager.commitOffset(clientHost, groupId, topicId, queueId, offset); + } finally { + consumerLockService.unlock(groupId, topicId); + } + } + + public void removeRecords(String groupId, String topicId, int queueId) { + this.consumerRecordTable.remove(this.getKey(groupId, topicId, queueId)); + } + + @Override + public String getServiceName() { + return PopConsumerCache.class.getSimpleName(); + } + + @Override + public void run() { + while (!this.isStopped()) { + try { + this.waitForRunning(TimeUnit.SECONDS.toMillis(1)); + int cacheSize = this.cleanupRecords(reviveConsumer); + this.estimateCacheSize.set(cacheSize); + } catch (Exception e) { + log.error("PopConsumerCacheService revive error", e); + } + } + } + + protected static class ConsumerRecords { + + private final Lock lock; + private final String groupId; + private final String topicId; + private final int queueId; + private final BrokerConfig brokerConfig; + private final TreeMap recordTreeMap; + + public ConsumerRecords(BrokerConfig brokerConfig, String groupId, String topicId, int queueId) { + this.groupId = groupId; + this.topicId = topicId; + this.queueId = queueId; + this.lock = new ReentrantLock(); + this.brokerConfig = brokerConfig; + this.recordTreeMap = new TreeMap<>(); + } + + public void write(PopConsumerRecord record) { + lock.lock(); + try { + recordTreeMap.put(record.getOffset(), record); + } finally { + lock.unlock(); + } + } + + public boolean delete(PopConsumerRecord record) { + PopConsumerRecord popConsumerRecord; + lock.lock(); + try { + popConsumerRecord = recordTreeMap.remove(record.getOffset()); + } finally { + lock.unlock(); + } + return popConsumerRecord != null; + } + + public long getMinOffsetInBuffer() { + Map.Entry entry = recordTreeMap.firstEntry(); + return entry != null ? entry.getKey() : OFFSET_NOT_EXIST; + } + + public int getInFlightRecordCount() { + return recordTreeMap.size(); + } + + public List removeExpiredRecords(long currentTime) { + List result = null; + lock.lock(); + try { + Iterator> iterator = recordTreeMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + // org.apache.rocketmq.broker.processor.PopBufferMergeService.scan + if (entry.getValue().getVisibilityTimeout() <= currentTime || + entry.getValue().getPopTime() + brokerConfig.getPopCkStayBufferTime() <= currentTime) { + if (result == null) { + result = new ArrayList<>(); + } + result.add(entry.getValue()); + iterator.remove(); + } + } + } finally { + lock.unlock(); + } + return result; + } + + public String getGroupId() { + return groupId; + } + + public String getTopicId() { + return topicId; + } + + public int getQueueId() { + return queueId; + } + + @Override + public String toString() { + return "ConsumerRecords{" + + "lock=" + lock + + ", topicId=" + topicId + + ", groupId=" + groupId + + ", queueId=" + queueId + + ", recordTreeMap=" + recordTreeMap.size() + + '}'; + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java new file mode 100644 index 00000000000..09bc4e6b47c --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerContext.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; + +public class PopConsumerContext { + + private final String clientHost; + + private final long popTime; + + private final long invisibleTime; + + private final String groupId; + + private final boolean fifo; + + private final String attemptId; + + private final AtomicLong restCount; + + private final StringBuilder startOffsetInfo; + + private final StringBuilder msgOffsetInfo; + + private final StringBuilder orderCountInfo; + + private List getMessageResultList; + + private List popConsumerRecordList; + + public PopConsumerContext(String clientHost, + long popTime, long invisibleTime, String groupId, boolean fifo, String attemptId) { + + this.clientHost = clientHost; + this.popTime = popTime; + this.invisibleTime = invisibleTime; + this.groupId = groupId; + this.fifo = fifo; + this.attemptId = attemptId; + this.restCount = new AtomicLong(0); + this.startOffsetInfo = new StringBuilder(); + this.msgOffsetInfo = new StringBuilder(); + this.orderCountInfo = new StringBuilder(); + } + + public boolean isFound() { + return getMessageResultList != null && !getMessageResultList.isEmpty(); + } + + // offset is consumer last request offset + public void addGetMessageResult(GetMessageResult result, + String topicId, int queueId, PopConsumerRecord.RetryType retryType, long offset) { + + if (result.getStatus() != GetMessageStatus.FOUND || result.getMessageQueueOffset().isEmpty()) { + return; + } + + if (this.getMessageResultList == null) { + this.getMessageResultList = new ArrayList<>(); + } + + if (this.popConsumerRecordList == null) { + this.popConsumerRecordList = new ArrayList<>(); + } + + this.getMessageResultList.add(result); + this.addRestCount(result.getMaxOffset() - result.getNextBeginOffset()); + + for (int i = 0; i < result.getMessageQueueOffset().size(); i++) { + this.popConsumerRecordList.add(new PopConsumerRecord(popTime, groupId, topicId, queueId, + retryType.getCode(), invisibleTime, result.getMessageQueueOffset().get(i), attemptId)); + } + + ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topicId, queueId, offset); + ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topicId, queueId, result.getMessageQueueOffset()); + } + + public String getClientHost() { + return clientHost; + } + + public String getGroupId() { + return groupId; + } + + public void addRestCount(long delta) { + this.restCount.addAndGet(delta); + } + + public long getRestCount() { + return restCount.get(); + } + + public long getPopTime() { + return popTime; + } + + public boolean isFifo() { + return fifo; + } + + public long getInvisibleTime() { + return invisibleTime; + } + + public String getAttemptId() { + return attemptId; + } + + public int getMessageCount() { + return getMessageResultList != null ? + getMessageResultList.stream().mapToInt(GetMessageResult::getMessageCount).sum() : 0; + } + + public String getStartOffsetInfo() { + return startOffsetInfo.toString(); + } + + public String getMsgOffsetInfo() { + return msgOffsetInfo.toString(); + } + + public StringBuilder getOrderCountInfoBuilder() { + return orderCountInfo; + } + + public String getOrderCountInfo() { + return orderCountInfo.toString(); + } + + public List getGetMessageResultList() { + return getMessageResultList; + } + + public List getPopConsumerRecordList() { + return popConsumerRecordList; + } + + @Override + public String toString() { + return "PopConsumerContext{" + + "clientHost=" + clientHost + + ", popTime=" + popTime + + ", invisibleTime=" + invisibleTime + + ", groupId=" + groupId + + ", isFifo=" + fifo + + ", attemptId=" + attemptId + + ", restCount=" + restCount + + ", startOffsetInfo=" + startOffsetInfo + + ", msgOffsetInfo=" + msgOffsetInfo + + ", orderCountInfo=" + orderCountInfo + + ", getMessageResultList=" + (getMessageResultList != null ? getMessageResultList.size() : 0) + + ", popConsumerRecordList=" + (popConsumerRecordList != null ? popConsumerRecordList.size() : 0) + + '}'; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java new file mode 100644 index 00000000000..5569abe3db7 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.util.List; + +public interface PopConsumerKVStore { + + /** + * Starts the storage service. + */ + boolean start(); + + /** + * Shutdown the storage service. + */ + boolean shutdown(); + + /** + * Gets the file path of the storage. + * @return The file path of the storage. + */ + String getFilePath(); + + /** + * Writes a list of consumer records to the storage. + * @param consumerRecordList The list of consumer records to be written. + */ + void writeRecords(List consumerRecordList); + + /** + * Deletes a list of consumer records from the storage. + * @param consumerRecordList The list of consumer records to be deleted. + */ + void deleteRecords(List consumerRecordList); + + /** + * Scans and returns a list of expired consumer records before the current time. + * @param currentTime The current revive checkpoint timestamp. + * @param maxCount The maximum number of records to return. + * @return A list of expired consumer records. + */ + List scanExpiredRecords(long currentTime, int maxCount); +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerLockService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerLockService.java new file mode 100644 index 00000000000..33221430492 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerLockService.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.common.KeyBuilder; +import org.apache.rocketmq.common.PopAckConstants; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PopConsumerLockService { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + + private final long timeout; + private final ConcurrentMap lockTable; + + public PopConsumerLockService(long timeout) { + this.timeout = timeout; + this.lockTable = new ConcurrentHashMap<>(); + } + + public boolean tryLock(String groupId, String topicId) { + return Objects.requireNonNull(ConcurrentHashMapUtils.computeIfAbsent(lockTable, + groupId + PopAckConstants.SPLIT + topicId, s -> new TimedLock())).tryLock(); + } + + public void unlock(String groupId, String topicId) { + TimedLock lock = lockTable.get(groupId + PopAckConstants.SPLIT + topicId); + if (lock != null) { + lock.unlock(); + } + } + + // For retry topics, should lock origin group and topic + public boolean isLockTimeout(String groupId, String topicId) { + topicId = KeyBuilder.parseNormalTopic(topicId, groupId); + TimedLock lock = lockTable.get(groupId + PopAckConstants.SPLIT + topicId); + return lock == null || System.currentTimeMillis() - lock.getLockTime() > timeout; + } + + public void removeTimeout() { + Iterator> iterator = lockTable.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (System.currentTimeMillis() - entry.getValue().getLockTime() > timeout) { + log.info("PopConsumerLockService remove timeout lock, " + + "key={}, locked={}", entry.getKey(), entry.getValue().lock.get()); + iterator.remove(); + } + } + } + + static class TimedLock { + private volatile long lockTime; + private final AtomicBoolean lock; + + public TimedLock() { + this.lockTime = System.currentTimeMillis(); + this.lock = new AtomicBoolean(false); + } + + public boolean tryLock() { + if (lock.compareAndSet(false, true)) { + this.lockTime = System.currentTimeMillis(); + return true; + } + return false; + } + + public void unlock() { + lock.set(false); + } + + public long getLockTime() { + return lockTime; + } + } +} \ No newline at end of file diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java new file mode 100644 index 00000000000..1ee01fea1c8 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.annotation.JSONField; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class PopConsumerRecord { + + public enum RetryType { + + NORMAL_TOPIC(0), + + RETRY_TOPIC_V1(1), + + RETRY_TOPIC_V2(2); + + private final int code; + + RetryType(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + } + + @JSONField() + private long popTime; + + @JSONField(ordinal = 1) + private String groupId; + + @JSONField(ordinal = 2) + private String topicId; + + @JSONField(ordinal = 3) + private int queueId; + + @JSONField(ordinal = 4) + private int retryFlag; + + @JSONField(ordinal = 5) + private long invisibleTime; + + @JSONField(ordinal = 6) + private long offset; + + @JSONField(ordinal = 7) + private int attemptTimes; + + @JSONField(ordinal = 8) + private String attemptId; + + // used for test and fastjson + public PopConsumerRecord() { + } + + public PopConsumerRecord(long popTime, String groupId, String topicId, int queueId, + int retryFlag, long invisibleTime, long offset, String attemptId) { + + this.popTime = popTime; + this.groupId = groupId; + this.topicId = topicId; + this.queueId = queueId; + this.retryFlag = retryFlag; + this.invisibleTime = invisibleTime; + this.offset = offset; + this.attemptId = attemptId; + } + + @JSONField(serialize = false) + public long getVisibilityTimeout() { + return popTime + invisibleTime; + } + + /** + * Key: timestamp(8) + groupId + topicId + queueId + offset + */ + @JSONField(serialize = false) + public byte[] getKeyBytes() { + int length = Long.BYTES + groupId.length() + 1 + topicId.length() + 1 + Integer.BYTES + 1 + Long.BYTES; + byte[] bytes = new byte[length]; + ByteBuffer buffer = ByteBuffer.wrap(bytes); + buffer.putLong(this.getVisibilityTimeout()); + buffer.put(groupId.getBytes(StandardCharsets.UTF_8)).put((byte) '@'); + buffer.put(topicId.getBytes(StandardCharsets.UTF_8)).put((byte) '@'); + buffer.putInt(queueId).put((byte) '@'); + buffer.putLong(offset); + return bytes; + } + + @JSONField(serialize = false) + public boolean isRetry() { + return retryFlag != 0; + } + + @JSONField(serialize = false) + public byte[] getValueBytes() { + return JSON.toJSONBytes(this); + } + + public static PopConsumerRecord decode(byte[] body) { + return JSONObject.parseObject(body, PopConsumerRecord.class); + } + + public long getPopTime() { + return popTime; + } + + public void setPopTime(long popTime) { + this.popTime = popTime; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getTopicId() { + return topicId; + } + + public void setTopicId(String topicId) { + this.topicId = topicId; + } + + public int getQueueId() { + return queueId; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public int getRetryFlag() { + return retryFlag; + } + + public void setRetryFlag(int retryFlag) { + this.retryFlag = retryFlag; + } + + public long getInvisibleTime() { + return invisibleTime; + } + + public void setInvisibleTime(long invisibleTime) { + this.invisibleTime = invisibleTime; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public int getAttemptTimes() { + return attemptTimes; + } + + public void setAttemptTimes(int attemptTimes) { + this.attemptTimes = attemptTimes; + } + + public String getAttemptId() { + return attemptId; + } + + public void setAttemptId(String attemptId) { + this.attemptId = attemptId; + } + + @Override + public String toString() { + return "PopDeliveryRecord{" + + "popTime=" + popTime + + ", groupId='" + groupId + '\'' + + ", topicId='" + topicId + '\'' + + ", queueId=" + queueId + + ", retryFlag=" + retryFlag + + ", invisibleTime=" + invisibleTime + + ", offset=" + offset + + ", attemptTimes=" + attemptTimes + + ", attemptId='" + attemptId + '\'' + + '}'; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java new file mode 100644 index 00000000000..9c940034a95 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.config.AbstractRocksDBStorage; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.store.rocksdb.RocksDBOptionsFactory; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactRangeOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements PopConsumerKVStore { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + private static final byte[] COLUMN_FAMILY_NAME = "popState".getBytes(StandardCharsets.UTF_8); + + private WriteOptions writeOptions; + private WriteOptions deleteOptions; + private ColumnFamilyHandle columnFamilyHandle; + + public PopConsumerRocksdbStore(String filePath) { + super(filePath); + } + + // https://www.cnblogs.com/renjc/p/rocksdb-class-db.html + // https://github.com/johnzeng/rocksdb-doc-cn/blob/master/doc/RocksDB-Tuning-Guide.md + protected void initOptions() { + this.options = RocksDBOptionsFactory.createDBOptions(); + + this.writeOptions = new WriteOptions(); + this.writeOptions.setSync(true); + this.writeOptions.setDisableWAL(false); + this.writeOptions.setNoSlowdown(false); + + this.deleteOptions = new WriteOptions(); + this.deleteOptions.setSync(false); + this.deleteOptions.setLowPri(true); + this.deleteOptions.setDisableWAL(true); + this.deleteOptions.setNoSlowdown(false); + + this.compactRangeOptions = new CompactRangeOptions(); + this.compactRangeOptions.setBottommostLevelCompaction( + CompactRangeOptions.BottommostLevelCompaction.kForce); + this.compactRangeOptions.setAllowWriteStall(true); + this.compactRangeOptions.setExclusiveManualCompaction(false); + this.compactRangeOptions.setChangeLevel(true); + this.compactRangeOptions.setTargetLevel(-1); + this.compactRangeOptions.setMaxSubcompactions(4); + } + + @Override + protected boolean postLoad() { + try { + UtilAll.ensureDirOK(this.dbPath); + initOptions(); + + // init column family here + ColumnFamilyOptions defaultOptions = new ColumnFamilyOptions().optimizeForSmallDb(); + ColumnFamilyOptions popStateOptions = new ColumnFamilyOptions().optimizeForSmallDb(); + this.cfOptions.add(defaultOptions); + this.cfOptions.add(popStateOptions); + + this.options = new DBOptions() + .setCreateIfMissing(true) + .setCreateMissingColumnFamilies(true); + + List cfDescriptors = new ArrayList<>(); + cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); + cfDescriptors.add(new ColumnFamilyDescriptor(COLUMN_FAMILY_NAME, popStateOptions)); + this.open(cfDescriptors); + this.defaultCFHandle = cfHandles.get(0); + this.columnFamilyHandle = cfHandles.get(1); + + log.debug("PopConsumerRocksdbStore init, filePath={}", this.dbPath); + } catch (final Exception e) { + log.error("PopConsumerRocksdbStore init error, filePath={}", this.dbPath, e); + return false; + } + return true; + } + + public String getFilePath() { + return this.dbPath; + } + + @Override + public void writeRecords(List consumerRecordList) { + if (!consumerRecordList.isEmpty()) { + try (WriteBatch writeBatch = new WriteBatch()) { + for (PopConsumerRecord record : consumerRecordList) { + writeBatch.put(columnFamilyHandle, record.getKeyBytes(), record.getValueBytes()); + } + this.db.write(writeOptions, writeBatch); + } catch (RocksDBException e) { + throw new RuntimeException("Write record error", e); + } + } + } + + @Override + public void deleteRecords(List consumerRecordList) { + if (!consumerRecordList.isEmpty()) { + try (WriteBatch writeBatch = new WriteBatch()) { + for (PopConsumerRecord record : consumerRecordList) { + writeBatch.delete(columnFamilyHandle, record.getKeyBytes()); + } + this.db.write(deleteOptions, writeBatch); + } catch (RocksDBException e) { + throw new RuntimeException("Delete record error", e); + } + } + } + + @Override + public List scanExpiredRecords(long currentTime, int maxCount) { + // In RocksDB, we can use SstPartitionerFixedPrefixFactory in cfOptions + // and new ColumnFamilyOptions().useFixedLengthPrefixExtractor() to + // configure prefix indexing to improve the performance of scans. + // However, in the current implementation, this is not the bottleneck. + List consumerRecordList = new ArrayList<>(); + try (RocksIterator iterator = db.newIterator(this.columnFamilyHandle)) { + iterator.seekToFirst(); + while (iterator.isValid() && consumerRecordList.size() < maxCount) { + if (ByteBuffer.wrap(iterator.key()).getLong() > currentTime) { + break; + } + consumerRecordList.add(PopConsumerRecord.decode(iterator.value())); + iterator.next(); + } + } + return consumerRecordList; + } + + @Override + protected void preShutdown() { + if (this.writeOptions != null) { + this.writeOptions.close(); + } + if (this.deleteOptions != null) { + this.deleteOptions.close(); + } + if (this.defaultCFHandle != null) { + this.defaultCFHandle.close(); + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java new file mode 100644 index 00000000000..fb371dce05f --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import com.alibaba.fastjson.JSON; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import java.nio.ByteBuffer; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; +import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.MessageFilter; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.exception.ConsumeQueueException; +import org.apache.rocketmq.store.pop.PopCheckPoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PopConsumerService extends ServiceThread { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + private static final long OFFSET_NOT_EXIST = -1L; + private static final String ROCKSDB_DIRECTORY = "kvStore"; + private static final int[] REWRITE_INTERVALS_IN_SECONDS = + new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; + + private final AtomicBoolean consumerRunning; + private final BrokerConfig brokerConfig; + private final BrokerController brokerController; + private final AtomicLong lastCleanupLockTime; + private final PopConsumerCache popConsumerCache; + private final PopConsumerKVStore popConsumerStore; + private final PopConsumerLockService consumerLockService; + private final ConcurrentMap requestCountTable; + + public PopConsumerService(BrokerController brokerController) { + + this.brokerController = brokerController; + this.brokerConfig = brokerController.getBrokerConfig(); + + this.consumerRunning = new AtomicBoolean(false); + this.requestCountTable = new ConcurrentHashMap<>(); + this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis()); + this.consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2)); + this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get( + brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString()); + this.popConsumerCache = brokerConfig.isEnablePopBufferMerge() ? new PopConsumerCache( + brokerController, this.popConsumerStore, this.consumerLockService, this::revive) : null; + + log.info("PopConsumerService init, buffer={}, rocksdb filePath={}", + brokerConfig.isEnablePopBufferMerge(), this.popConsumerStore.getFilePath()); + } + + /** + * In-flight messages are those that have been received from a queue + * by a consumer but have not yet been deleted. For standard queues, + * there is a limit on the number of in-flight messages, depending on queue traffic and message backlog. + */ + public boolean isPopShouldStop(String group, String topic, int queueId) { + return brokerConfig.isEnablePopMessageThreshold() && popConsumerCache != null && + popConsumerCache.getPopInFlightMessageCount(group, topic, queueId) >= + brokerConfig.getPopInflightMessageThreshold(); + } + + public long getPendingFilterCount(String groupId, String topicId, int queueId) { + try { + long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topicId, queueId); + long consumeOffset = this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, queueId); + return maxOffset - consumeOffset; + } catch (ConsumeQueueException e) { + throw new RuntimeException(e); + } + } + + public GetMessageResult recodeRetryMessage(GetMessageResult getMessageResult, + String topicId, long offset, long popTime, long invisibleTime) { + + if (getMessageResult.getMessageCount() == 0 || + getMessageResult.getMessageMapedList().isEmpty()) { + return getMessageResult; + } + + GetMessageResult result = new GetMessageResult(getMessageResult.getMessageCount()); + result.setStatus(GetMessageStatus.FOUND); + String brokerName = brokerConfig.getBrokerName(); + + for (SelectMappedBufferResult bufferResult : getMessageResult.getMessageMapedList()) { + List messageExtList = MessageDecoder.decodesBatch( + bufferResult.getByteBuffer(), true, false, true); + bufferResult.release(); + for (MessageExt messageExt : messageExtList) { + try { + // When override retry message topic to origin topic, + // need clear message store size to recode + String ckInfo = ExtraInfoUtil.buildExtraInfo(offset, popTime, invisibleTime, 0, + messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset()); + messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo); + messageExt.setTopic(topicId); + messageExt.setStoreSize(0); + byte[] encode = MessageDecoder.encode(messageExt, false); + ByteBuffer buffer = ByteBuffer.wrap(encode); + SelectMappedBufferResult tmpResult = new SelectMappedBufferResult( + bufferResult.getStartOffset(), buffer, encode.length, null); + result.addMessage(tmpResult); + } catch (Exception e) { + log.error("PopConsumerService exception in recode retry message, topic={}", topicId, e); + } + } + } + + return result; + } + + public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMessageResult result, + String topicId, int queueId, PopConsumerRecord.RetryType retryType, long offset) { + + if (result.getStatus() == GetMessageStatus.FOUND && !result.getMessageQueueOffset().isEmpty()) { + if (context.isFifo()) { + this.setFifoBlocked(context, context.getGroupId(), topicId, queueId, result.getMessageQueueOffset()); + } + + // build request header here + context.addGetMessageResult(result, topicId, queueId, retryType, offset); + + if (brokerConfig.isPopConsumerKVServiceLog()) { + log.info("PopConsumerService pop, time={}, invisible={}, " + + "groupId={}, topic={}, queueId={}, offset={}, attemptId={}", + context.getPopTime(), context.getInvisibleTime(), context.getGroupId(), + topicId, queueId, result.getMessageQueueOffset(), context.getAttemptId()); + } + } + + if (!context.isFifo() && result.getNextBeginOffset() > OFFSET_NOT_EXIST) { + this.brokerController.getConsumerOffsetManager().commitPullOffset( + context.getClientHost(), context.getGroupId(), topicId, queueId, result.getNextBeginOffset()); + long commitOffset = result.getStatus() == GetMessageStatus.FOUND ? offset : result.getNextBeginOffset(); + if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) { + long minOffset = popConsumerCache.getMinOffsetInCache(context.getGroupId(), topicId, queueId); + if (minOffset != OFFSET_NOT_EXIST) { + commitOffset = minOffset; + } + } + this.brokerController.getConsumerOffsetManager().commitOffset( + context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset); + } + + return context; + } + + public CompletableFuture getMessageAsync(String clientHost, + String groupId, String topicId, int queueId, long offset, int batchSize, MessageFilter filter) { + + log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}", + groupId, topicId, offset, queueId, batchSize, filter != null); + + CompletableFuture getMessageFuture = + brokerController.getMessageStore().getMessageAsync(groupId, topicId, queueId, offset, batchSize, filter); + + // refer org.apache.rocketmq.broker.processor.PopMessageProcessor#popMsgFromQueue + return getMessageFuture.thenCompose(result -> { + if (result == null) { + return CompletableFuture.completedFuture(null); + } + + // maybe store offset is not correct. + if (GetMessageStatus.OFFSET_TOO_SMALL.equals(result.getStatus()) || + GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(result.getStatus()) || + GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())) { + + // commit offset, because the offset is not correct + // If offset in store is greater than cq offset, it will cause duplicate messages, + // because offset in PopBuffer is not committed. + this.brokerController.getConsumerOffsetManager().commitOffset( + clientHost, groupId, topicId, queueId, result.getNextBeginOffset()); + + log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, " + + "groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}", + groupId, topicId, queueId, batchSize, offset, result.getNextBeginOffset()); + + return brokerController.getMessageStore().getMessageAsync( + groupId, topicId, queueId, result.getNextBeginOffset(), batchSize, filter); + } + + return CompletableFuture.completedFuture(result); + + }).whenComplete((result, throwable) -> { + if (throwable != null) { + log.error("Pop getMessageAsync error", throwable); + } + }); + } + + /** + * Fifo message does not have retry feature in broker + */ + public void setFifoBlocked(PopConsumerContext context, + String groupId, String topicId, int queueId, List queueOffsetList) { + brokerController.getConsumerOrderInfoManager().update( + context.getAttemptId(), false, topicId, groupId, queueId, + context.getPopTime(), context.getInvisibleTime(), queueOffsetList, context.getOrderCountInfoBuilder()); + } + + public boolean isFifoBlocked(PopConsumerContext context, String groupId, String topicId, int queueId) { + return brokerController.getConsumerOrderInfoManager().checkBlock( + context.getAttemptId(), topicId, groupId, queueId, context.getInvisibleTime()); + } + + protected CompletableFuture getMessageAsync(CompletableFuture future, + String clientHost, String groupId, String topicId, int queueId, int batchSize, MessageFilter filter, + PopConsumerRecord.RetryType retryType) { + + return future.thenCompose(result -> { + + // pop request too much, should not add rest count here + if (isPopShouldStop(groupId, topicId, queueId)) { + return CompletableFuture.completedFuture(result); + } + + // Current requests would calculate the total number of messages + // waiting to be filtered for new message arrival notifications in + // the long-polling service, need disregarding the backlog in order + // consumption scenario. If rest message num including the blocked + // queue accumulation would lead to frequent unnecessary wake-ups + // of long-polling requests, resulting unnecessary CPU usage. + // When client ack message, long-polling request would be notifications + // by AckMessageProcessor.ackOrderly() and message will not be delayed. + if (result.isFifo() && isFifoBlocked(result, groupId, topicId, queueId)) { + // should not add accumulation(max offset - consumer offset) here + return CompletableFuture.completedFuture(result); + } + + int remain = batchSize - result.getMessageCount(); + if (remain <= 0) { + result.addRestCount(this.getPendingFilterCount(groupId, topicId, queueId)); + return CompletableFuture.completedFuture(result); + } else { + long consumeOffset = brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId); + return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter) + .thenApply(getMessageResult -> addGetMessageResult( + result, getMessageResult, topicId, queueId, retryType, consumeOffset)); + } + }); + } + + public CompletableFuture popAsync(String clientHost, long popTime, long invisibleTime, + String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, + MessageFilter filter) { + + PopConsumerContext popConsumerContext = + new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, attemptId); + + TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId); + if (topicConfig == null || !consumerLockService.tryLock(groupId, topicId)) { + return CompletableFuture.completedFuture(popConsumerContext); + } + + log.debug("PopConsumerService popAsync, groupId={}, topicId={}, queueId={}, " + + "batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}", + groupId, topicId, queueId, batchSize, invisibleTime, fifo, attemptId, filter); + + String requestKey = groupId + "@" + topicId; + String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topicId, groupId); + String retryTopicV2 = KeyBuilder.buildPopRetryTopicV2(topicId, groupId); + long requestCount = Objects.requireNonNull(ConcurrentHashMapUtils.computeIfAbsent( + requestCountTable, requestKey, k -> new AtomicLong(0L))).getAndIncrement(); + boolean preferRetry = requestCount % 5L == 0L; + + CompletableFuture getMessageFuture = + CompletableFuture.completedFuture(popConsumerContext); + + try { + if (!fifo && preferRetry) { + if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { + getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, + retryTopicV1, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1); + } + + if (brokerConfig.isEnableRetryTopicV2()) { + getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, + retryTopicV2, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2); + } + } + + if (queueId != -1) { + getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, + topicId, queueId, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC); + } else { + for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { + int current = (int) ((requestCount + i) % topicConfig.getReadQueueNums()); + getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, + topicId, current, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC); + } + + if (!fifo && !preferRetry) { + if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { + getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, + retryTopicV1, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1); + } + + if (brokerConfig.isEnableRetryTopicV2()) { + getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, + retryTopicV2, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2); + } + } + } + + return getMessageFuture.thenCompose(result -> { + if (result.isFound() && !result.isFifo()) { + if (brokerConfig.isEnablePopBufferMerge() && + popConsumerCache != null && !popConsumerCache.isCacheFull()) { + this.popConsumerCache.writeRecords(result.getPopConsumerRecordList()); + } else { + this.popConsumerStore.writeRecords(result.getPopConsumerRecordList()); + } + + for (int i = 0; i < result.getGetMessageResultList().size(); i++) { + GetMessageResult getMessageResult = result.getGetMessageResultList().get(i); + PopConsumerRecord popConsumerRecord = result.getPopConsumerRecordList().get(i); + + // If the buffer belong retries message, the message needs to be re-encoded. + // The buffer should not be re-encoded when popResponseReturnActualRetryTopic + // is true or the current topic is not a retry topic. + boolean recode = brokerConfig.isPopResponseReturnActualRetryTopic(); + if (recode && popConsumerRecord.isRetry()) { + result.getGetMessageResultList().set(i, this.recodeRetryMessage( + getMessageResult, popConsumerRecord.getTopicId(), + popConsumerRecord.getQueueId(), result.getPopTime(), invisibleTime)); + } + } + } + return CompletableFuture.completedFuture(result); + }).whenComplete((result, throwable) -> { + try { + if (throwable != null) { + log.error("PopConsumerService popAsync get message error", + throwable instanceof CompletionException ? throwable.getCause() : throwable); + } + if (result.getMessageCount() > 0) { + log.debug("PopConsumerService popAsync result, found={}, groupId={}, topicId={}, queueId={}, " + + "batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}", result.getMessageCount(), + groupId, topicId, queueId, batchSize, invisibleTime, fifo, attemptId, filter); + } + } finally { + consumerLockService.unlock(groupId, topicId); + } + }); + } catch (Throwable t) { + log.error("PopConsumerService popAsync error", t); + } + + return getMessageFuture; + } + + // Notify polling request when receive orderly ack + public CompletableFuture ackAsync( + long popTime, long invisibleTime, String groupId, String topicId, int queueId, long offset) { + + if (brokerConfig.isPopConsumerKVServiceLog()) { + log.info("PopConsumerService ack, time={}, invisible={}, groupId={}, topic={}, queueId={}, offset={}", + popTime, invisibleTime, groupId, topicId, queueId, offset); + } + + PopConsumerRecord record = new PopConsumerRecord( + popTime, groupId, topicId, queueId, 0, invisibleTime, offset, null); + + if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) { + if (popConsumerCache.deleteRecords(Collections.singletonList(record)).isEmpty()) { + return CompletableFuture.completedFuture(true); + } + } + + this.popConsumerStore.deleteRecords(Collections.singletonList(record)); + return CompletableFuture.completedFuture(true); + } + + // refer ChangeInvisibleTimeProcessor.appendCheckPointThenAckOrigin + public void changeInvisibilityDuration(long popTime, long invisibleTime, + long changedPopTime, long changedInvisibleTime, String groupId, String topicId, int queueId, long offset) { + + if (brokerConfig.isPopConsumerKVServiceLog()) { + log.info("PopConsumerService change, time={}, invisible={}, " + + "groupId={}, topic={}, queueId={}, offset={}, new time={}, new invisible={}", + popTime, invisibleTime, groupId, topicId, queueId, offset, changedPopTime, changedInvisibleTime); + } + + PopConsumerRecord ckRecord = new PopConsumerRecord( + changedPopTime, groupId, topicId, queueId, 0, changedInvisibleTime, offset, null); + + PopConsumerRecord ackRecord = new PopConsumerRecord( + popTime, groupId, topicId, queueId, 0, invisibleTime, offset, null); + + this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord)); + + if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) { + if (popConsumerCache.deleteRecords(Collections.singletonList(ackRecord)).isEmpty()) { + return; + } + } + + this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); + } + + // Use broker escape bridge to support remote read + public CompletableFuture> getMessageAsync(PopConsumerRecord consumerRecord) { + return this.brokerController.getEscapeBridge().getMessageAsync(consumerRecord.getTopicId(), + consumerRecord.getOffset(), consumerRecord.getQueueId(), brokerConfig.getBrokerName(), false); + } + + public CompletableFuture revive(PopConsumerRecord record) { + return this.getMessageAsync(record) + .thenCompose(result -> { + if (result == null) { + log.error("PopConsumerService revive error, message may be lost, record={}", record); + return CompletableFuture.completedFuture(false); + } + // true in triple right means get message needs to be retried + if (result.getLeft() == null) { + log.info("PopConsumerService revive no need retry, record={}", record); + return CompletableFuture.completedFuture(!result.getRight()); + } + return CompletableFuture.completedFuture(this.reviveRetry(record, result.getLeft())); + }); + } + + public void clearCache(String groupId, String topicId, int queueId) { + while (consumerLockService.tryLock(groupId, topicId)) { + } + try { + if (popConsumerCache != null) { + popConsumerCache.removeRecords(groupId, topicId, queueId); + } + } finally { + consumerLockService.unlock(groupId, topicId); + } + } + + public long revive(long currentTime, int maxCount) { + Stopwatch stopwatch = Stopwatch.createStarted(); + List consumerRecords = + this.popConsumerStore.scanExpiredRecords(currentTime, maxCount); + Queue failureList = new LinkedBlockingQueue<>(); + List> futureList = new ArrayList<>(consumerRecords.size()); + + // could merge read operation here + for (PopConsumerRecord record : consumerRecords) { + futureList.add(this.revive(record).thenAccept(result -> { + if (!result) { + if (record.getAttemptTimes() < brokerConfig.getPopReviveMaxAttemptTimes()) { + long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS[ + Math.min(REWRITE_INTERVALS_IN_SECONDS.length, record.getAttemptTimes())]; + record.setInvisibleTime(record.getInvisibleTime() + backoffInterval); + record.setAttemptTimes(record.getAttemptTimes() + 1); + failureList.add(record); + log.warn("PopConsumerService revive backoff retry, record={}", record); + } else { + log.error("PopConsumerService drop record, message may be lost, record={}", record); + } + } + })); + } + + CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); + this.popConsumerStore.writeRecords(new ArrayList<>(failureList)); + this.popConsumerStore.deleteRecords(consumerRecords); + + if (brokerConfig.isEnablePopBufferMerge()) { + log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, cost={}ms", + popConsumerCache.getCacheKeySize(), popConsumerCache.getCacheSize(), consumerRecords.size(), + failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } else { + log.info("PopConsumerService, revive count={}, failure count={}, cost={}ms", + consumerRecords.size(), failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + + return consumerRecords.size(); + } + + public void createRetryTopicIfNeeded(String groupId, String topicId) { + TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId); + if (topicConfig != null) { + return; + } + + topicConfig = new TopicConfig(topicId, 1, 1, + PermName.PERM_READ | PermName.PERM_WRITE, 0); + topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG); + brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); + + long offset = this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, 0); + if (offset < 0) { + this.brokerController.getConsumerOffsetManager().commitOffset( + "InitPopOffset", groupId, topicId, 0, 0); + } + } + + @SuppressWarnings("DuplicatedCode") + // org.apache.rocketmq.broker.processor.PopReviveService#reviveRetry + public boolean reviveRetry(PopConsumerRecord record, MessageExt messageExt) { + + if (brokerConfig.isPopConsumerKVServiceLog()) { + log.info("PopConsumerService revive, time={}, invisible={}, groupId={}, topic={}, queueId={}, offset={}", + record.getPopTime(), record.getInvisibleTime(), record.getGroupId(), record.getTopicId(), + record.getQueueId(), record.getOffset()); + } + + boolean retry = StringUtils.startsWith(record.getTopicId(), MixAll.RETRY_GROUP_TOPIC_PREFIX); + String retryTopic = retry ? record.getTopicId() : KeyBuilder.buildPopRetryTopic( + record.getTopicId(), record.getGroupId(), brokerConfig.isEnableRetryTopicV2()); + this.createRetryTopicIfNeeded(record.getGroupId(), retryTopic); + + // deep copy here + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(retryTopic); + msgInner.setBody(messageExt.getBody() != null ? messageExt.getBody() : new byte[] {}); + msgInner.setQueueId(0); + if (messageExt.getTags() != null) { + msgInner.setTags(messageExt.getTags()); + } else { + MessageAccessor.setProperties(msgInner, new HashMap<>()); + } + + msgInner.setBornTimestamp(messageExt.getBornTimestamp()); + msgInner.setFlag(messageExt.getFlag()); + msgInner.setSysFlag(messageExt.getSysFlag()); + msgInner.setBornHost(brokerController.getStoreHost()); + msgInner.setStoreHost(brokerController.getStoreHost()); + msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1); + msgInner.getProperties().putAll(messageExt.getProperties()); + + // set first pop time here + if (messageExt.getReconsumeTimes() == 0 || + msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) { + msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(record.getPopTime())); + } + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + + PutMessageResult putMessageResult = + brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); + + if (brokerConfig.isEnablePopLog()) { + log.debug("PopConsumerService revive retry msg, put status={}, ck={}, delay={}ms", + putMessageResult, JSON.toJSONString(record), System.currentTimeMillis() - record.getVisibilityTimeout()); + } + + if (putMessageResult.getAppendMessageResult() == null || + putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) { + log.error("PopConsumerService revive retry msg error, put status={}, ck={}, delay={}ms", + putMessageResult, JSON.toJSONString(record), System.currentTimeMillis() - record.getVisibilityTimeout()); + return false; + } + + if (this.brokerController.getBrokerStatsManager() != null) { + this.brokerController.getBrokerStatsManager().incBrokerPutNums(msgInner.getTopic(), 1); + this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); + this.brokerController.getBrokerStatsManager().incTopicPutSize( + msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); + } + return true; + } + + // Export kv store record to revive topic + @SuppressWarnings("ExtractMethodRecommender") + public synchronized void transferToFsStore() { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (true) { + try { + List consumerRecords = this.popConsumerStore.scanExpiredRecords( + Long.MAX_VALUE, brokerConfig.getPopReviveMaxReturnSizePerRead()); + if (consumerRecords == null || consumerRecords.isEmpty()) { + break; + } + for (PopConsumerRecord record : consumerRecords) { + PopCheckPoint ck = new PopCheckPoint(); + ck.setBitMap(0); + ck.setNum((byte) 1); + ck.setPopTime(record.getPopTime()); + ck.setInvisibleTime(record.getInvisibleTime()); + ck.setStartOffset(record.getOffset()); + ck.setCId(record.getGroupId()); + ck.setTopic(record.getTopicId()); + ck.setQueueId(record.getQueueId()); + ck.setBrokerName(brokerConfig.getBrokerName()); + ck.addDiff(0); + ck.setRePutTimes(ck.getRePutTimes()); + int reviveQueueId = (int) record.getOffset() % brokerConfig.getReviveQueueNum(); + MessageExtBrokerInner ckMsg = + brokerController.getPopMessageProcessor().buildCkMsg(ck, reviveQueueId); + brokerController.getMessageStore().asyncPutMessage(ckMsg).join(); + } + log.info("PopConsumerStore transfer from kvStore to fsStore, count={}", consumerRecords.size()); + this.popConsumerStore.deleteRecords(consumerRecords); + this.waitForRunning(1); + } catch (Throwable t) { + log.error("PopConsumerStore transfer from kvStore to fsStore failure", t); + } + } + log.info("PopConsumerStore transfer to fsStore finish, cost={}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + + @Override + public String getServiceName() { + return PopConsumerService.class.getSimpleName(); + } + + @VisibleForTesting + protected PopConsumerKVStore getPopConsumerStore() { + return popConsumerStore; + } + + public PopConsumerLockService getConsumerLockService() { + return consumerLockService; + } + + @Override + public void start() { + if (!this.popConsumerStore.start()) { + throw new RuntimeException("PopConsumerStore init error"); + } + if (this.popConsumerCache != null) { + this.popConsumerCache.start(); + } + super.start(); + } + + @Override + public void shutdown() { + // Block shutdown thread until write records finish + super.shutdown(); + do { + this.waitForRunning(10); + } + while (consumerRunning.get()); + if (this.popConsumerCache != null) { + this.popConsumerCache.shutdown(); + } + if (this.popConsumerStore != null) { + this.popConsumerStore.shutdown(); + } + } + + @Override + public void run() { + this.consumerRunning.set(true); + while (!isStopped()) { + try { + // to prevent concurrency issues during read and write operations + long reviveCount = this.revive(System.currentTimeMillis() - 50L, + brokerConfig.getPopReviveMaxReturnSizePerRead()); + + long current = System.currentTimeMillis(); + if (lastCleanupLockTime.get() + TimeUnit.MINUTES.toMillis(1) < current) { + this.consumerLockService.removeTimeout(); + this.lastCleanupLockTime.set(current); + } + + if (reviveCount == 0) { + this.waitForRunning(500); + } + } catch (Exception e) { + log.error("PopConsumerService revive error", e); + this.waitForRunning(500); + } + } + this.consumerRunning.set(false); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 043ef13f5a9..23a4f6167c6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -23,6 +23,9 @@ import java.nio.charset.StandardCharsets; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager; +import org.apache.rocketmq.broker.pop.PopConsumerLockService; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.TopicConfig; @@ -50,6 +53,7 @@ import org.apache.rocketmq.store.pop.BatchAckMsg; public class AckMessageProcessor implements NettyRequestProcessor { + private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; private final String reviveTopic; @@ -57,7 +61,8 @@ public class AckMessageProcessor implements NettyRequestProcessor { public AckMessageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; - this.reviveTopic = PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName()); + this.reviveTopic = PopAckConstants.buildClusterReviveTopic( + this.brokerController.getBrokerConfig().getBrokerClusterName()); this.popReviveServices = new PopReviveService[this.brokerController.getBrokerConfig().getReviveQueueNum()]; for (int i = 0; i < this.brokerController.getBrokerConfig().getReviveQueueNum(); i++) { this.popReviveServices[i] = new PopReviveService(brokerController, reviveTopic, i); @@ -149,8 +154,11 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re response.setRemark(errorInfo); return response; } - - appendAck(requestHeader, null, response, channel, null); + if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { + appendAckNew(requestHeader, null, response, channel, null); + } else { + appendAck(requestHeader, null, response, channel, null); + } } else if (request.getCode() == RequestCode.BATCH_ACK_MESSAGE) { if (request.getBody() != null) { reqBody = BatchAckMessageRequestBody.decode(request.getBody(), BatchAckMessageRequestBody.class); @@ -160,7 +168,11 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re return response; } for (BatchAck bAck : reqBody.getAcks()) { - appendAck(null, bAck, response, channel, reqBody.getBrokerName()); + if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { + appendAckNew(null, bAck, response, channel, reqBody.getBrokerName()); + } else { + appendAck(null, bAck, response, channel, reqBody.getBrokerName()); + } } } else { POP_LOGGER.error("AckMessageProcessor failed to process RequestCode: {}, consumer: {} ", request.getCode(), RemotingHelper.parseChannelRemoteAddr(channel)); @@ -296,6 +308,74 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA } } + private void appendAckNew(final AckMessageRequestHeader requestHeader, final BatchAck batchAck, + final RemotingCommand response, final Channel channel, String brokerName) throws RemotingCommandException { + + if (requestHeader != null && batchAck == null) { + String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo()); + String groupId = requestHeader.getConsumerGroup(); + String topicId = requestHeader.getTopic(); + int queueId = requestHeader.getQueueId(); + long ackOffset = requestHeader.getOffset(); + long popTime = ExtraInfoUtil.getPopTime(extraInfo); + long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo); + + int reviveQueueId = ExtraInfoUtil.getReviveQid(extraInfo); + if (reviveQueueId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { + ackOrderlyNew(topicId, groupId, queueId, ackOffset, popTime, invisibleTime, channel, response); + } else { + this.brokerController.getPopConsumerService().ackAsync( + popTime, invisibleTime, groupId, topicId, queueId, ackOffset); + } + + this.brokerController.getBrokerStatsManager().incBrokerAckNums(1); + this.brokerController.getBrokerStatsManager().incGroupAckNums(groupId, topicId, 1); + } else { + String groupId = batchAck.getConsumerGroup(); + String topicId = ExtraInfoUtil.getRealTopic( + batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry()); + int queueId = batchAck.getQueueId(); + int reviveQueueId = batchAck.getReviveQueueId(); + long startOffset = batchAck.getStartOffset(); + long popTime = batchAck.getPopTime(); + long invisibleTime = batchAck.getInvisibleTime(); + + try { + long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topicId, queueId); + long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topicId, queueId); + if (minOffset == -1 || maxOffset == -1) { + POP_LOGGER.error("Illegal topic or queue found when batch ack {}", batchAck); + return; + } + + int ackCount = 0; + // Maintain consistency with the old implementation code style + BitSet bitSet = batchAck.getBitSet(); + for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { + if (i == Integer.MAX_VALUE) { + break; + } + long offset = startOffset + i; + if (offset < minOffset || offset > maxOffset) { + continue; + } + if (reviveQueueId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { + ackOrderlyNew(topicId, groupId, queueId, offset, popTime, invisibleTime, channel, response); + } else { + this.brokerController.getPopConsumerService().ackAsync( + popTime, invisibleTime, groupId, topicId, queueId, offset); + } + ackCount++; + } + + this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount); + this.brokerController.getBrokerStatsManager().incGroupAckNums(groupId, topicId, ackCount); + } catch (ConsumeQueueException e) { + throw new RemotingCommandException("Failed to ack message", e); + } + } + } + private void handlePutMessageResult(PutMessageResult putMessageResult, AckMsg ackMsg, String topic, String consumeGroup, long popTime, int qId, int ackCount) { if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK @@ -323,9 +403,7 @@ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOf return; } long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext( - topic, consumeGroup, - qId, ackOffset, - popTime); + topic, consumeGroup, qId, ackOffset, popTime); if (nextOffset > -1) { if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(topic, consumeGroup, qId)) { this.brokerController.getConsumerOffsetManager().commitOffset( @@ -347,4 +425,55 @@ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOf } brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1); } + + protected void ackOrderlyNew(String topic, String consumeGroup, int qId, long ackOffset, long popTime, + long invisibleTime, Channel channel, RemotingCommand response) { + + ConsumerOffsetManager consumerOffsetManager = this.brokerController.getConsumerOffsetManager(); + ConsumerOrderInfoManager consumerOrderInfoManager = brokerController.getConsumerOrderInfoManager(); + PopConsumerLockService consumerLockService = this.brokerController.getPopConsumerService().getConsumerLockService(); + + long oldOffset = consumerOffsetManager.queryOffset(consumeGroup, topic, qId); + if (ackOffset < oldOffset) { + return; + } + + while (!consumerLockService.tryLock(consumeGroup, topic)) { + } + + try { + // double check + oldOffset = consumerOffsetManager.queryOffset(consumeGroup, topic, qId); + if (ackOffset < oldOffset) { + return; + } + + long nextOffset = consumerOrderInfoManager.commitAndNext(topic, consumeGroup, qId, ackOffset, popTime); + if (brokerController.getBrokerConfig().isPopConsumerKVServiceLog()) { + POP_LOGGER.info("PopConsumerService ack orderly, time={}, topicId={}, groupId={}, queueId={}, " + + "offset={}, next={}", popTime, topic, consumeGroup, qId, ackOffset, nextOffset); + } + + if (nextOffset > -1L) { + if (!consumerOffsetManager.hasOffsetReset(topic, consumeGroup, qId)) { + String remoteAddress = RemotingHelper.parseSocketAddressAddr(channel.remoteAddress()); + consumerOffsetManager.commitOffset(remoteAddress, consumeGroup, topic, qId, nextOffset); + } + if (!consumerOrderInfoManager.checkBlock(null, topic, consumeGroup, qId, invisibleTime)) { + this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, qId, consumeGroup); + } + return; + } + + if (nextOffset == -1) { + String errorInfo = String.format("offset is illegal, key:%s %s %s, old:%d, commit:%d, next:%d, %s", + consumeGroup, topic, qId, oldOffset, ackOffset, nextOffset, channel.remoteAddress()); + POP_LOGGER.warn(errorInfo); + response.setCode(ResponseCode.MESSAGE_ILLEGAL); + response.setRemark(errorInfo); + } + } finally { + consumerLockService.unlock(consumeGroup, topic); + } + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index ffac714c1ba..58568739557 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -406,6 +406,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return this.getAcl(ctx, request); case RequestCode.AUTH_LIST_ACL: return this.listAcl(ctx, request); + case RequestCode.POP_ROLLBACK: + return this.transferPopToFsStore(ctx, request); default: return getUnknownCmdResponse(ctx, request); } @@ -2186,6 +2188,9 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId String brokerName = brokerController.getBrokerConfig().getBrokerName(); for (Map.Entry entry : queueOffsetMap.entrySet()) { brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey()); + if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { + brokerController.getPopConsumerService().clearCache(group, topic, queueId); + } body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue()); } @@ -3521,4 +3526,19 @@ private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) { } return cqUnit1.getTagsCode() == cqUnit2.getTagsCode(); } + + private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + try { + if (brokerController.getPopConsumerService() != null) { + brokerController.getPopConsumerService().transferToFsStore(); + } + response.setCode(ResponseCode.SUCCESS); + } catch (Exception e) { + LOGGER.error("PopConsumerStore transfer from kvStore to fsStore finish [{}]", request, e); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(e.getMessage()); + } + return response; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java index d29ff2a55b0..a7180f66545 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java @@ -24,6 +24,9 @@ import java.nio.charset.StandardCharsets; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager; +import org.apache.rocketmq.broker.pop.PopConsumerLockService; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; @@ -133,15 +136,36 @@ public CompletableFuture processRequestAsync(final Channel chan } String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo()); + if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { + if (ExtraInfoUtil.isOrder(extraInfo)) { + return this.processChangeInvisibleTimeForOrderNew( + requestHeader, extraInfo, response, responseHeader); + } + try { + long current = System.currentTimeMillis(); + brokerController.getPopConsumerService().changeInvisibilityDuration( + ExtraInfoUtil.getPopTime(extraInfo), ExtraInfoUtil.getInvisibleTime(extraInfo), current, + requestHeader.getInvisibleTime(), requestHeader.getConsumerGroup(), requestHeader.getTopic(), + requestHeader.getQueueId(), requestHeader.getOffset()); + responseHeader.setInvisibleTime(requestHeader.getInvisibleTime()); + responseHeader.setPopTime(current); + responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo)); + } catch (Exception e) { + response.setCode(ResponseCode.SYSTEM_ERROR); + } + return CompletableFuture.completedFuture(response); + } if (ExtraInfoUtil.isOrder(extraInfo)) { - return CompletableFuture.completedFuture(processChangeInvisibleTimeForOrder(requestHeader, extraInfo, response, responseHeader)); + return CompletableFuture.completedFuture( + processChangeInvisibleTimeForOrder(requestHeader, extraInfo, response, responseHeader)); } // add new ck long now = System.currentTimeMillis(); + CompletableFuture futureResult = appendCheckPointThenAckOrigin(requestHeader, + ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, extraInfo); - CompletableFuture futureResult = appendCheckPointThenAckOrigin(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, extraInfo); return futureResult.thenCompose(result -> { if (result) { responseHeader.setInvisibleTime(requestHeader.getInvisibleTime()); @@ -154,6 +178,50 @@ public CompletableFuture processRequestAsync(final Channel chan }); } + @SuppressWarnings({"StatementWithEmptyBody", "DuplicatedCode"}) + public CompletableFuture processChangeInvisibleTimeForOrderNew( + ChangeInvisibleTimeRequestHeader requestHeader, String[] extraInfo, + RemotingCommand response, ChangeInvisibleTimeResponseHeader responseHeader) { + + String groupId = requestHeader.getConsumerGroup(); + String topicId = requestHeader.getTopic(); + Integer queueId = requestHeader.getQueueId(); + long popTime = ExtraInfoUtil.getPopTime(extraInfo); + + PopConsumerLockService consumerLockService = + this.brokerController.getPopConsumerService().getConsumerLockService(); + ConsumerOffsetManager consumerOffsetManager = this.brokerController.getConsumerOffsetManager(); + ConsumerOrderInfoManager consumerOrderInfoManager = brokerController.getConsumerOrderInfoManager(); + + long oldOffset = consumerOffsetManager.queryOffset(groupId, topicId, queueId); + if (requestHeader.getOffset() < oldOffset) { + return CompletableFuture.completedFuture(response); + } + + while (!consumerLockService.tryLock(groupId, topicId)) { + } + + try { + // double check + oldOffset = consumerOffsetManager.queryOffset(groupId, topicId, queueId); + if (requestHeader.getOffset() < oldOffset) { + return CompletableFuture.completedFuture(response); + } + + long visibilityTimeout = System.currentTimeMillis() + requestHeader.getInvisibleTime(); + consumerOrderInfoManager.updateNextVisibleTime( + topicId, groupId, queueId, requestHeader.getOffset(), popTime, visibilityTimeout); + + responseHeader.setInvisibleTime(visibilityTimeout - popTime); + responseHeader.setPopTime(popTime); + responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo)); + } finally { + consumerLockService.unlock(groupId, topicId); + } + + return CompletableFuture.completedFuture(response); + } + protected RemotingCommand processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader requestHeader, String[] extraInfo, RemotingCommand response, ChangeInvisibleTimeResponseHeader responseHeader) { long popTime = ExtraInfoUtil.getPopTime(extraInfo); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 6317d6ad7d2..b95055efba7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -172,7 +172,6 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader) throws RemotingCommandException { - boolean hasMsg; TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName); return hasMsgFromTopic(topicConfig, randomQ, requestHeader); } @@ -212,13 +211,16 @@ private long getPopOffset(String topic, String cid, int queueId) { if (offset < 0) { offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId); } - long bufferOffset = this.brokerController.getPopMessageProcessor().getPopBufferMergeService() - .getLatestOffset(topic, cid, queueId); - if (bufferOffset < 0) { - return offset; + + long bufferOffset; + if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { + bufferOffset = this.brokerController.getConsumerOffsetManager().queryPullOffset(cid, topic, queueId); } else { - return bufferOffset > offset ? bufferOffset : offset; + bufferOffset = this.brokerController.getPopMessageProcessor() + .getPopBufferMergeService().getLatestOffset(topic, cid, queueId); } + + return bufferOffset < 0L ? offset : Math.max(bufferOffset, offset); } public PopLongPollingService getPopLongPollingService() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 9f10b483ddb..05a92c54b18 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -62,7 +62,7 @@ public class PopBufferMergeService extends ServiceThread { private final int countOfSecond1 = (int) (1000 / interval); private final int countOfSecond30 = (int) (30 * 1000 / interval); - private final List batchAckIndexList = new ArrayList(32); + private final List batchAckIndexList = new ArrayList<>(32); private volatile boolean master = false; public PopBufferMergeService(BrokerController brokerController, PopMessageProcessor popMessageProcessor) { @@ -645,7 +645,7 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde ackMsg.setQueueId(point.getQueueId()); ackMsg.setPopTime(point.getPopTime()); ackMsg.setBrokerName(point.getBrokerName()); - msgInner.setTopic(popMessageProcessor.reviveTopic); + msgInner.setTopic(popMessageProcessor.getReviveTopic()); msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8)); msgInner.setQueueId(pointWrapper.getReviveQueueId()); msgInner.setTags(PopAckConstants.ACK_TAG); @@ -701,7 +701,7 @@ private void putBatchAckToStore(final PopCheckPointWrapper pointWrapper, final L batchAckMsg.setTopic(point.getTopic()); batchAckMsg.setQueueId(point.getQueueId()); batchAckMsg.setPopTime(point.getPopTime()); - msgInner.setTopic(popMessageProcessor.reviveTopic); + msgInner.setTopic(popMessageProcessor.getReviveTopic()); msgInner.setBody(JSON.toJSONString(batchAckMsg).getBytes(DataConverter.CHARSET_UTF8)); msgInner.setQueueId(pointWrapper.getReviveQueueId()); msgInner.setTags(PopAckConstants.BATCH_ACK_TAG); @@ -751,7 +751,7 @@ private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) { } PopCheckPoint point = pointWrapper.getCk(); MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(popMessageProcessor.reviveTopic); + msgInner.setTopic(popMessageProcessor.getReviveTopic()); msgInner.setBody((pointWrapper.getReviveQueueId() + "-" + pointWrapper.getReviveQueueOffset()).getBytes(StandardCharsets.UTF_8)); msgInner.setQueueId(pointWrapper.getReviveQueueId()); msgInner.setTags(PopAckConstants.CK_TAG); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 05efc14b7b4..9355af319ee 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -47,6 +47,7 @@ import org.apache.rocketmq.broker.longpolling.PopRequest; import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; +import org.apache.rocketmq.broker.pop.PopConsumerContext; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; @@ -99,13 +100,12 @@ public class PopMessageProcessor implements NettyRequestProcessor { - private static final Logger POP_LOGGER = - LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + private static final String BORN_TIME = "bornTime"; private final BrokerController brokerController; private final Random random = new Random(System.currentTimeMillis()); - String reviveTopic; - private static final String BORN_TIME = "bornTime"; + private final String reviveTopic; private final PopLongPollingService popLongPollingService; private final PopBufferMergeService popBufferMergeService; @@ -114,13 +114,18 @@ public class PopMessageProcessor implements NettyRequestProcessor { public PopMessageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; - this.reviveTopic = PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName()); + this.reviveTopic = PopAckConstants.buildClusterReviveTopic( + this.brokerController.getBrokerConfig().getBrokerClusterName()); this.popLongPollingService = new PopLongPollingService(brokerController, this, false); this.queueLockManager = new QueueLockManager(); this.popBufferMergeService = new PopBufferMergeService(this.brokerController, this); this.ckMessageNumber = new AtomicLong(); } + protected String getReviveTopic() { + return reviveTopic; + } + public PopLongPollingService getPopLongPollingService() { return popLongPollingService; } @@ -213,27 +218,26 @@ public void notifyMessageArriving(final String topic, final int queueId, final S @Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final long beginTimeMills = this.brokerController.getMessageStore().now(); + + // fill bron time to properties if not exist, why we need this? request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis())); if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) { request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis())); } - Channel channel = ctx.channel(); + Channel channel = ctx.channel(); RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class); + response.setOpaque(request.getOpaque()); + + final PopMessageRequestHeader requestHeader = + request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true); final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader(); - final PopMessageRequestHeader requestHeader = request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true); - StringBuilder startOffsetInfo = new StringBuilder(64); - StringBuilder msgOffsetInfo = new StringBuilder(64); - StringBuilder orderCountInfo = null; - if (requestHeader.isOrder()) { - orderCountInfo = new StringBuilder(64); - } - brokerController.getConsumerManager().compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), - ConsumeType.CONSUME_POP, MessageModel.CLUSTERING); - - response.setOpaque(request.getOpaque()); + // Pop mode only supports consumption in cluster load balancing mode + brokerController.getConsumerManager().compensateBasicConsumerInfo( + requestHeader.getConsumerGroup(), ConsumeType.CONSUME_POP, MessageModel.CLUSTERING); if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("receive PopMessage request command, {}", request); @@ -245,12 +249,14 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC this.brokerController.getBrokerConfig().getBrokerIP1())); return response; } + if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format("the broker[%s] pop message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1())); return response; } + if (requestHeader.getMaxMsgNums() > 32) { response.setCode(ResponseCode.INVALID_PARAMETER); response.setRemark(String.format("the broker[%s] pop message's num is greater than 32", @@ -292,6 +298,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC response.setRemark(errorInfo); return response; } + SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); if (null == subscriptionGroupConfig) { @@ -312,21 +319,25 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC ExpressionMessageFilter messageFilter = null; if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) { try { - subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); - brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), subscriptionData); - - String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); - SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, SubscriptionData.SUB_ALL, requestHeader.getExpType()); - brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), - retryTopic, retrySubscriptionData); + // origin topic + subscriptionData = FilterAPI.build( + requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); + brokerController.getConsumerManager().compensateSubscribeData( + requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData); + + // retry topic + String retryTopic = KeyBuilder.buildPopRetryTopic( + requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); + SubscriptionData retrySubscriptionData = FilterAPI.build( + retryTopic, SubscriptionData.SUB_ALL, requestHeader.getExpType()); + brokerController.getConsumerManager().compensateSubscribeData( + requestHeader.getConsumerGroup(), retryTopic, retrySubscriptionData); ConsumerFilterData consumerFilterData = null; if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { consumerFilterData = ConsumerFilterManager.build( requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(), - requestHeader.getExpType(), System.currentTimeMillis() - ); + requestHeader.getExpType(), System.currentTimeMillis()); if (consumerFilterData == null) { POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getExp(), requestHeader.getConsumerGroup()); @@ -335,8 +346,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC return response; } } - messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, - brokerController.getConsumerFilterManager()); + messageFilter = new ExpressionMessageFilter( + subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager()); } catch (Exception e) { POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(), requestHeader.getConsumerGroup()); @@ -346,30 +357,139 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC } } else { try { + // origin topic subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG); - brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), - requestHeader.getTopic(), subscriptionData); + brokerController.getConsumerManager().compensateSubscribeData( + requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData); - String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); + // retry topic + String retryTopic = KeyBuilder.buildPopRetryTopic( + requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, "*", ExpressionType.TAG); - brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), - retryTopic, retrySubscriptionData); + brokerController.getConsumerManager().compensateSubscribeData( + requestHeader.getConsumerGroup(), retryTopic, retrySubscriptionData); } catch (Exception e) { POP_LOGGER.warn("Build default subscription error, group: {}", requestHeader.getConsumerGroup()); } } + GetMessageResult getMessageResult = new GetMessageResult(requestHeader.getMaxMsgNums()); + ExpressionMessageFilter finalMessageFilter = messageFilter; + SubscriptionData finalSubscriptionData = subscriptionData; + + if (brokerConfig.isPopConsumerKVServiceEnable()) { + + CompletableFuture popAsyncFuture = brokerController.getPopConsumerService().popAsync( + RemotingHelper.parseChannelRemoteAddr(channel), beginTimeMills, requestHeader.getInvisibleTime(), + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), + requestHeader.getMaxMsgNums(), requestHeader.isOrder(), requestHeader.getAttemptId(), messageFilter); + + popAsyncFuture.thenApply(result -> { + if (result.isFound()) { + response.setCode(ResponseCode.SUCCESS); + getMessageResult.setStatus(GetMessageStatus.FOUND); + // recursive processing + if (result.getRestCount() > 0) { + popLongPollingService.notifyMessageArriving( + requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(), + null, 0L, null, null); + } + } else { + POP_LOGGER.debug("Processor not found, polling request, popTime={}, restCount={}", + result.getPopTime(), result.getRestCount()); + + PollingResult pollingResult = popLongPollingService.polling( + ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter); + + if (PollingResult.POLLING_SUC == pollingResult) { + // recursive processing + if (result.getRestCount() > 0) { + popLongPollingService.notifyMessageArriving( + requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(), + null, 0L, null, null); + } + return null; + } else if (PollingResult.POLLING_FULL == pollingResult) { + response.setCode(ResponseCode.POLLING_FULL); + } else { + response.setCode(ResponseCode.POLLING_TIMEOUT); + } + getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); + } + + responseHeader.setPopTime(result.getPopTime()); + responseHeader.setInvisibleTime(result.getInvisibleTime()); + responseHeader.setReviveQid( + requestHeader.isOrder() ? KeyBuilder.POP_ORDER_REVIVE_QUEUE : 0); + responseHeader.setRestNum(result.getRestCount()); + responseHeader.setStartOffsetInfo(result.getStartOffsetInfo()); + responseHeader.setMsgOffsetInfo(result.getMsgOffsetInfo()); + if (requestHeader.isOrder() && !result.getOrderCountInfo().isEmpty()) { + responseHeader.setOrderCountInfo(result.getOrderCountInfo()); + } + + response.setRemark(getMessageResult.getStatus().name()); + if (response.getCode() != ResponseCode.SUCCESS) { + return response; + } + + // add message + result.getGetMessageResultList().forEach(temp -> { + for (int i = 0; i < temp.getMessageMapedList().size(); i++) { + getMessageResult.addMessage(temp.getMessageMapedList().get(i)); + } + }); + + if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { + final byte[] r = this.readGetMessageResult(getMessageResult, + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); + this.brokerController.getBrokerStatsManager().incGroupGetLatency( + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), + (int) (this.brokerController.getMessageStore().now() - beginTimeMills)); + response.setBody(r); + } else { + final GetMessageResult tmpGetMessageResult = getMessageResult; + try { + FileRegion fileRegion = new ManyMessageTransfer( + response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult); + channel.writeAndFlush(fileRegion) + .addListener((ChannelFutureListener) future -> { + tmpGetMessageResult.release(); + Attributes attributes = RemotingMetricsManager.newAttributesBuilder() + .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode())) + .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(response.getCode())) + .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future)) + .build(); + RemotingMetricsManager.rpcLatency.record( + request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes); + if (!future.isSuccess()) { + POP_LOGGER.error("Fail to transfer messages from page cache to {}", + channel.remoteAddress(), future.cause()); + } + }); + } catch (Throwable e) { + POP_LOGGER.error("Error occurred when transferring messages from page cache", e); + getMessageResult.release(); + } + return null; + } + return response; + }).thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result)); + return null; + } + int randomQ = random.nextInt(100); int reviveQid; if (requestHeader.isOrder()) { reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE; } else { - reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum()); + reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % + this.brokerController.getBrokerConfig().getReviveQueueNum()); } - GetMessageResult getMessageResult = new GetMessageResult(requestHeader.getMaxMsgNums()); - ExpressionMessageFilter finalMessageFilter = messageFilter; - StringBuilder finalOrderCountInfo = orderCountInfo; + StringBuilder startOffsetInfo = new StringBuilder(64); + StringBuilder msgOffsetInfo = new StringBuilder(64); + StringBuilder orderCountInfo = requestHeader.isOrder() ? new StringBuilder(64) : null; // Due to the design of the fields startOffsetInfo, msgOffsetInfo, and orderCountInfo, // a single POP request could only invoke the popMsgFromQueue method once @@ -404,7 +524,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, - startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); + startOffsetInfo, msgOffsetInfo, orderCountInfo)); } // if not full , fetch retry again if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) { @@ -420,7 +540,6 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC } final RemotingCommand finalResponse = response; - SubscriptionData finalSubscriptionData = subscriptionData; getMessageFuture.thenApply(restNum -> { try { if (request.getCallbackList() != null) { @@ -463,8 +582,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC responseHeader.setRestNum(restNum); responseHeader.setStartOffsetInfo(startOffsetInfo.toString()); responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString()); - if (requestHeader.isOrder() && finalOrderCountInfo != null) { - responseHeader.setOrderCountInfo(finalOrderCountInfo.toString()); + if (requestHeader.isOrder() && orderCountInfo != null) { + responseHeader.setOrderCountInfo(orderCountInfo.toString()); } finalResponse.setRemark(getMessageResult.getStatus().name()); switch (finalResponse.getCode()) { @@ -537,10 +656,12 @@ private CompletableFuture popMsgFromTopic(String topic, boolean isRetry, G messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); } - private CompletableFuture popMsgFromQueue(String topic, String attemptId, boolean isRetry, GetMessageResult getMessageResult, + private CompletableFuture popMsgFromQueue(String topic, String attemptId, boolean isRetry, + GetMessageResult getMessageResult, PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) { + String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId; boolean isOrder = requestHeader.isOrder(); @@ -792,7 +913,7 @@ private long getInitOffset(String topic, String group, int queueId, int initMode return offset; } - public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int reviveQid) { + public MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck, final int reviveQid) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(reviveTopic); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java new file mode 100644 index 00000000000..3f6e893a527 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.util.Collections; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.common.BrokerConfig; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; + +public class PopConsumerCacheTest { + + private final String attemptId = "attemptId"; + private final String topicId = "TopicTest"; + private final String groupId = "GroupTest"; + private final int queueId = 2; + + @Test + public void consumerRecordsTest() { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setPopConsumerKVServiceLog(true); + PopConsumerCache.ConsumerRecords consumerRecords = + new PopConsumerCache.ConsumerRecords(brokerConfig, groupId, topicId, queueId); + Assert.assertNotNull(consumerRecords.toString()); + + for (int i = 0; i < 5; i++) { + consumerRecords.write(new PopConsumerRecord(i, groupId, topicId, queueId, 0, + 20000, 100 + i, attemptId)); + } + Assert.assertEquals(100, consumerRecords.getMinOffsetInBuffer()); + Assert.assertEquals(5, consumerRecords.getInFlightRecordCount()); + + for (int i = 0; i < 2; i++) { + consumerRecords.delete(new PopConsumerRecord(i, groupId, topicId, queueId, 0, + 20000, 100 + i, attemptId)); + } + Assert.assertEquals(102, consumerRecords.getMinOffsetInBuffer()); + Assert.assertEquals(3, consumerRecords.getInFlightRecordCount()); + + long bufferTimeout = brokerConfig.getPopCkStayBufferTime(); + Assert.assertEquals(1, consumerRecords.removeExpiredRecords(bufferTimeout + 2).size()); + Assert.assertNull(consumerRecords.removeExpiredRecords(bufferTimeout + 2)); + Assert.assertEquals(2, consumerRecords.removeExpiredRecords(bufferTimeout + 4).size()); + Assert.assertNull(consumerRecords.removeExpiredRecords(bufferTimeout + 4)); + } + + @Test + public void consumerOffsetTest() throws IllegalAccessException { + BrokerController brokerController = Mockito.mock(BrokerController.class); + PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class); + PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class); + ConsumerOffsetManager consumerOffsetManager = Mockito.mock(ConsumerOffsetManager.class); + Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); + Mockito.when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + Mockito.when(consumerLockService.tryLock(groupId, topicId)).thenReturn(true); + + PopConsumerCache consumerCache = + new PopConsumerCache(brokerController, consumerKVStore, consumerLockService, null); + consumerCache.commitOffset("CommitOffsetTest", groupId, topicId, queueId, 100L); + consumerCache.removeRecords(groupId, topicId, queueId); + + AtomicInteger estimateCacheSize = (AtomicInteger) FieldUtils.readField( + consumerCache, "estimateCacheSize", true); + estimateCacheSize.set(2); + consumerCache.start(); + Awaitility.await().until(() -> estimateCacheSize.get() == 0); + consumerCache.shutdown(); + } + + @Test + public void consumerCacheTest() { + BrokerController brokerController = Mockito.mock(BrokerController.class); + PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class); + PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class); + Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); + + PopConsumerCache consumerCache = + new PopConsumerCache(brokerController, consumerKVStore, consumerLockService, null); + Assert.assertEquals(-1L, consumerCache.getMinOffsetInCache(groupId, topicId, queueId)); + Assert.assertEquals(0, consumerCache.getPopInFlightMessageCount(groupId, topicId, queueId)); + Assert.assertEquals(0, consumerCache.getCacheKeySize()); + + // write + for (int i = 0; i < 3; i++) { + PopConsumerRecord record = new PopConsumerRecord(2L, groupId, topicId, queueId, + 0, 20000, 100 + i, attemptId); + Assert.assertEquals(consumerCache.getKey(record), consumerCache.getKey(groupId, topicId, queueId)); + consumerCache.writeRecords(Collections.singletonList(record)); + } + Assert.assertEquals(100, consumerCache.getMinOffsetInCache(groupId, topicId, queueId)); + Assert.assertEquals(3, consumerCache.getPopInFlightMessageCount(groupId, topicId, queueId)); + Assert.assertEquals(1, consumerCache.getCacheKeySize()); + Assert.assertEquals(3, consumerCache.getCacheSize()); + Assert.assertFalse(consumerCache.isCacheFull()); + + // delete + PopConsumerRecord record = new PopConsumerRecord(2L, groupId, topicId, queueId, + 0, 20000, 100, attemptId); + Assert.assertEquals(0, consumerCache.deleteRecords(Collections.singletonList(record)).size()); + Assert.assertEquals(101, consumerCache.getMinOffsetInCache(groupId, topicId, queueId)); + Assert.assertEquals(2, consumerCache.getPopInFlightMessageCount(groupId, topicId, queueId)); + Assert.assertEquals(2, consumerCache.getCacheSize()); + + record = new PopConsumerRecord(2L, groupId, topicId, queueId, + 0, 20000, 104, attemptId); + Assert.assertEquals(1, consumerCache.deleteRecords(Collections.singletonList(record)).size()); + Assert.assertEquals(101, consumerCache.getMinOffsetInCache(groupId, topicId, queueId)); + Assert.assertEquals(2, consumerCache.getPopInFlightMessageCount(groupId, topicId, queueId)); + + // clean expired records + Queue consumerRecordList = new LinkedBlockingQueue<>(); + consumerCache.cleanupRecords(consumerRecordList::add); + Assert.assertEquals(2, consumerRecordList.size()); + + // clean all + Mockito.when(consumerLockService.isLockTimeout(any(), any())).thenReturn(true); + consumerRecordList.clear(); + consumerCache.cleanupRecords(consumerRecordList::add); + Assert.assertEquals(0, consumerRecordList.size()); + } +} \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java new file mode 100644 index 00000000000..554933eabc4 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerContextTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class PopConsumerContextTest { + + @Test + public void consumerContextTest() { + long popTime = System.currentTimeMillis(); + PopConsumerContext context = new PopConsumerContext("127.0.0.1:6789", + popTime, 20_000, "GroupId", true, "attemptId"); + + Assert.assertFalse(context.isFound()); + Assert.assertEquals("127.0.0.1:6789", context.getClientHost()); + Assert.assertEquals(popTime, context.getPopTime()); + Assert.assertEquals(20_000, context.getInvisibleTime()); + Assert.assertEquals("GroupId", context.getGroupId()); + Assert.assertTrue(context.isFifo()); + Assert.assertEquals("attemptId", context.getAttemptId()); + Assert.assertEquals(0, context.getRestCount()); + + GetMessageResult getMessageResult = new GetMessageResult(); + getMessageResult.setStatus(GetMessageStatus.FOUND); + getMessageResult.setMinOffset(10L); + getMessageResult.setMaxOffset(20L); + getMessageResult.setNextBeginOffset(15L); + getMessageResult.addMessage(Mockito.mock(SelectMappedBufferResult.class), 10); + getMessageResult.addMessage(Mockito.mock(SelectMappedBufferResult.class), 12); + getMessageResult.addMessage(Mockito.mock(SelectMappedBufferResult.class), 13); + + context.addGetMessageResult(getMessageResult, + "TopicId", 3, PopConsumerRecord.RetryType.NORMAL_TOPIC, 1); + + Assert.assertEquals(3, context.getMessageCount()); + Assert.assertEquals( + getMessageResult.getMaxOffset() - getMessageResult.getNextBeginOffset(), context.getRestCount()); + + // check header + Assert.assertNotNull(context.toString()); + Assert.assertEquals("0 3 1", context.getStartOffsetInfo()); + Assert.assertEquals("0 3 10,12,13", context.getMsgOffsetInfo()); + Assert.assertNotNull(context.getOrderCountInfoBuilder()); + Assert.assertEquals("", context.getOrderCountInfo()); + + Assert.assertEquals(1, context.getGetMessageResultList().size()); + Assert.assertEquals(3, context.getPopConsumerRecordList().size()); + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerLockServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerLockServiceTest.java new file mode 100644 index 00000000000..b5af2f31798 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerLockServiceTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.PopAckConstants; +import org.junit.Assert; +import org.junit.Test; + +public class PopConsumerLockServiceTest { + + @Test + @SuppressWarnings("unchecked") + public void consumerLockTest() throws NoSuchFieldException, IllegalAccessException { + String groupId = "groupId"; + String topicId = "topicId"; + + PopConsumerLockService lockService = + new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2)); + + Assert.assertTrue(lockService.tryLock(groupId, topicId)); + Assert.assertFalse(lockService.tryLock(groupId, topicId)); + lockService.unlock(groupId, topicId); + + Assert.assertTrue(lockService.tryLock(groupId, topicId)); + Assert.assertFalse(lockService.tryLock(groupId, topicId)); + Assert.assertFalse(lockService.isLockTimeout(groupId, topicId)); + lockService.removeTimeout(); + + // set expired + Field field = PopConsumerLockService.class.getDeclaredField("lockTable"); + field.setAccessible(true); + Map table = + (Map) field.get(lockService); + + Field lockTime = PopConsumerLockService.TimedLock.class.getDeclaredField("lockTime"); + lockTime.setAccessible(true); + lockTime.set(table.get(groupId + PopAckConstants.SPLIT + topicId), + System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(3)); + lockService.removeTimeout(); + + Assert.assertEquals(0, table.size()); + } +} \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java new file mode 100644 index 00000000000..24a79b33f31 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRecordTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.util.UUID; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.junit.Assert; +import org.junit.Test; + +public class PopConsumerRecordTest { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + + @Test + public void retryCodeTest() { + Assert.assertEquals("NORMAL_TOPIC code should be 0", + 0, PopConsumerRecord.RetryType.NORMAL_TOPIC.getCode()); + Assert.assertEquals("RETRY_TOPIC code should be 1", + 1, PopConsumerRecord.RetryType.RETRY_TOPIC_V1.getCode()); + Assert.assertEquals("RETRY_TOPIC_V2 code should be 2", + 2, PopConsumerRecord.RetryType.RETRY_TOPIC_V2.getCode()); + } + + @Test + public void deliveryRecordSerializeTest() { + PopConsumerRecord consumerRecord = new PopConsumerRecord(); + consumerRecord.setPopTime(System.currentTimeMillis()); + consumerRecord.setGroupId("GroupId"); + consumerRecord.setTopicId("TopicId"); + consumerRecord.setQueueId(3); + consumerRecord.setRetryFlag(PopConsumerRecord.RetryType.RETRY_TOPIC_V1.getCode()); + consumerRecord.setInvisibleTime(20); + consumerRecord.setOffset(100); + consumerRecord.setAttemptTimes(2); + consumerRecord.setAttemptId(UUID.randomUUID().toString().toUpperCase()); + + Assert.assertTrue(consumerRecord.isRetry()); + Assert.assertEquals(consumerRecord.getPopTime() + consumerRecord.getInvisibleTime(), + consumerRecord.getVisibilityTimeout()); + Assert.assertEquals(8 + "GroupId".length() + 1 + "TopicId".length() + 1 + 4 + 1 + 8, + consumerRecord.getKeyBytes().length); + log.info("ConsumerRecord={}", consumerRecord.toString()); + + PopConsumerRecord decodeRecord = PopConsumerRecord.decode(consumerRecord.getValueBytes()); + PopConsumerRecord consumerRecord2 = new PopConsumerRecord(consumerRecord.getPopTime(), + consumerRecord.getGroupId(), consumerRecord.getTopicId(), consumerRecord.getQueueId(), + consumerRecord.getRetryFlag(), consumerRecord.getInvisibleTime(), + consumerRecord.getOffset(), consumerRecord.getAttemptId()); + Assert.assertEquals(decodeRecord.getPopTime(), consumerRecord2.getPopTime()); + Assert.assertEquals(decodeRecord.getGroupId(), consumerRecord2.getGroupId()); + Assert.assertEquals(decodeRecord.getTopicId(), consumerRecord2.getTopicId()); + Assert.assertEquals(decodeRecord.getQueueId(), consumerRecord2.getQueueId()); + Assert.assertEquals(decodeRecord.getRetryFlag(), consumerRecord2.getRetryFlag()); + Assert.assertEquals(decodeRecord.getInvisibleTime(), consumerRecord2.getInvisibleTime()); + Assert.assertEquals(decodeRecord.getOffset(), consumerRecord2.getOffset()); + Assert.assertEquals(0, consumerRecord2.getAttemptTimes()); + Assert.assertEquals(decodeRecord.getAttemptId(), consumerRecord2.getAttemptId()); + } +} \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java new file mode 100644 index 00000000000..5facaeb55f1 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.common.constant.LoggerName; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PopConsumerRocksdbStoreTest { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + private static final String CONSUMER_STORE_PATH = "consumer_rocksdb"; + + public static String getRandomStorePath() { + return Paths.get(System.getProperty("user.home"), "store_test", CONSUMER_STORE_PATH, + UUID.randomUUID().toString().replace("-", "").toUpperCase().substring(0, 16)).toString(); + } + + public static void deleteStoreDirectory(String storePath) { + try { + FileUtils.deleteDirectory(new File(storePath)); + } catch (IOException e) { + log.error("Delete store directory failed, filePath: {}", storePath, e); + } + } + + public static PopConsumerRecord getConsumerRecord() { + return new PopConsumerRecord(1L, "GroupTest", "TopicTest", 2, + PopConsumerRecord.RetryType.NORMAL_TOPIC.getCode(), TimeUnit.SECONDS.toMillis(20), 100L, "AttemptId"); + } + + @Test + public void rocksdbStoreWriteDeleteTest() { + String filePath = getRandomStorePath(); + PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(filePath); + Assert.assertEquals(filePath, consumerStore.getFilePath()); + + consumerStore.start(); + consumerStore.writeRecords(IntStream.range(0, 3).boxed() + .flatMap(i -> + IntStream.range(0, 5).mapToObj(j -> { + PopConsumerRecord consumerRecord = getConsumerRecord(); + consumerRecord.setPopTime(j); + consumerRecord.setQueueId(i); + consumerRecord.setOffset(100L + j); + return consumerRecord; + }) + ) + .collect(Collectors.toList())); + consumerStore.deleteRecords(IntStream.range(0, 2).boxed() + .flatMap(i -> + IntStream.range(0, 5).mapToObj(j -> { + PopConsumerRecord consumerRecord = getConsumerRecord(); + consumerRecord.setPopTime(j); + consumerRecord.setQueueId(i); + consumerRecord.setOffset(100L + j); + return consumerRecord; + }) + ) + .collect(Collectors.toList())); + + List consumerRecords = + consumerStore.scanExpiredRecords(20002, 2); + Assert.assertEquals(2, consumerRecords.size()); + consumerStore.deleteRecords(consumerRecords); + + consumerRecords = consumerStore.scanExpiredRecords(20002, 2); + Assert.assertEquals(1, consumerRecords.size()); + consumerStore.deleteRecords(consumerRecords); + + consumerRecords = consumerStore.scanExpiredRecords(20004, 3); + Assert.assertEquals(2, consumerRecords.size()); + + consumerStore.shutdown(); + deleteStoreDirectory(filePath); + } +} \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java new file mode 100644 index 00000000000..5e73adb1ea1 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pop; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.failover.EscapeBridge; +import org.apache.rocketmq.broker.longpolling.PopLongPollingService; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager; +import org.apache.rocketmq.broker.processor.PopMessageProcessor; +import org.apache.rocketmq.broker.topic.TopicConfigManager; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.exception.ConsumeQueueException; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; + +public class PopConsumerServiceTest { + + private final String clientHost = "127.0.0.1:8888"; + private final String groupId = "groupId"; + private final String topicId = "topicId"; + private final int queueId = 2; + private final String attemptId = UUID.randomUUID().toString().toUpperCase(); + private final String filePath = PopConsumerRocksdbStoreTest.getRandomStorePath(); + + private BrokerController brokerController; + private PopConsumerService consumerService; + + @Before + public void init() throws IOException { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setEnablePopLog(true); + brokerConfig.setEnablePopBufferMerge(true); + brokerConfig.setEnablePopMessageThreshold(true); + brokerConfig.setPopInflightMessageThreshold(100); + brokerConfig.setPopConsumerKVServiceLog(true); + brokerConfig.setEnableRetryTopicV2(true); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(filePath); + + TopicConfigManager topicConfigManager = Mockito.mock(TopicConfigManager.class); + ConsumerOffsetManager consumerOffsetManager = Mockito.mock(ConsumerOffsetManager.class); + PopMessageProcessor popMessageProcessor = Mockito.mock(PopMessageProcessor.class); + PopLongPollingService popLongPollingService = Mockito.mock(PopLongPollingService.class); + ConsumerOrderInfoManager consumerOrderInfoManager = Mockito.mock(ConsumerOrderInfoManager.class); + + brokerController = Mockito.mock(BrokerController.class); + Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + Mockito.when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + Mockito.when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + Mockito.when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor); + Mockito.when(popMessageProcessor.getPopLongPollingService()).thenReturn(popLongPollingService); + Mockito.when(brokerController.getConsumerOrderInfoManager()).thenReturn(consumerOrderInfoManager); + + consumerService = new PopConsumerService(brokerController); + } + + @After + public void shutdown() throws IOException { + FileUtils.deleteDirectory(new File(filePath)); + } + + public PopConsumerRecord getConsumerTestRecord() { + PopConsumerRecord popConsumerRecord = new PopConsumerRecord(); + popConsumerRecord.setPopTime(System.currentTimeMillis()); + popConsumerRecord.setGroupId(groupId); + popConsumerRecord.setTopicId(topicId); + popConsumerRecord.setQueueId(queueId); + popConsumerRecord.setRetryFlag(PopConsumerRecord.RetryType.NORMAL_TOPIC.getCode()); + popConsumerRecord.setAttemptTimes(0); + popConsumerRecord.setInvisibleTime(TimeUnit.SECONDS.toMillis(20)); + popConsumerRecord.setAttemptId(UUID.randomUUID().toString().toUpperCase()); + return popConsumerRecord; + } + + @Test + public void isPopShouldStopTest() throws IllegalAccessException { + Assert.assertFalse(consumerService.isPopShouldStop(groupId, topicId, queueId)); + PopConsumerCache consumerCache = (PopConsumerCache) FieldUtils.readField( + consumerService, "popConsumerCache", true); + for (int i = 0; i < 100; i++) { + PopConsumerRecord record = getConsumerTestRecord(); + record.setOffset(i); + consumerCache.writeRecords(Collections.singletonList(record)); + } + Assert.assertTrue(consumerService.isPopShouldStop(groupId, topicId, queueId)); + } + + @Test + public void pendingFilterCountTest() throws ConsumeQueueException { + MessageStore messageStore = Mockito.mock(MessageStore.class); + Mockito.when(messageStore.getMaxOffsetInQueue(topicId, queueId)).thenReturn(100L); + Mockito.when(brokerController.getMessageStore()).thenReturn(messageStore); + ConsumerOffsetManager consumerOffsetManager = brokerController.getConsumerOffsetManager(); + Mockito.when(consumerOffsetManager.queryOffset(groupId, topicId, queueId)).thenReturn(20L); + Assert.assertEquals(consumerService.getPendingFilterCount(groupId, topicId, queueId), 80L); + } + + private MessageExt getMessageExt() { + MessageExt messageExt = new MessageExt(); + messageExt.setTopic(topicId); + messageExt.setQueueId(queueId); + messageExt.setBody(new byte[128]); + messageExt.setBornHost(new InetSocketAddress("127.0.0.1", 8080)); + messageExt.setStoreHost(new InetSocketAddress("127.0.0.1", 8080)); + messageExt.putUserProperty("Key", "Value"); + return messageExt; + } + + @Test + public void recodeRetryMessageTest() throws Exception { + GetMessageResult getMessageResult = new GetMessageResult(); + getMessageResult.setStatus(GetMessageStatus.FOUND); + + // result is empty + SelectMappedBufferResult bufferResult = new SelectMappedBufferResult( + 0, ByteBuffer.allocate(10), 10, null); + getMessageResult.addMessage(bufferResult); + getMessageResult.getMessageMapedList().clear(); + GetMessageResult result = consumerService.recodeRetryMessage( + getMessageResult, topicId, 0, 100, 200); + Assert.assertEquals(0, result.getMessageMapedList().size()); + + ByteBuffer buffer = ByteBuffer.wrap( + MessageDecoder.encode(getMessageExt(), false)); + getMessageResult = new GetMessageResult(); + getMessageResult.setStatus(GetMessageStatus.FOUND); + getMessageResult.addMessage(new SelectMappedBufferResult( + 0, buffer, buffer.remaining(), null)); + result = consumerService.recodeRetryMessage( + getMessageResult, topicId, 0, 100, 200); + Assert.assertNotNull(result); + Assert.assertEquals(1, result.getMessageMapedList().size()); + } + + @Test + public void addGetMessageResultTest() { + PopConsumerContext context = new PopConsumerContext( + clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId); + GetMessageResult result = new GetMessageResult(); + result.setStatus(GetMessageStatus.FOUND); + result.getMessageQueueOffset().add(100L); + consumerService.addGetMessageResult( + context, result, topicId, queueId, PopConsumerRecord.RetryType.NORMAL_TOPIC, 100); + Assert.assertEquals(1, context.getGetMessageResultList().size()); + } + + @Test + public void getMessageAsyncTest() throws Exception { + MessageStore messageStore = Mockito.mock(MessageStore.class); + Mockito.when(brokerController.getMessageStore()).thenReturn(messageStore); + Mockito.when(messageStore.getMessageAsync(groupId, topicId, queueId, 0, 10, null)) + .thenReturn(CompletableFuture.completedFuture(null)); + GetMessageResult getMessageResult = consumerService.getMessageAsync( + "127.0.0.1:8888", groupId, topicId, queueId, 0, 10, null).join(); + Assert.assertNull(getMessageResult); + + // success when first get message + GetMessageResult firstGetMessageResult = new GetMessageResult(); + firstGetMessageResult.setStatus(GetMessageStatus.FOUND); + Mockito.when(messageStore.getMessageAsync(groupId, topicId, queueId, 0, 10, null)) + .thenReturn(CompletableFuture.completedFuture(firstGetMessageResult)); + getMessageResult = consumerService.getMessageAsync( + "127.0.0.1:8888", groupId, topicId, queueId, 0, 10, null).join(); + Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); + + // reset offset from server + firstGetMessageResult.setStatus(GetMessageStatus.OFFSET_FOUND_NULL); + firstGetMessageResult.setNextBeginOffset(25); + GetMessageResult resetGetMessageResult = new GetMessageResult(); + resetGetMessageResult.setStatus(GetMessageStatus.FOUND); + Mockito.when(messageStore.getMessageAsync(groupId, topicId, queueId, 25, 10, null)) + .thenReturn(CompletableFuture.completedFuture(resetGetMessageResult)); + getMessageResult = consumerService.getMessageAsync( + "127.0.0.1:8888", groupId, topicId, queueId, 0, 10, null).join(); + Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); + + // fifo block + PopConsumerContext context = new PopConsumerContext( + clientHost, System.currentTimeMillis(), 20000, groupId, false, attemptId); + consumerService.setFifoBlocked(context, groupId, topicId, queueId, Collections.singletonList(100L)); + Mockito.when(brokerController.getConsumerOrderInfoManager() + .checkBlock(anyString(), anyString(), anyString(), anyInt(), anyLong())).thenReturn(true); + Assert.assertTrue(consumerService.isFifoBlocked(context, groupId, topicId, queueId)); + + // get message async normal + CompletableFuture future = CompletableFuture.completedFuture(context); + Assert.assertEquals(0L, consumerService.getMessageAsync(future, clientHost, groupId, topicId, queueId, + 10, null, PopConsumerRecord.RetryType.NORMAL_TOPIC).join().getRestCount()); + + // get message result full, no need get again + for (int i = 0; i < 10; i++) { + ByteBuffer buffer = ByteBuffer.wrap(MessageDecoder.encode(getMessageExt(), false)); + getMessageResult.addMessage(new SelectMappedBufferResult( + 0, buffer, buffer.remaining(), null), i); + } + context.addGetMessageResult(getMessageResult, topicId, queueId, PopConsumerRecord.RetryType.NORMAL_TOPIC, 0); + + Mockito.when(brokerController.getMessageStore().getMaxOffsetInQueue(topicId, queueId)).thenReturn(100L); + Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, queueId)).thenReturn(0L); + Assert.assertEquals(100L, consumerService.getMessageAsync(future, clientHost, groupId, topicId, queueId, + 10, null, PopConsumerRecord.RetryType.NORMAL_TOPIC).join().getRestCount()); + + // fifo block test + context = new PopConsumerContext( + clientHost, System.currentTimeMillis(), 20000, groupId, true, attemptId); + future = CompletableFuture.completedFuture(context); + Assert.assertEquals(0L, consumerService.getMessageAsync(future, clientHost, groupId, topicId, queueId, + 10, null, PopConsumerRecord.RetryType.NORMAL_TOPIC).join().getRestCount()); + } + + @Test + public void popAsyncTest() { + PopConsumerService consumerServiceSpy = Mockito.spy(consumerService); + TopicConfigManager topicConfigManager = Mockito.mock(TopicConfigManager.class); + Mockito.when(topicConfigManager.selectTopicConfig(topicId)).thenReturn(new TopicConfig( + topicId, 2, 2, PermName.PERM_READ | PermName.PERM_WRITE, 0)); + Mockito.when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + + String[] retryTopic = new String[] { + KeyBuilder.buildPopRetryTopicV1(topicId, groupId), + KeyBuilder.buildPopRetryTopicV2(topicId, groupId) + }; + + for (String retry : retryTopic) { + GetMessageResult getMessageResult = new GetMessageResult(); + getMessageResult.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE); + getMessageResult.setMinOffset(0L); + getMessageResult.setMaxOffset(1L); + getMessageResult.setNextBeginOffset(1L); + Mockito.doReturn(CompletableFuture.completedFuture(getMessageResult)) + .when(consumerServiceSpy).getMessageAsync(clientHost, groupId, retry, 0, 0, 10, null); + Mockito.doReturn(CompletableFuture.completedFuture(getMessageResult)) + .when(consumerServiceSpy).getMessageAsync(clientHost, groupId, retry, 0, 0, 8, null); + } + + for (int i = -1; i < 2; i++) { + GetMessageResult getMessageResult = new GetMessageResult(); + getMessageResult.setStatus(GetMessageStatus.FOUND); + getMessageResult.setMinOffset(0L); + getMessageResult.setMaxOffset(1L); + getMessageResult.setNextBeginOffset(1L); + getMessageResult.addMessage(Mockito.mock(SelectMappedBufferResult.class), 1L); + + Mockito.doReturn(CompletableFuture.completedFuture(getMessageResult)) + .when(consumerServiceSpy).getMessageAsync(clientHost, groupId, topicId, i, 0, 8, null); + Mockito.doReturn(CompletableFuture.completedFuture(getMessageResult)) + .when(consumerServiceSpy).getMessageAsync(clientHost, groupId, topicId, i, 0, 9, null); + Mockito.doReturn(CompletableFuture.completedFuture(getMessageResult)) + .when(consumerServiceSpy).getMessageAsync(clientHost, groupId, topicId, i, 0, 10, null); + } + + // pop broker + consumerServiceSpy.popAsync(clientHost, System.currentTimeMillis(), + 20000, groupId, topicId, -1, 10, false, attemptId, null).join(); + } + + @Test + public void ackAsyncTest() { + long current = System.currentTimeMillis(); + consumerService.getPopConsumerStore().start(); + consumerService.ackAsync( + current, 10, groupId, topicId, queueId, 100).join(); + consumerService.changeInvisibilityDuration(current, 10, + current + 100, 10, groupId, topicId, queueId, 100); + consumerService.shutdown(); + } + + @Test + public void reviveRetryTest() { + Mockito.when(brokerController.getTopicConfigManager().selectTopicConfig(topicId)).thenReturn(null); + Mockito.when(brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, 0)).thenReturn(-1L); + + consumerService.createRetryTopicIfNeeded(groupId, topicId); + consumerService.clearCache(groupId, topicId, queueId); + MessageExt messageExt = new MessageExt(); + messageExt.setBody("body".getBytes()); + messageExt.setBornTimestamp(System.currentTimeMillis()); + messageExt.setFlag(0); + messageExt.setSysFlag(0); + messageExt.setReconsumeTimes(1); + messageExt.putUserProperty("key", "value"); + + PopConsumerRecord record = new PopConsumerRecord(); + record.setTopicId("topic"); + record.setGroupId("group"); + Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(Mockito.mock(BrokerStatsManager.class)); + Mockito.when(brokerController.getEscapeBridge()).thenReturn(Mockito.mock(EscapeBridge.class)); + Mockito.when(brokerController.getEscapeBridge().putMessageToSpecificQueue(any(MessageExtBrokerInner.class))) + .thenReturn(new PutMessageResult( + PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + + PopConsumerService consumerServiceSpy = Mockito.spy(consumerService); + Mockito.doNothing().when(consumerServiceSpy).createRetryTopicIfNeeded(any(), any()); + Assert.assertTrue(consumerServiceSpy.reviveRetry(record, messageExt)); + + // write message error + Mockito.when(brokerController.getEscapeBridge().putMessageToSpecificQueue(any(MessageExtBrokerInner.class))) + .thenReturn(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, + new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + Assert.assertFalse(consumerServiceSpy.reviveRetry(record, messageExt)); + + // revive backoff + consumerService.getPopConsumerStore().start(); + List consumerRecordList = IntStream.range(0, 3) + .mapToObj(i -> { + PopConsumerRecord temp = new PopConsumerRecord(); + temp.setPopTime(0); + temp.setInvisibleTime(20 * 1000); + temp.setTopicId("topic"); + temp.setGroupId("group"); + temp.setQueueId(2); + temp.setOffset(i); + return temp; + }) + .collect(Collectors.toList()); + consumerService.getPopConsumerStore().writeRecords(consumerRecordList); + + Mockito.doReturn(CompletableFuture.completedFuture(null)) + .when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class)); + consumerServiceSpy.revive(20 * 1000, 1); + + Mockito.doReturn(CompletableFuture.completedFuture( + Triple.of(null, "GetMessageResult is null", false))) + .when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class)); + consumerServiceSpy.revive(20 * 1000, 1); + + Mockito.doReturn(CompletableFuture.completedFuture( + Triple.of(Mockito.mock(MessageExt.class), null, false))) + .when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class)); + consumerServiceSpy.revive(20 * 1000, 1); + consumerService.shutdown(); + } + + @Test + public void transferToFsStoreTest() { + Assert.assertNotNull(consumerService.getServiceName()); + List consumerRecordList = IntStream.range(0, 3) + .mapToObj(i -> { + PopConsumerRecord temp = new PopConsumerRecord(); + temp.setPopTime(0); + temp.setInvisibleTime(20 * 1000); + temp.setTopicId("topic"); + temp.setGroupId("group"); + temp.setQueueId(2); + temp.setOffset(i); + return temp; + }) + .collect(Collectors.toList()); + + Mockito.when(brokerController.getPopMessageProcessor().buildCkMsg(any(), anyInt())) + .thenReturn(new MessageExtBrokerInner()); + Mockito.when(brokerController.getMessageStore()).thenReturn(Mockito.mock(MessageStore.class)); + Mockito.when(brokerController.getMessageStore().asyncPutMessage(any())) + .thenReturn(CompletableFuture.completedFuture( + new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); + + consumerService.start(); + consumerService.getPopConsumerStore().writeRecords(consumerRecordList); + consumerService.transferToFsStore(); + consumerService.shutdown(); + } +} \ No newline at end of file diff --git a/broker/src/test/resources/rmq.logback-test.xml b/broker/src/test/resources/rmq.logback-test.xml index 8695d52d57c..7a2ff0bc933 100644 --- a/broker/src/test/resources/rmq.logback-test.xml +++ b/broker/src/test/resources/rmq.logback-test.xml @@ -19,9 +19,7 @@ - - %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n - + ${CONSOLE_LOG_PATTERN} @@ -29,7 +27,10 @@ - + + + diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 2e088ac9da5..c462dd1241c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -3573,4 +3573,16 @@ public void operationFail(Throwable throwable) { } }); } + + public void exportPopRecord(String brokerAddr, long timeout) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand( + RequestCode.POP_ROLLBACK, null); + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeout); + assert response != null; + if (response.getCode() == SUCCESS) { + return; + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index bac2e2c7e40..b5dc1899e94 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -236,6 +236,13 @@ public class BrokerConfig extends BrokerIdentity { private boolean retrieveMessageFromPopRetryTopicV1 = true; private boolean enableRetryTopicV2 = false; private int popFromRetryProbability = 20; + private boolean popConsumerFSServiceInit = true; + private boolean popConsumerKVServiceLog = false; + private boolean popConsumerKVServiceInit = false; + private boolean popConsumerKVServiceEnable = false; + private int popReviveMaxReturnSizePerRead = 16 * 1024; + private int popReviveMaxAttemptTimes = 16; + private boolean realTimeNotifyConsumerChange = true; private boolean litePullMessageEnable = true; @@ -590,6 +597,53 @@ public void setPopFromRetryProbability(int popFromRetryProbability) { this.popFromRetryProbability = popFromRetryProbability; } + public boolean isPopConsumerFSServiceInit() { + return popConsumerFSServiceInit; + } + + public void setPopConsumerFSServiceInit(boolean popConsumerFSServiceInit) { + this.popConsumerFSServiceInit = popConsumerFSServiceInit; + } + + public boolean isPopConsumerKVServiceLog() { + return popConsumerKVServiceLog; + } + + public void setPopConsumerKVServiceLog(boolean popConsumerKVServiceLog) { + this.popConsumerKVServiceLog = popConsumerKVServiceLog; + } + + public boolean isPopConsumerKVServiceInit() { + return popConsumerKVServiceInit; + } + + public void setPopConsumerKVServiceInit(boolean popConsumerKVServiceInit) { + this.popConsumerKVServiceInit = popConsumerKVServiceInit; + } + + public boolean isPopConsumerKVServiceEnable() { + return popConsumerKVServiceEnable; + } + + public void setPopConsumerKVServiceEnable(boolean popConsumerKVServiceEnable) { + this.popConsumerKVServiceEnable = popConsumerKVServiceEnable; + } + + public int getPopReviveMaxReturnSizePerRead() { + return popReviveMaxReturnSizePerRead; + } + + public void setPopReviveMaxReturnSizePerRead(int popReviveMaxReturnSizePerRead) { + this.popReviveMaxReturnSizePerRead = popReviveMaxReturnSizePerRead; + } + + public int getPopReviveMaxAttemptTimes() { + return popReviveMaxAttemptTimes; + } + + public void setPopReviveMaxAttemptTimes(int popReviveMaxAttemptTimes) { + this.popReviveMaxAttemptTimes = popReviveMaxAttemptTimes; + } public boolean isTraceOn() { return traceOn; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index 9e86422c482..623f5748d5a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -95,6 +95,7 @@ public class RequestCode { public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053; public static final int NOTIFICATION = 200054; public static final int POLLING_INFO = 200055; + public static final int POP_ROLLBACK = 200056; public static final int PUT_KV_CONFIG = 100; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index c5ecdefb529..4b97e14866a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -1004,4 +1004,10 @@ public List listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { return defaultMQAdminExtImpl.listAcl(brokerAddr, subjectFilter, resourceFilter); } + + @Override + public void exportPopRecords(String brokerAddr, long timeout) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + defaultMQAdminExtImpl.exportPopRecords(brokerAddr, timeout); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 17f14f23af8..2523013af0d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -2085,4 +2085,10 @@ public List listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { return this.mqClientInstance.getMQClientAPIImpl().listAcl(brokerAddr, subjectFilter, resourceFilter, timeoutMillis); } + + @Override + public void exportPopRecords(String brokerAddr, long timeout) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + this.mqClientInstance.getMQClientAPIImpl().exportPopRecord(brokerAddr, timeout); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index aea43376eac..69a08218646 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -526,4 +526,7 @@ String setCommitLogReadAheadMode(final String brokerAddr, String mode) AclInfo getAcl(String brokerAddr, String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; List listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; + + void exportPopRecords(String brokerAddr, long timeout) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 313a777ce4f..a16c058ec44 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -80,6 +80,7 @@ import org.apache.rocketmq.tools.command.export.ExportMetadataCommand; import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; import org.apache.rocketmq.tools.command.export.ExportMetricsCommand; +import org.apache.rocketmq.tools.command.export.ExportPopRecordCommand; import org.apache.rocketmq.tools.command.ha.GetSyncStateSetSubCommand; import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; @@ -273,6 +274,7 @@ public static void initCommand() { initCommand(new ExportConfigsCommand()); initCommand(new ExportMetricsCommand()); initCommand(new ExportMetadataInRocksDBCommand()); + initCommand(new ExportPopRecordCommand()); initCommand(new HAStatusSubCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportPopRecordCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportPopRecordCommand.java new file mode 100644 index 00000000000..f8b67c97af3 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportPopRecordCommand.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class ExportPopRecordCommand implements SubCommand { + + @Override + public String commandName() { + return "exportPopRecord"; + } + + @Override + public String commandDesc() { + return "Export pop consumer record"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option( + "c", "clusterName", true, "choose one cluster to export"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("b", "brokerAddr", true, "choose one broker to export"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("d", "dryRun", true, "no actual changes will be made"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + + DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook); + adminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + adminExt.start(); + boolean dryRun = commandLine.hasOption('d') && + Boolean.FALSE.toString().equalsIgnoreCase(commandLine.getOptionValue('d')); + if (commandLine.hasOption('b')) { + String brokerAddr = commandLine.getOptionValue('b').trim(); + String brokerName = adminExt.getBrokerConfig(brokerAddr).getProperty("brokerName"); + export(adminExt, brokerAddr, brokerName, dryRun); + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + ClusterInfo clusterInfo = adminExt.examineBrokerClusterInfo(); + if (clusterInfo != null) { + Set brokerNameSet = clusterInfo.getClusterAddrTable().get(clusterName); + if (brokerNameSet != null) { + brokerNameSet.forEach(brokerName -> { + BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName); + if (brokerData != null) { + brokerData.getBrokerAddrs().forEach( + (brokerId, brokerAddr) -> export(adminExt, brokerAddr, brokerName, dryRun)); + } + }); + } + } + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + adminExt.shutdown(); + } + } + + private void export(DefaultMQAdminExt adminExt, String brokerAddr, String brokerName, boolean dryRun) { + try { + if (!dryRun) { + adminExt.exportPopRecords(brokerAddr, TimeUnit.SECONDS.toMillis(30)); + } + System.out.printf("Export broker records, " + + "brokerName=%s, brokerAddr=%s, dryRun=%s%n", brokerName, brokerAddr, dryRun); + } catch (Exception e) { + System.out.printf("Export broker records error, " + + "brokerName=%s, brokerAddr=%s, dryRun=%s%n%s", brokerName, brokerAddr, dryRun, e); + } + } +} +