Skip to content

Commit

Permalink
test(proxy): add unit test for MessageServiceImpl (#533)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 1, 2023
1 parent a9c47fb commit 871a543
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.proxy.service;

import com.automq.rocketmq.common.config.ProxyConfig;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -37,16 +38,21 @@ private Lock getLock(long topicId, int queueId) {
.computeIfAbsent(queueId, v -> new Lock(topicId, queueId));
}

public boolean tryLock(long topicId, int queueId, String clientId, boolean fifo) {
return tryLock(topicId, queueId, clientId, fifo, config.lockExpireTime());
public boolean tryLock(long topicId, int queueId, String clientId, boolean preempt, boolean reentrant) {
return tryLock(topicId, queueId, clientId, preempt, reentrant, config.lockExpireTime());
}

public boolean tryLock(long topicId, int queueId, String clientId, boolean fifo, long expireTime) {
public boolean tryLock(long topicId, int queueId, String clientId, boolean preempt, boolean reentrant,
long expireTime) {
if (preempt && reentrant) {
throw new IllegalArgumentException("Argument preempt and reentrant cannot be true at the same time.");
}

Lock lock = getLock(topicId, queueId);
if (fifo) {
if (preempt) {
return lock.tryPreempt(clientId, expireTime);
}
return lock.tryLock(clientId);
return lock.tryLock(clientId, reentrant);
}

public void release(long topicId, int queueId) {
Expand All @@ -62,6 +68,7 @@ public void tryExpire(long topicId, int queueId, long later) {
private static class Lock {
private final long topicId;
private final int queueId;
private boolean reentrant;

private final AtomicBoolean processing = new AtomicBoolean(false);
private final AtomicBoolean preempting = new AtomicBoolean(false);
Expand All @@ -73,11 +80,17 @@ public Lock(long topicId, int queueId) {
this.queueId = queueId;
}

public boolean tryLock(String clientId) {
public boolean tryLock(String clientId, boolean reentrant) {
boolean result = processing.compareAndSet(false, true);
if (result) {
this.lockTime = System.currentTimeMillis();
this.ownerId = clientId;
this.reentrant = reentrant;
return true;
}

if (this.reentrant && Objects.equals(this.ownerId, clientId)) {
this.lockTime = System.currentTimeMillis();
return true;
}
return false;
Expand All @@ -87,14 +100,14 @@ public void release() {
processing.set(false);
}

// Used in FIFO mode, only the same client can acquire the lock or preempt the expired lock.
// Used in preempt mode, only the same client can acquire the lock or preempt the expired lock.
public boolean tryPreempt(String clientId, long expireTime) {
boolean result = preempting.compareAndSet(false, true);
if (result) {
try {
if (StringUtils.isBlank(this.ownerId) || this.ownerId.equals(clientId)) {
// Only same clientId could acquire the lock.
return tryLock(clientId);
return tryLock(clientId, false);
} else if (lockTime + expireTime <= System.currentTimeMillis()) {
// Try to preempt expired lock.
this.lockTime = System.currentTimeMillis();
Expand All @@ -110,7 +123,7 @@ public boolean tryPreempt(String clientId, long expireTime) {
return false;
}

// Used in FIFO mode, expire the lock after the specified time.
// Used in preempt mode, expire the lock after the specified time.
public void tryExpire(long later, long expireTime) {
if (preempting.compareAndSet(false, true)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
if (pullResult.status() == com.automq.rocketmq.store.model.message.PullResult.Status.FOUND) {
return pullResult.messageList().get(0);
}
throw new IllegalArgumentException("Message not found");
throw new IllegalArgumentException("Message not found.");
}).thenCompose(messageExt -> {
if (messageExt.deliveryAttempts() > group.getMaxDeliveryAttempt()) {
return deadLetterService.send(group.getGroupId(), messageExt);
Expand Down Expand Up @@ -344,7 +344,7 @@ private CompletableFuture<InnerPopResult> popSpecifiedQueue(ConsumerGroup consum
Topic topic, int queueId, Filter filter, int batchSize, boolean fifo, long invisibleDuration,
long timeoutMillis) {
long topicId = topic.getTopicId();
if (lockService.tryLock(topicId, queueId, clientId, fifo)) {
if (lockService.tryLock(topicId, queueId, clientId, fifo, false)) {
return popSpecifiedQueueUnsafe(consumerGroup, topic, queueId, filter, batchSize, fifo, invisibleDuration)
.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
.whenComplete((v, throwable) -> {
Expand Down Expand Up @@ -388,7 +388,7 @@ public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMess
}

if (group.getSubMode() != SubscriptionMode.SUB_MODE_POP) {
throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("Group [%s] do not support pop mode.", group.getName())));
throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("The consumer group [pullGroup] is not allowed to consume message with pop mode.", group.getName())));
}

topicReference.set(topic);
Expand Down Expand Up @@ -583,7 +583,7 @@ public CompletableFuture<PullResult> pullMessage(ProxyContext ctx, AddressableMe
}

if (group.getSubMode() != SubscriptionMode.SUB_MODE_PULL) {
throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("Group [%s] do not support pull mode.", group.getName())));
throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("The consumer group [%s] is not allowed to consume message with pull mode.", group.getName())));
}

topicReference.set(topic);
Expand Down Expand Up @@ -632,7 +632,7 @@ public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, Addres
for (MessageQueue queue : requestBody.getMqSet()) {
VirtualQueue virtualQueue = new VirtualQueue(queue.getBrokerName());
// Expire time is 60 seconds which is a magic number from apache rocketmq.
boolean result = lockService.tryLock(virtualQueue.topicId(), virtualQueue.physicalQueueId(), requestBody.getClientId(), true, 60_000);
boolean result = lockService.tryLock(virtualQueue.topicId(), virtualQueue.physicalQueueId(), requestBody.getClientId(), false, true, 60_000);
if (result) {
successSet.add(queue);
}
Expand All @@ -646,7 +646,7 @@ public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx, AddressableMessag
UnlockBatchRequestBody requestBody, long timeoutMillis) {
for (MessageQueue queue : requestBody.getMqSet()) {
VirtualQueue virtualQueue = new VirtualQueue(queue.getBrokerName());
lockService.tryExpire(virtualQueue.topicId(), virtualQueue.physicalQueueId(), 0);
lockService.release(virtualQueue.topicId(), virtualQueue.physicalQueueId());
}

return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,23 @@ public CompletableFuture<PopResult> pop(long consumerGroupId, long topicId, int
@Override
public CompletableFuture<PullResult> pull(long consumerGroupId, long topicId, int queueId, Filter filter,
long offset, int batchSize, boolean retry) {
return null;
if (retry) {
return CompletableFuture.completedFuture(new PullResult(PullResult.Status.NO_NEW_MSG, 0L, 0L, 0L, new ArrayList<>()));
}

List<FlatMessageExt> messageList = messageMap.computeIfAbsent(topicId + queueId, v -> new ArrayList<>());
int start = offset >= messageList.size() ? -1 : (int) offset;
int end = offset + batchSize >= messageList.size() ? messageList.size() : (int) offset + batchSize;

PullResult.Status status;
if (start < 0) {
status = PullResult.Status.NO_NEW_MSG;
messageList = new ArrayList<>();
} else {
status = PullResult.Status.FOUND;
messageList = messageList.subList(start, end);
}
return CompletableFuture.completedFuture(new PullResult(status, offset + messageList.size(), 0L, messageList.size() - end, messageList));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public CompletableFuture<ConsumerGroup> consumerGroupOf(String groupName) {
.setName(groupName)
.setGroupId(groupId)
.setMaxDeliveryAttempt(16)
.setSubMode(SubscriptionMode.SUB_MODE_POP)
.setSubMode(groupName.contains("pull") ? SubscriptionMode.SUB_MODE_PULL : SubscriptionMode.SUB_MODE_POP)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;

class LockServiceTest {
Expand All @@ -29,47 +30,54 @@ class LockServiceTest {
void lock() {
LockService lockService = new LockService(new ProxyConfig());

boolean result = lockService.tryLock(0, 0, "client1", false);
boolean result = lockService.tryLock(0, 0, "client1", false, false);
assertTrue(result);

// Check if the lock is reentrant.
result = lockService.tryLock(0, 0, "client1", false);
result = lockService.tryLock(0, 0, "client1", false, false);
assertFalse(result);

// Check if client can lock another queue.
result = lockService.tryLock(1, 1, "client1", false);
result = lockService.tryLock(1, 1, "client1", false, false);
assertTrue(result);

// Try to release and lock again.
lockService.release(0, 0);
result = lockService.tryLock(0, 0, "client1", false);
result = lockService.tryLock(0, 0, "client1", false, true);
assertTrue(result);

// Check if the lock is reentrant.
result = lockService.tryLock(0, 0, "client1", false, true);
assertTrue(result);
}

@Test
void lock_WithFifo() {
void lock_preempt() {
LockService lockService = new LockService(new ProxyConfig());

boolean result = lockService.tryLock(0, 0, "client1", true);
// Set both preempt and reentrant to true.
assertThrowsExactly(IllegalArgumentException.class, () -> lockService.tryLock(0, 0, "client1", true, true));

boolean result = lockService.tryLock(0, 0, "client1", true, false);
assertTrue(result);

// Other clients can not acquire the lock before the lock is expired.
lockService.release(0, 0);
result = lockService.tryLock(0, 0, "client2", true);
result = lockService.tryLock(0, 0, "client2", true, false);
assertFalse(result);

result = lockService.tryLock(0, 0, "client1", true);
result = lockService.tryLock(0, 0, "client1", true, false);
assertTrue(result);

// Can not expire the lock when there is a ongoing pop request.
lockService.tryExpire(0, 0, 0);
result = lockService.tryLock(0, 0, "client2", true);
result = lockService.tryLock(0, 0, "client2", true, false);
assertFalse(result);

// Expire the lock immediately, so other clients can acquire the lock.
lockService.release(0, 0);
lockService.tryExpire(0, 0, 0);
result = lockService.tryLock(0, 0, "client2", true);
result = lockService.tryLock(0, 0, "client2", true, false);
assertTrue(result);
}
}
Loading

0 comments on commit 871a543

Please sign in to comment.