diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java new file mode 100644 index 00000000000..f677e7c934e --- /dev/null +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tieredstore.common; + +import java.util.List; +import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.SelectMappedBufferResult; + +public class GroupCommitContext { + + private long endOffset; + + private List bufferList; + + private List dispatchRequests; + + public long getEndOffset() { + return endOffset; + } + + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } + + public List getBufferList() { + return bufferList; + } + + public void setBufferList(List bufferList) { + this.bufferList = bufferList; + } + + public List getDispatchRequests() { + return dispatchRequests; + } + + public void setDispatchRequests(List dispatchRequests) { + this.dispatchRequests = dispatchRequests; + } + + public void release() { + if (bufferList != null) { + for (SelectMappedBufferResult bufferResult : bufferList) { + bufferResult.release(); + } + bufferList.clear(); + bufferList = null; + } + if (dispatchRequests != null) { + dispatchRequests.clear(); + dispatchRequests = null; + } + + } +} diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java index 9b1e53564d7..32fe964a1c0 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java @@ -19,12 +19,16 @@ import io.opentelemetry.api.common.Attributes; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; @@ -42,6 +46,7 @@ import org.apache.rocketmq.tieredstore.TieredMessageStore; import org.apache.rocketmq.tieredstore.common.AppendResult; import org.apache.rocketmq.tieredstore.common.FileSegmentType; +import org.apache.rocketmq.tieredstore.common.GroupCommitContext; import org.apache.rocketmq.tieredstore.file.FlatFileInterface; import org.apache.rocketmq.tieredstore.file.FlatFileStore; import org.apache.rocketmq.tieredstore.index.IndexService; @@ -65,6 +70,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message protected final MessageStoreFilter topicFilter; protected final Semaphore semaphore; protected final IndexService indexService; + protected final Map failedGroupCommitMap; public MessageStoreDispatcherImpl(TieredMessageStore messageStore) { this.messageStore = messageStore; @@ -77,6 +83,7 @@ public MessageStoreDispatcherImpl(TieredMessageStore messageStore) { this.flatFileStore = messageStore.getFlatFileStore(); this.storeExecutor = messageStore.getStoreExecutor(); this.indexService = messageStore.getIndexService(); + this.failedGroupCommitMap = new ConcurrentHashMap<>(); } @Override @@ -153,10 +160,22 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, // If the previous commit fails, attempt to trigger a commit directly. if (commitOffset < currentOffset) { - this.commitAsync(flatFile); + this.commitAsync(flatFile).whenComplete((result, throwable) -> { + if (throwable != null) { + log.error("topic: {}, queueId: {} flat file flush cache failed more than twice.", topic, queueId, throwable); + } + }); return CompletableFuture.completedFuture(false); } + if (failedGroupCommitMap.containsKey(flatFile)) { + GroupCommitContext failedCommit = failedGroupCommitMap.get(flatFile); + if (failedCommit.getEndOffset() <= commitOffset) { + failedGroupCommitMap.remove(flatFile); + constructIndexFile(flatFile.getTopicId(), failedCommit); + } + } + if (currentOffset < minOffsetInQueue) { log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset); @@ -224,6 +243,8 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, } long offset = currentOffset; + List appendingBufferList = new ArrayList<>(); + List dispatchRequestList = new ArrayList<>(); for (; offset < targetOffset; offset++) { cqUnit = consumeQueue.get(offset); bufferSize += cqUnit.getSize(); @@ -231,6 +252,7 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, break; } message = defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize()); + appendingBufferList.add(message); ByteBuffer byteBuffer = message.getByteBuffer(); AppendResult result = flatFile.appendCommitLog(message); @@ -251,13 +273,20 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, result = flatFile.appendConsumeQueue(dispatchRequest); if (!AppendResult.SUCCESS.equals(result)) { break; + } else { + dispatchRequestList.add(dispatchRequest); } } + GroupCommitContext groupCommitContext = new GroupCommitContext(); + groupCommitContext.setEndOffset(offset); + groupCommitContext.setBufferList(appendingBufferList); + groupCommitContext.setDispatchRequests(dispatchRequestList); + // If there are many messages waiting to be uploaded, call the upload logic immediately. boolean repeat = timeout || maxOffsetInQueue - offset > storeConfig.getTieredStoreGroupCommitCount(); - if (!flatFile.getDispatchRequestList().isEmpty()) { + if (!dispatchRequestList.isEmpty()) { Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder() .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic) .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId) @@ -265,8 +294,19 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, .build(); TieredStoreMetricsManager.messagesDispatchTotal.add(offset - currentOffset, attributes); - this.commitAsync(flatFile).whenComplete((unused, throwable) -> { - if (repeat) { + this.commitAsync(flatFile).whenComplete((success, throwable) -> { + if (success) { + constructIndexFile(flatFile.getTopicId(), groupCommitContext); + } + else { + //next commit async,execute constructIndexFile. + GroupCommitContext oldCommit = failedGroupCommitMap.put(flatFile, groupCommitContext); + if (oldCommit != null) { + log.warn("MessageDispatcher#dispatch, topic={}, queueId={} old failed commit context not release", topic, queueId); + oldCommit.release(); + } + } + if (success && repeat) { storeExecutor.commonExecutor.submit(() -> dispatchWithSemaphore(flatFile)); } } @@ -282,22 +322,28 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, return CompletableFuture.completedFuture(false); } - public CompletableFuture commitAsync(FlatFileInterface flatFile) { - return flatFile.commitAsync().thenAcceptAsync(success -> { - if (success) { - if (storeConfig.isMessageIndexEnable()) { - flatFile.getDispatchRequestList().forEach( - request -> constructIndexFile(flatFile.getTopicId(), request)); + public CompletableFuture commitAsync(FlatFileInterface flatFile) { + return flatFile.commitAsync(); + } + + private void constructIndexFile(long topicId, GroupCommitContext groupCommitContext) { + MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> { + if (storeConfig.isMessageIndexEnable()) { + try { + groupCommitContext.getDispatchRequests().forEach(request -> constructIndexFile0(topicId, request)); + } + catch (Throwable e) { + log.error("constructIndexFile error {}", topicId, e); } - flatFile.release(); } - }, storeExecutor.bufferCommitExecutor); + groupCommitContext.release(); + }); } /** * Building indexes with offsetId is no longer supported because offsetId has changed in tiered storage */ - public void constructIndexFile(long topicId, DispatchRequest request) { + public void constructIndexFile0(long topicId, DispatchRequest request) { Set keySet = new HashSet<>(); if (StringUtils.isNotBlank(request.getUniqKey())) { keySet.add(request.getUniqKey()); @@ -309,12 +355,27 @@ public void constructIndexFile(long topicId, DispatchRequest request) { request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp()); } + private void releaseClosedPendingGroupCommit() { + Iterator> iterator = failedGroupCommitMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getKey().isClosed()) { + entry.getValue().release(); + iterator.remove(); + } + } + } + + @Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore); + + releaseClosedPendingGroupCommit(); + this.waitForRunning(Duration.ofSeconds(20).toMillis()); } catch (Throwable t) { log.error("MessageStore dispatch error", t); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java index 619470fbc27..01e7f25a467 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.tieredstore.file; import java.nio.ByteBuffer; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.Lock; import org.apache.rocketmq.common.BoundaryType; @@ -58,8 +57,6 @@ public interface FlatFileInterface { */ AppendResult appendConsumeQueue(DispatchRequest request); - List getDispatchRequestList(); - void release(); long getMinStoreTimestamp(); @@ -143,6 +140,8 @@ public interface FlatFileInterface { */ CompletableFuture getQueueOffsetByTimeAsync(long timestamp, BoundaryType boundaryType); + boolean isClosed(); + /** * Shutdown process */ diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java index d5675976cb1..89661963978 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -51,14 +52,13 @@ public class FlatMessageFile implements FlatFileInterface { protected final String filePath; protected final ReentrantLock fileLock; + protected final Semaphore commitLock = new Semaphore(1); protected final MessageStoreConfig storeConfig; protected final MetadataStore metadataStore; protected final FlatCommitLogFile commitLog; protected final FlatConsumeQueueFile consumeQueue; protected final AtomicLong lastDestroyTime; - protected final List bufferResultList; - protected final List dispatchRequestList; protected final ConcurrentMap> inFlightRequestMap; public FlatMessageFile(FlatFileFactory fileFactory, String topic, int queueId) { @@ -76,8 +76,6 @@ public FlatMessageFile(FlatFileFactory fileFactory, String filePath) { this.commitLog = fileFactory.createFlatFileForCommitLog(filePath); this.consumeQueue = fileFactory.createFlatFileForConsumeQueue(filePath); this.lastDestroyTime = new AtomicLong(); - this.bufferResultList = new ArrayList<>(); - this.dispatchRequestList = new ArrayList<>(); this.inFlightRequestMap = new ConcurrentHashMap<>(); } @@ -156,7 +154,6 @@ public AppendResult appendCommitLog(SelectMappedBufferResult message) { if (closed) { return AppendResult.FILE_CLOSED; } - this.bufferResultList.add(message); return this.appendCommitLog(message.getByteBuffer()); } @@ -172,29 +169,14 @@ public AppendResult appendConsumeQueue(DispatchRequest request) { buffer.putLong(request.getTagsCode()); buffer.flip(); - this.dispatchRequestList.add(request); return consumeQueue.append(buffer, request.getStoreTimestamp()); } - @Override - public List getDispatchRequestList() { - return dispatchRequestList; - } + @Override public void release() { - for (SelectMappedBufferResult bufferResult : bufferResultList) { - bufferResult.release(); - } - - if (queueMetadata != null) { - log.trace("FlatMessageFile release, topic={}, queueId={}, bufferSize={}, requestListSize={}", - queueMetadata.getQueue().getTopic(), queueMetadata.getQueue().getQueueId(), - bufferResultList.size(), dispatchRequestList.size()); - } - bufferResultList.clear(); - dispatchRequestList.clear(); } @Override @@ -246,13 +228,18 @@ public long getConsumeQueueCommitOffset() { @Override public CompletableFuture commitAsync() { + // acquire lock + if (commitLock.drainPermits() <= 0) { + return CompletableFuture.completedFuture(false); + } + return this.commitLog.commitAsync() .thenCompose(result -> { if (result) { return consumeQueue.commitAsync(); } return CompletableFuture.completedFuture(false); - }); + }).whenComplete((result, throwable) -> commitLock.release()); } @Override @@ -363,6 +350,11 @@ public boolean equals(Object obj) { return StringUtils.equals(filePath, ((FlatMessageFile) obj).filePath); } + @Override + public boolean isClosed() { + return closed; + } + @Override public void shutdown() { closed = true;