diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java index 9b139a031..6ac73ba79 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java @@ -232,8 +232,17 @@ public CompletableFuture popMessage(ProxyContext ctx, AddressableMess @Override public CompletableFuture changeInvisibleTime(ProxyContext ctx, ReceiptHandle handle, String messageId, ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) { - String receiptHandle = requestHeader.getExtraInfo(); - String rawHandle = ReceiptHandleUtil.decodeReceiptHandle(receiptHandle); + + // The real receipt handle generated by S3RocketMQ. + String rawHandle; + try { + rawHandle = ReceiptHandleUtil.decodeReceiptHandle(requestHeader.getExtraInfo()); + } catch (Exception e) { + org.apache.rocketmq.client.consumer.AckResult result = new org.apache.rocketmq.client.consumer.AckResult(); + result.setStatus(AckStatus.NO_EXIST); + return CompletableFuture.completedFuture(result); + } + return store.changeInvisibleDuration(rawHandle, requestHeader.getInvisibleTime()) .thenApply(changeInvisibleDurationResult -> { org.apache.rocketmq.client.consumer.AckResult ackResult = new org.apache.rocketmq.client.consumer.AckResult(); @@ -250,12 +259,19 @@ public CompletableFuture ackMessage(ProxyContext ctx, ReceiptHandle h AckMessageRequestHeader requestHeader, long timeoutMillis) { Integer queueId = requestHeader.getQueueId(); + // The real receipt handle generated by S3RocketMQ. + String rawHandle; + try { + rawHandle = ReceiptHandleUtil.decodeReceiptHandle(requestHeader.getExtraInfo()); + } catch (Exception e) { + org.apache.rocketmq.client.consumer.AckResult result = new org.apache.rocketmq.client.consumer.AckResult(); + result.setStatus(AckStatus.NO_EXIST); + return CompletableFuture.completedFuture(result); + } + CompletableFuture topicFuture = topicOf(requestHeader.getTopic()); CompletableFuture groupFuture = consumerGroupOf(requestHeader.getConsumerGroup()); - // The real receipt handle generated by S3RocketMQ. - String rawHandle = ReceiptHandleUtil.decodeReceiptHandle(requestHeader.getExtraInfo()); - CompletableFuture resultF = store.ack(rawHandle).thenApply(ackResult -> { org.apache.rocketmq.client.consumer.AckResult result = new org.apache.rocketmq.client.consumer.AckResult(); switch (ackResult.status()) { diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java b/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java index b8b1c4624..bda8cf574 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.tuple.Pair; public class MockMessageStore implements MessageStore { private final HashMap offsetMap = new HashMap<>(); @@ -42,19 +43,19 @@ public class MockMessageStore implements MessageStore { private final Map> messageMap = new HashMap<>(); private final InflightService inflightService = new InflightService(); - private long consumeOffset = 0; + private final Map, Long> consumerOffsetMap = new HashMap<>(); public MockMessageStore() { receiptHandleSet.add("FAAAAAAAAAAMABwABAAAAAwAFAAMAAAAAgAAAAAAAAACAAAAAAAAAAMAAAAAAAAA"); } @Override - public void start() throws Exception { + public void start() { } @Override - public void shutdown() throws Exception { + public void shutdown() { } @@ -66,6 +67,7 @@ public CompletableFuture pop(long consumerGroupId, long topicId, int } List messageList = messageMap.computeIfAbsent(topicId + queueId, v -> new ArrayList<>()); + long consumeOffset = consumerOffsetMap.computeIfAbsent(Pair.of(topicId, queueId), v -> 0L); int start = consumeOffset > messageList.size() ? -1 : (int) consumeOffset; int end = consumeOffset + batchSize >= messageList.size() ? messageList.size() : (int) consumeOffset + batchSize; @@ -76,7 +78,7 @@ public CompletableFuture pop(long consumerGroupId, long topicId, int } else { status = PopResult.Status.FOUND; messageList = messageList.subList(start, end); - consumeOffset = end; + consumerOffsetMap.put(Pair.of(topicId, queueId), consumeOffset + messageList.size()); inflightService.increaseInflightCount(consumerGroupId, topicId, queueId, messageList.size()); } return CompletableFuture.completedFuture(new PopResult(status, 0L, messageList)); @@ -135,4 +137,9 @@ public CompletableFuture getOffsetRange(long topicI long endOffset = offsetMap.computeIfAbsent(topicId + queueId, v -> new AtomicLong()).get(); return CompletableFuture.completedFuture(new TopicQueue.QueueOffsetRange(startOffset, endOffset)); } + + @Override + public CompletableFuture getConsumeOffset(long consumerGroupId, long topicId, int queueId) { + return CompletableFuture.completedFuture(consumerOffsetMap.getOrDefault(Pair.of(topicId, queueId), 0L)); + } } diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java index c812a5097..50151c521 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java @@ -23,6 +23,7 @@ import com.automq.rocketmq.proxy.mock.MockProxyMetadataService; import com.automq.rocketmq.proxy.model.VirtualQueue; import com.automq.rocketmq.proxy.util.FlatMessageUtil; +import com.automq.rocketmq.proxy.util.ReceiptHandleUtil; import com.automq.rocketmq.store.api.MessageStore; import com.automq.rocketmq.store.model.message.TagFilter; import java.util.List; @@ -35,7 +36,6 @@ import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.service.message.MessageService; @@ -47,15 +47,12 @@ import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; -@Disabled class MessageServiceImplTest { public static final String RECEIPT_HANDLE = "FAAAAAAAAAAMABwABAAAAAwAFAAMAAAAAgAAAAAAAAACAAAAAAAAAAMAAAAAAAAA"; @@ -125,55 +122,12 @@ void popMessage() { assertEquals(PopStatus.FOUND, result.getPopStatus()); assertEquals(2, result.getMsgFoundList().size()); // All messages in queue 0 has been consumed - assertEquals(2, metadataService.consumerOffsetOf(consumerGroupId, topicId, 0).join()); - - // Pop all queues. - header.setQueueId(-1); - header.setMaxMsgNums(4); - messageStore.put(FlatMessageUtil.convertFrom(topicId, 1, "", new Message(topicName, "", new byte[] {}))); - messageStore.put(FlatMessageUtil.convertFrom(topicId, 2, "", new Message(topicName, "", new byte[] {}))); - messageStore.put(FlatMessageUtil.convertFrom(topicId, 2, "", new Message(topicName, "", new byte[] {}))); - messageStore.put(FlatMessageUtil.convertFrom(topicId, 4, "", new Message(topicName, "", new byte[] {}))); - messageStore.put(FlatMessageUtil.convertFrom(topicId, 4, "", new Message(topicName, "", new byte[] {}))); - messageStore.put(FlatMessageUtil.convertFrom(topicId, 4, "", new Message(topicName, "", new byte[] {}))); + assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0).join()); + // Pop again. result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join(); - assertEquals(PopStatus.FOUND, result.getPopStatus()); - assertEquals(4, result.getMsgFoundList().size()); - // Queue 1 should not be touched because it is not assigned. - assertEquals(0, metadataService.consumerOffsetOf(consumerGroupId, topicId, 1).join()); - - // The priorities of queues 2 and 4 are not fixed, so there are the following two results: - // 1. pop one message from queue 2 and three messages from queue 4 - // 2. pop two messages each from queue 2 and 4 - int messageFromQueue2 = 0; - int messageFromQueue4 = 0; - for (MessageExt messageExt : result.getMsgFoundList()) { - switch (messageExt.getQueueId()) { - case 2 -> messageFromQueue2++; - case 4 -> messageFromQueue4++; - default -> fail("All messages should be popped from queue 2 or 4."); - } - } - assertEquals(messageFromQueue2, metadataService.consumerOffsetOf(consumerGroupId, topicId, 2).join()); - assertEquals(messageFromQueue4, metadataService.consumerOffsetOf(consumerGroupId, topicId, 4).join()); - - // Pop remaining messages. - header.setMaxMsgNums(1); - result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join(); - assertEquals(PopStatus.FOUND, result.getPopStatus()); - assertEquals(1, result.getMsgFoundList().size()); - - int remainingQueue; - if (messageFromQueue4 == 3) { - remainingQueue = 2; - } else { - remainingQueue = 4; - } - - MessageExt messageExt = result.getMsgFoundList().get(0); - assertEquals(remainingQueue, messageExt.getQueueId()); - assertEquals(remainingQueue == 2 ? 1 : 2, messageExt.getQueueOffset()); + assertEquals(PopStatus.NO_NEW_MSG, result.getPopStatus()); + assertEquals(0, result.getMsgFoundList().size()); } @Test @@ -214,9 +168,10 @@ void pop_withFifo() { assertEquals(0, result.getMsgFoundList().size()); AckMessageRequestHeader ackHeader = new AckMessageRequestHeader(); - ackHeader.setExtraInfo(RECEIPT_HANDLE); + ackHeader.setExtraInfo(ReceiptHandleUtil.encodeReceiptHandle(RECEIPT_HANDLE, 0L)); ackHeader.setTopic(topicName); ackHeader.setQueueId(0); + ackHeader.setConsumerGroup("group"); messageService.ackMessage(context, null, "", ackHeader, 0L); messageService.ackMessage(context, null, "", ackHeader, 0L); @@ -232,7 +187,7 @@ void pop_withFifo() { @Test void changeInvisibleTime() { ChangeInvisibleTimeRequestHeader header = new ChangeInvisibleTimeRequestHeader(); - header.setExtraInfo(RECEIPT_HANDLE); + header.setExtraInfo(ReceiptHandleUtil.encodeReceiptHandle(RECEIPT_HANDLE, 0L)); header.setInvisibleTime(100L); AckResult ackResult = messageService.changeInvisibleTime(ProxyContext.create(), null, null, header, 0).join(); assertEquals(AckStatus.OK, ackResult.getStatus()); @@ -245,9 +200,10 @@ void changeInvisibleTime() { @Test void ackMessage() { AckMessageRequestHeader header = new AckMessageRequestHeader(); - header.setExtraInfo(RECEIPT_HANDLE); + header.setExtraInfo(ReceiptHandleUtil.encodeReceiptHandle(RECEIPT_HANDLE, 0L)); header.setTopic("topic"); header.setQueueId(0); + header.setConsumerGroup("group"); AckResult ackResult = messageService.ackMessage(ProxyContext.create(), null, null, header, 0).join(); assertEquals(AckStatus.OK, ackResult.getStatus()); diff --git a/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java b/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java index 322175ff0..e7569e538 100644 --- a/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java +++ b/store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java @@ -153,4 +153,10 @@ public CompletableFuture getOffsetRange(long topicI return topicQueueManager.getOrCreate(topicId, queueId) .thenCompose(TopicQueue::getOffsetRange); } + + @Override + public CompletableFuture getConsumeOffset(long consumerGroupId, long topicId, int queueId) { + return topicQueueManager.getOrCreate(topicId, queueId) + .thenCompose(topicQueue -> topicQueue.getConsumeOffset(consumerGroupId)); + } } diff --git a/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java b/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java index ee1c65733..c97a0b3ae 100644 --- a/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java @@ -88,4 +88,14 @@ CompletableFuture pop(long consumerGroupId, long topicId, int queueId * @return offset range, [startOffset, endOffset) */ CompletableFuture getOffsetRange(long topicId, int queueId); + + /** + * Get consume offset of specified consumer group. + * + * @param consumerGroupId consumer group id + * @param topicId topic id + * @param queueId queue id + * @return consume offset + */ + CompletableFuture getConsumeOffset(long consumerGroupId, long topicId, int queueId); }