Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7545] [RIP-65] Support efficient random index for massive messages #7546

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion style/spotbugs-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
</Match>
<Match>
<Class name="org.apache.rocketmq.tieredstore.file.TieredIndexFile"/>
<Class name="org.apache.rocketmq.tieredstore.index.TieredIndexFile"/>
<Method name="indexKeyHashMethod" />
<Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
</Match>
Expand Down
14 changes: 14 additions & 0 deletions tieredstore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,19 @@
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.36</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.36</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand All @@ -50,15 +51,15 @@
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.index.IndexItem;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.apache.rocketmq.common.BoundaryType;

public class TieredMessageFetcher implements MessageStoreFetcher {

Expand Down Expand Up @@ -555,85 +556,51 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
public CompletableFuture<QueryMessageResult> queryMessageAsync(
String topic, String key, int maxCount, long begin, long end) {

TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
IndexService indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);

int hashCode = TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key));
long topicId;
try {
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
if (topicMetadata == null) {
LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
LOGGER.info("MessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
return CompletableFuture.completedFuture(new QueryMessageResult());
}
topicId = topicMetadata.getTopicId();
} catch (Exception e) {
LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
LOGGER.error("MessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
return CompletableFuture.completedFuture(new QueryMessageResult());
}

return indexFile.queryAsync(topic, key, begin, end)
.thenCompose(indexBufferList -> {
QueryMessageResult result = new QueryMessageResult();
int resultCount = 0;
List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
for (Pair<Long, ByteBuffer> pair : indexBufferList) {
Long fileBeginTimestamp = pair.getKey();
ByteBuffer indexBuffer = pair.getValue();

if (indexBuffer.remaining() % TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) {
LOGGER.error("[Bug] TieredMessageFetcher#queryMessageAsync: " +
"index buffer size {} is not multiple of index item size {}",
indexBuffer.remaining(), TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
continue;
}

for (int indexOffset = indexBuffer.position();
indexOffset < indexBuffer.limit();
indexOffset += TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {

int indexItemHashCode = indexBuffer.getInt(indexOffset);
if (indexItemHashCode != hashCode) {
continue;
}

int indexItemTopicId = indexBuffer.getInt(indexOffset + 4);
if (indexItemTopicId != topicId) {
continue;
}

int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
CompositeFlatFile flatFile =
flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
if (flatFile == null) {
continue;
}

// decode index item
long offset = indexBuffer.getLong(indexOffset + 4 + 4 + 4);
int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8);
int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4);
long indexTimestamp = fileBeginTimestamp + timeDiff;
if (indexTimestamp < begin || indexTimestamp > end) {
continue;
}
CompletableFuture<List<IndexItem>> future = indexStoreService.queryAsync(topic, key, maxCount, begin, end);

CompletableFuture<Void> getMessageFuture = flatFile.getCommitLogAsync(offset, size)
.thenAccept(messageBuffer -> result.addMessage(
new SelectMappedBufferResult(0, messageBuffer, size, null)));
futureList.add(getMessageFuture);

resultCount++;
if (resultCount >= maxCount) {
break;
}
}

if (resultCount >= maxCount) {
break;
}
return future.thenCompose(indexItemList -> {
QueryMessageResult result = new QueryMessageResult();
List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
for (IndexItem indexItem : indexItemList) {
if (topicId != indexItem.getTopicId()) {
continue;
}
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenApply(v -> result);
});
CompositeFlatFile flatFile =
flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, indexItem.getQueueId()));
if (flatFile == null) {
continue;
}
CompletableFuture<Void> getMessageFuture = flatFile
.getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
.thenAccept(messageBuffer -> result.addMessage(
new SelectMappedBufferResult(
indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)));
futureList.add(getMessageFuture);
if (futureList.size() >= maxCount) {
break;
}
}
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
}).whenComplete((result, throwable) -> {
if (result != null) {
LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}",
result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package org.apache.rocketmq.tieredstore.file;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.QueueMetadata;
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
Expand All @@ -31,13 +35,13 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
private final MessageQueue messageQueue;
private long topicSequenceNumber;
private QueueMetadata queueMetadata;
private final TieredIndexFile indexFile;
private final IndexService indexStoreService;

public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, MessageQueue messageQueue) {
super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
this.messageQueue = messageQueue;
this.recoverQueueMetadata();
this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
this.indexStoreService = TieredFlatFileManager.getTieredIndexService(storeConfig);
}

@Override
Expand Down Expand Up @@ -85,24 +89,15 @@ public AppendResult appendIndexFile(DispatchRequest request) {
return AppendResult.FILE_CLOSED;
}

Set<String> keySet = new HashSet<>(
Arrays.asList(request.getKeys().split(MessageConst.KEY_SEPARATOR)));
if (StringUtils.isNotBlank(request.getUniqKey())) {
AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
request.getUniqKey(), request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
if (result != AppendResult.SUCCESS) {
return result;
}
keySet.add(request.getUniqKey());
}

for (String key : request.getKeys().split(MessageConst.KEY_SEPARATOR)) {
if (StringUtils.isNotBlank(key)) {
AppendResult result = indexFile.append(messageQueue, (int) topicSequenceNumber,
key, request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
if (result != AppendResult.SUCCESS) {
return result;
}
}
}
return AppendResult.SUCCESS;
return indexStoreService.putKey(
messageQueue.getTopic(), (int) topicSequenceNumber, messageQueue.getQueueId(), keySet,
request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp());
}

public MessageQueue getMessageQueue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BoundaryType;
lizhimins marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.common.BoundaryType;

public class TieredConsumeQueue {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public FileSegmentType getFileType() {
return fileType;
}

@VisibleForTesting
public List<TieredFileSegment> getFileSegmentList() {
return fileSegmentList;
}
Expand Down Expand Up @@ -274,7 +273,7 @@ public int getFileSegmentCount() {
}

@Nullable
protected TieredFileSegment getFileByIndex(int index) {
public TieredFileSegment getFileByIndex(int index) {
fileSegmentLock.readLock().lock();
try {
if (index < fileSegmentList.size()) {
Expand Down Expand Up @@ -354,7 +353,7 @@ protected TieredFileSegment getFileByTime(long timestamp, BoundaryType boundaryT
}
}

protected List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) {
public List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) {
fileSegmentLock.readLock().lock();
try {
return fileSegmentList.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.index.IndexStoreService;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;

Expand All @@ -43,7 +45,7 @@ public class TieredFlatFileManager {
private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);

private static volatile TieredFlatFileManager instance;
private static volatile TieredIndexFile indexFile;
private static volatile IndexStoreService indexStoreService;

private final TieredMetadataStore metadataStore;
private final TieredMessageStoreConfig storeConfig;
Expand Down Expand Up @@ -76,25 +78,26 @@ public static TieredFlatFileManager getInstance(TieredMessageStoreConfig storeCo
return instance;
}

public static TieredIndexFile getIndexFile(TieredMessageStoreConfig storeConfig) {
public static IndexService getTieredIndexService(TieredMessageStoreConfig storeConfig) {
if (storeConfig == null) {
return indexFile;
return indexStoreService;
}

if (indexFile == null) {
if (indexStoreService == null) {
synchronized (TieredFlatFileManager.class) {
if (indexFile == null) {
if (indexStoreService == null) {
try {
String filePath = TieredStoreUtil.toPath(new MessageQueue(
TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0));
indexFile = new TieredIndexFile(new TieredFileAllocator(storeConfig), filePath);
indexStoreService = new IndexStoreService(new TieredFileAllocator(storeConfig), filePath);
indexStoreService.start();
} catch (Exception e) {
logger.error("Construct FlatFileManager indexFile error", e);
}
}
}
}
return indexFile;
return indexStoreService;
}

public void doCommit() {
Expand All @@ -120,15 +123,6 @@ public void doCommit() {
}
}, delay, TimeUnit.MILLISECONDS);
}
TieredStoreExecutor.commitExecutor.schedule(() -> {
try {
if (indexFile != null) {
indexFile.commit(true);
}
} catch (Throwable e) {
logger.error("Commit indexFile periodically failed", e);
}
}, 0, TimeUnit.MILLISECONDS);
}

public void doCleanExpiredFile() {
Expand All @@ -148,10 +142,6 @@ public void doCleanExpiredFile() {
}
});
}
if (indexFile != null) {
indexFile.cleanExpiredFile(expiredTimeStamp);
indexFile.destroyExpiredFile();
}
}

private void doScheduleTask() {
Expand Down Expand Up @@ -244,7 +234,7 @@ public void cleanup() {

private static void cleanStaticReference() {
instance = null;
indexFile = null;
indexStoreService = null;
}

@Nullable
Expand All @@ -271,17 +261,17 @@ public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() {
}

public void shutdown() {
if (indexFile != null) {
indexFile.commit(true);
if (indexStoreService != null) {
indexStoreService.shutdown();
}
for (CompositeFlatFile flatFile : deepCopyFlatFileToList()) {
flatFile.shutdown();
}
}

public void destroy() {
if (indexFile != null) {
indexFile.destroy();
if (indexStoreService != null) {
indexStoreService.destroy();
}
ImmutableList<CompositeQueueFlatFile> flatFileList = deepCopyFlatFileToList();
cleanup();
Expand Down
Loading
Loading