Skip to content

Commit

Permalink
feat(store): add getConsumeOffset API for MessageStore and fix proxy …
Browse files Browse the repository at this point in the history
…tests (#210)

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Oct 10, 2023
1 parent b4fc81b commit 302b891
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,17 @@ public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMess
@Override
public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle, String messageId,
ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
String receiptHandle = requestHeader.getExtraInfo();
String rawHandle = ReceiptHandleUtil.decodeReceiptHandle(receiptHandle);

// The real receipt handle generated by S3RocketMQ.
String rawHandle;
try {
rawHandle = ReceiptHandleUtil.decodeReceiptHandle(requestHeader.getExtraInfo());
} catch (Exception e) {
org.apache.rocketmq.client.consumer.AckResult result = new org.apache.rocketmq.client.consumer.AckResult();
result.setStatus(AckStatus.NO_EXIST);
return CompletableFuture.completedFuture(result);
}

return store.changeInvisibleDuration(rawHandle, requestHeader.getInvisibleTime())
.thenApply(changeInvisibleDurationResult -> {
org.apache.rocketmq.client.consumer.AckResult ackResult = new org.apache.rocketmq.client.consumer.AckResult();
Expand All @@ -250,12 +259,19 @@ public CompletableFuture<AckResult> ackMessage(ProxyContext ctx, ReceiptHandle h
AckMessageRequestHeader requestHeader, long timeoutMillis) {
Integer queueId = requestHeader.getQueueId();

// The real receipt handle generated by S3RocketMQ.
String rawHandle;
try {
rawHandle = ReceiptHandleUtil.decodeReceiptHandle(requestHeader.getExtraInfo());
} catch (Exception e) {
org.apache.rocketmq.client.consumer.AckResult result = new org.apache.rocketmq.client.consumer.AckResult();
result.setStatus(AckStatus.NO_EXIST);
return CompletableFuture.completedFuture(result);
}

CompletableFuture<Topic> topicFuture = topicOf(requestHeader.getTopic());
CompletableFuture<ConsumerGroup> groupFuture = consumerGroupOf(requestHeader.getConsumerGroup());

// The real receipt handle generated by S3RocketMQ.
String rawHandle = ReceiptHandleUtil.decodeReceiptHandle(requestHeader.getExtraInfo());

CompletableFuture<AckResult> resultF = store.ack(rawHandle).thenApply(ackResult -> {
org.apache.rocketmq.client.consumer.AckResult result = new org.apache.rocketmq.client.consumer.AckResult();
switch (ackResult.status()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,27 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;

public class MockMessageStore implements MessageStore {
private final HashMap<Long, AtomicLong> offsetMap = new HashMap<>();
private final Set<String> receiptHandleSet = new HashSet<>();
private final Map<Long, List<FlatMessageExt>> messageMap = new HashMap<>();
private final InflightService inflightService = new InflightService();

private long consumeOffset = 0;
private final Map<Pair<Long, Integer>, Long> consumerOffsetMap = new HashMap<>();

public MockMessageStore() {
receiptHandleSet.add("FAAAAAAAAAAMABwABAAAAAwAFAAMAAAAAgAAAAAAAAACAAAAAAAAAAMAAAAAAAAA");
}

@Override
public void start() throws Exception {
public void start() {

}

@Override
public void shutdown() throws Exception {
public void shutdown() {

}

Expand All @@ -66,6 +67,7 @@ public CompletableFuture<PopResult> pop(long consumerGroupId, long topicId, int
}

List<FlatMessageExt> messageList = messageMap.computeIfAbsent(topicId + queueId, v -> new ArrayList<>());
long consumeOffset = consumerOffsetMap.computeIfAbsent(Pair.of(topicId, queueId), v -> 0L);
int start = consumeOffset > messageList.size() ? -1 : (int) consumeOffset;
int end = consumeOffset + batchSize >= messageList.size() ? messageList.size() : (int) consumeOffset + batchSize;

Expand All @@ -76,7 +78,7 @@ public CompletableFuture<PopResult> pop(long consumerGroupId, long topicId, int
} else {
status = PopResult.Status.FOUND;
messageList = messageList.subList(start, end);
consumeOffset = end;
consumerOffsetMap.put(Pair.of(topicId, queueId), consumeOffset + messageList.size());
inflightService.increaseInflightCount(consumerGroupId, topicId, queueId, messageList.size());
}
return CompletableFuture.completedFuture(new PopResult(status, 0L, messageList));
Expand Down Expand Up @@ -135,4 +137,9 @@ public CompletableFuture<TopicQueue.QueueOffsetRange> getOffsetRange(long topicI
long endOffset = offsetMap.computeIfAbsent(topicId + queueId, v -> new AtomicLong()).get();
return CompletableFuture.completedFuture(new TopicQueue.QueueOffsetRange(startOffset, endOffset));
}

@Override
public CompletableFuture<Long> getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return CompletableFuture.completedFuture(consumerOffsetMap.getOrDefault(Pair.of(topicId, queueId), 0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.automq.rocketmq.proxy.mock.MockProxyMetadataService;
import com.automq.rocketmq.proxy.model.VirtualQueue;
import com.automq.rocketmq.proxy.util.FlatMessageUtil;
import com.automq.rocketmq.proxy.util.ReceiptHandleUtil;
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.model.message.TagFilter;
import java.util.List;
Expand All @@ -35,7 +36,6 @@
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.service.message.MessageService;
Expand All @@ -47,15 +47,12 @@
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

@Disabled
class MessageServiceImplTest {
public static final String RECEIPT_HANDLE = "FAAAAAAAAAAMABwABAAAAAwAFAAMAAAAAgAAAAAAAAACAAAAAAAAAAMAAAAAAAAA";

Expand Down Expand Up @@ -125,55 +122,12 @@ void popMessage() {
assertEquals(PopStatus.FOUND, result.getPopStatus());
assertEquals(2, result.getMsgFoundList().size());
// All messages in queue 0 has been consumed
assertEquals(2, metadataService.consumerOffsetOf(consumerGroupId, topicId, 0).join());

// Pop all queues.
header.setQueueId(-1);
header.setMaxMsgNums(4);
messageStore.put(FlatMessageUtil.convertFrom(topicId, 1, "", new Message(topicName, "", new byte[] {})));
messageStore.put(FlatMessageUtil.convertFrom(topicId, 2, "", new Message(topicName, "", new byte[] {})));
messageStore.put(FlatMessageUtil.convertFrom(topicId, 2, "", new Message(topicName, "", new byte[] {})));
messageStore.put(FlatMessageUtil.convertFrom(topicId, 4, "", new Message(topicName, "", new byte[] {})));
messageStore.put(FlatMessageUtil.convertFrom(topicId, 4, "", new Message(topicName, "", new byte[] {})));
messageStore.put(FlatMessageUtil.convertFrom(topicId, 4, "", new Message(topicName, "", new byte[] {})));
assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0).join());

// Pop again.
result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join();
assertEquals(PopStatus.FOUND, result.getPopStatus());
assertEquals(4, result.getMsgFoundList().size());
// Queue 1 should not be touched because it is not assigned.
assertEquals(0, metadataService.consumerOffsetOf(consumerGroupId, topicId, 1).join());

// The priorities of queues 2 and 4 are not fixed, so there are the following two results:
// 1. pop one message from queue 2 and three messages from queue 4
// 2. pop two messages each from queue 2 and 4
int messageFromQueue2 = 0;
int messageFromQueue4 = 0;
for (MessageExt messageExt : result.getMsgFoundList()) {
switch (messageExt.getQueueId()) {
case 2 -> messageFromQueue2++;
case 4 -> messageFromQueue4++;
default -> fail("All messages should be popped from queue 2 or 4.");
}
}
assertEquals(messageFromQueue2, metadataService.consumerOffsetOf(consumerGroupId, topicId, 2).join());
assertEquals(messageFromQueue4, metadataService.consumerOffsetOf(consumerGroupId, topicId, 4).join());

// Pop remaining messages.
header.setMaxMsgNums(1);
result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join();
assertEquals(PopStatus.FOUND, result.getPopStatus());
assertEquals(1, result.getMsgFoundList().size());

int remainingQueue;
if (messageFromQueue4 == 3) {
remainingQueue = 2;
} else {
remainingQueue = 4;
}

MessageExt messageExt = result.getMsgFoundList().get(0);
assertEquals(remainingQueue, messageExt.getQueueId());
assertEquals(remainingQueue == 2 ? 1 : 2, messageExt.getQueueOffset());
assertEquals(PopStatus.NO_NEW_MSG, result.getPopStatus());
assertEquals(0, result.getMsgFoundList().size());
}

@Test
Expand Down Expand Up @@ -214,9 +168,10 @@ void pop_withFifo() {
assertEquals(0, result.getMsgFoundList().size());

AckMessageRequestHeader ackHeader = new AckMessageRequestHeader();
ackHeader.setExtraInfo(RECEIPT_HANDLE);
ackHeader.setExtraInfo(ReceiptHandleUtil.encodeReceiptHandle(RECEIPT_HANDLE, 0L));
ackHeader.setTopic(topicName);
ackHeader.setQueueId(0);
ackHeader.setConsumerGroup("group");
messageService.ackMessage(context, null, "", ackHeader, 0L);
messageService.ackMessage(context, null, "", ackHeader, 0L);

Expand All @@ -232,7 +187,7 @@ void pop_withFifo() {
@Test
void changeInvisibleTime() {
ChangeInvisibleTimeRequestHeader header = new ChangeInvisibleTimeRequestHeader();
header.setExtraInfo(RECEIPT_HANDLE);
header.setExtraInfo(ReceiptHandleUtil.encodeReceiptHandle(RECEIPT_HANDLE, 0L));
header.setInvisibleTime(100L);
AckResult ackResult = messageService.changeInvisibleTime(ProxyContext.create(), null, null, header, 0).join();
assertEquals(AckStatus.OK, ackResult.getStatus());
Expand All @@ -245,9 +200,10 @@ void changeInvisibleTime() {
@Test
void ackMessage() {
AckMessageRequestHeader header = new AckMessageRequestHeader();
header.setExtraInfo(RECEIPT_HANDLE);
header.setExtraInfo(ReceiptHandleUtil.encodeReceiptHandle(RECEIPT_HANDLE, 0L));
header.setTopic("topic");
header.setQueueId(0);
header.setConsumerGroup("group");
AckResult ackResult = messageService.ackMessage(ProxyContext.create(), null, null, header, 0).join();
assertEquals(AckStatus.OK, ackResult.getStatus());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,10 @@ public CompletableFuture<TopicQueue.QueueOffsetRange> getOffsetRange(long topicI
return topicQueueManager.getOrCreate(topicId, queueId)
.thenCompose(TopicQueue::getOffsetRange);
}

@Override
public CompletableFuture<Long> getConsumeOffset(long consumerGroupId, long topicId, int queueId) {
return topicQueueManager.getOrCreate(topicId, queueId)
.thenCompose(topicQueue -> topicQueue.getConsumeOffset(consumerGroupId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,14 @@ CompletableFuture<PopResult> pop(long consumerGroupId, long topicId, int queueId
* @return offset range, <code>[startOffset, endOffset)</code>
*/
CompletableFuture<TopicQueue.QueueOffsetRange> getOffsetRange(long topicId, int queueId);

/**
* Get consume offset of specified consumer group.
*
* @param consumerGroupId consumer group id
* @param topicId topic id
* @param queueId queue id
* @return consume offset
*/
CompletableFuture<Long> getConsumeOffset(long consumerGroupId, long topicId, int queueId);
}

0 comments on commit 302b891

Please sign in to comment.