Skip to content

Commit

Permalink
fix: enable remoting e2e test about push, delay, normal and order
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed Nov 13, 2023
1 parent fb9f6d6 commit 93462bd
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void onSuccess(SendResult sendResult) {
public void onException(Throwable e) {
//logger.warn("{} callback message failed: {} exception: {}", context.getTopic(), context.getMessageId(), context.getException());
bFailResponse = true;
logger.warn("callback message failed: {}", e);
}

public void waitResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class RMQNormalProducer extends AbstractMQProducer {
private static Logger logger = LoggerFactory.getLogger(RMQNormalProducer.class);
private DefaultMQProducer producer;

static final String PROPERTY_SHARDING_KEY = "__SHARDINGKEY";
public RMQNormalProducer(DefaultMQProducer producer) {
this.producer = producer;
}
Expand Down Expand Up @@ -115,9 +116,11 @@ public void sendWithQueue(List<MessageQueue> mqs, int messageNum) {
*/
public void sendWithQueue(List<MessageQueue> mqs, String tag, int messageNum) {
logger.info("Producer start to send messages");
String orderId = "biz_" + 0;
for (MessageQueue mq : mqs) {
for (int i = 0; i < messageNum; i++) {
Message message = MessageFactory.buildOneMessageWithTagAndBody(mq.getTopic(), tag, String.valueOf(i));
message.putUserProperty(PROPERTY_SHARDING_KEY, orderId);
try {
SendResult sendResult = producer.send(message, mq);
MessageExt messageExt = new MessageExt();
Expand All @@ -141,9 +144,11 @@ public void sendWithQueue(List<MessageQueue> mqs, String tag, int messageNum) {
*/
public void sendWithQueue(List<MessageQueue> mqs, int messageNum, String tag) {
logger.info("Producer start to send messages");
String orderId = "biz_" + 0;
for (MessageQueue mq : mqs) {
for (int i = 0; i < messageNum; i++) {
Message message = MessageFactory.buildOneMessageWithTagAndBody(mq.getTopic(), tag, String.valueOf(i));
message.putUserProperty(PROPERTY_SHARDING_KEY, orderId);
try {
SendResult sendResult = producer.send(message, mq);
MessageExt messageExt = new MessageExt();
Expand All @@ -152,6 +157,7 @@ public void sendWithQueue(List<MessageQueue> mqs, int messageNum, String tag) {
logger.info("{}, index: {}, tag: {}", sendResult, i, tag);
this.enqueueMessages.addData(messageExt);
} catch (Exception e) {
System.out.printf("send message failed %s", e);
logger.error("DefaultMQProducer send message failed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ protected static String getGroupId(String methodName, SubscriptionMode mode) {
return groupId;
}

protected static String getOrderlyGroupId(String methodName, SubscriptionMode mode) {
String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6));
CreateGroupRequest request = CreateGroupRequest.newBuilder()
.setName(groupId)
.setMaxDeliveryAttempt(16)
.setGroupType(GroupType.GROUP_TYPE_FIFO)
.setSubMode(mode)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
logger.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);
return groupId;
}

private static CompletableFuture<CreateGroupReply> createConsumerGroup(CreateGroupRequest request) {
try {
CompletableFuture<CreateGroupReply> groupCf = client.createGroup(namesrvAddr, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@

package org.apache.rocketmq.cluster;

import apache.rocketmq.controller.v1.SubscriptionMode;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.enums.TESTSET;
import org.apache.rocketmq.factory.ConsumerFactory;
import org.apache.rocketmq.factory.ProducerFactory;
Expand All @@ -32,14 +28,10 @@
import org.apache.rocketmq.utils.NameUtils;
import org.apache.rocketmq.utils.RandomUtils;
import org.apache.rocketmq.utils.VerifyUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Tag(TESTSET.MODEL)
public class ClusterTest extends BaseOperate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.pull;

import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
Expand Down Expand Up @@ -68,8 +70,8 @@ public void tearDown() {
@DisplayName("When sending 20 sequential messages synchronously using the same MessageQueue, PullConsumer normally receives messages, but does not ack messages, and keeps the sequence; the messages are stuck at the first")
public void testFIFO_pull_receive_nack() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String groupId = getGroupId(methodName);
String topic = getTopic(MessageType.FIFO, methodName);
String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);

RMQNormalConsumer consumer = ConsumerFactory.getRMQPullConsumer(namesrvAddr, groupId, rpcHook);
consumer.startDefaultPull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.pull;

import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import org.apache.rocketmq.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.common.message.MessageQueue;
Expand Down Expand Up @@ -54,9 +56,8 @@ public void setUp() {
@DisplayName("Send 20 sequential messages synchronously, and expect PullConsumer to receive and ack messages properly and maintain the sequence")
public void testFIFO_simple_receive_ack() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(methodName);
String groupId = getGroupId(methodName);
String topic = getTopic(MessageType.FIFO, methodName);
String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);

RMQNormalConsumer consumer = ConsumerFactory.getRMQPullConsumer(namesrvAddr, groupId, rpcHook);
consumer.startDefaultPull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
Expand All @@ -54,6 +56,7 @@
import org.apache.rocketmq.utils.NameUtils;
import org.apache.rocketmq.utils.RandomUtils;
import org.apache.rocketmq.utils.TestUtils;
import org.apache.rocketmq.utils.VerifyUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
Expand All @@ -67,7 +70,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@Disabled
@Tag(TESTSET.RETRY)
public class PushConsumerRetryTest extends BaseOperate {
private final Logger log = LoggerFactory.getLogger(PushConsumerRetryTest.class);
Expand All @@ -79,6 +81,7 @@ public void setUp() {
tag = NameUtils.getRandomTagName();
}

@Disabled
@Test
@Timeout(value = 180, unit = TimeUnit.SECONDS)
@DisplayName("Send normal messages, set the maximum number of retries and set the received messages to RECONSUME_LATER. The expected retry time is about 10s for the first time and about 30s for the second time")
Expand All @@ -104,7 +107,7 @@ public void testNormalMessageReconsumeTime() {
pushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
if (msg.getReconsumeTimes() == 0) {
msgsReConsumeTime.putIfAbsent(msg.getMsgId(), Instant.now());
Expand Down Expand Up @@ -141,6 +144,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
log.info(String.format("recv msgid(success) %s ", msgs.get(0).getMsgId()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

Expand All @@ -151,13 +155,16 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
}

Assertions.assertNotNull(producer, "Get Producer Failed");
for (int i = 0; i < SEND_NUM; i++) {
Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i));
producer.send(message);
}
// for (int i = 0; i < SEND_NUM; i++) {
// Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i));
// producer.send(message);
// System.out.printf("send message %s", message);
// }
producer.send(topic, tag, SEND_NUM);

Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed");

await().atMost(120, SECONDS).until(new Callable<Boolean>() {
await().atMost(240, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
boolean flag = true;
Expand All @@ -180,8 +187,8 @@ public Boolean call() throws Exception {
@DisplayName("Send order messages, set the maximum number of retries and set the received messages to SUSPEND_CURRENT_QUEUE_A_MOMENT. The expected retry time is about 1s")
public void testOrderMessageReconsumeTime() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String groupId = getGroupId(methodName);
String topic = getTopic(MessageType.FIFO, methodName);
String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);
RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook);

Map<String, Instant> msgsReConsumeTime = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -247,6 +254,7 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
}

Assertions.assertNotNull(producer, "Get Producer Failed");

List<MessageQueue> mqs = producer.fetchPublishMessageQueues(topic);
List<MessageQueue> sendMqs = new ArrayList<>();
sendMqs.add(mqs.get(0));
Expand All @@ -271,6 +279,7 @@ public Boolean call() throws Exception {
pushConsumer.shutdown();
}

@Disabled
@Test
@Timeout(value = 180, unit = TimeUnit.SECONDS)
@DisplayName("Send normal messages, set the maximum consumption to 0(The first retry is 10 seconds, and the setting is 15 seconds. Then check whether the retry occurs), and set the message reception to RECONUME_LATER, expecting that the retry will not occur")
Expand Down Expand Up @@ -313,10 +322,11 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
}

Assertions.assertNotNull(producer, "Get Producer Failed");
for (int i = 0; i < SEND_NUM; i++) {
Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i));
producer.send(message);
}
// for (int i = 0; i < SEND_NUM; i++) {
// Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i));
// producer.send(message);
// }
producer.send(topic, tag, SEND_NUM);
Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed");

TestUtils.waitForSeconds(15);
Expand All @@ -333,8 +343,8 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
@DisplayName("Send one order message, set the maximum consumption to 0(The retry time of each message is 1s. Then check whether the retry occurs), and set the message reception to SUSPEND_CURRENT_QUEUE_A_MOMENT, expecting that the retry will not occur")
public void testOrderMessageRetryTimesSetting() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String groupId = getGroupId(methodName);
String topic = getTopic(MessageType.FIFO, methodName);
String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);
RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook);

Map<String, Integer> msgsReConsumeTimes = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -392,8 +402,8 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
@DisplayName("Send order messages, set the maximum consumption to 30, and set the message reception to SUSPEND_CURRENT_QUEUE_A_MOMENT, expecting that the received messages's reconsume time will be equal to 30 in 2 minutes")
public void testOrderMessageRetryTimesWith30() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String groupId = getGroupId(methodName);
String topic = getTopic(MessageType.FIFO, methodName);
String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);
RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook);

Map<String, Integer> msgsReConsumeTimes = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -448,8 +458,8 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
@DisplayName("To send order messages, set SUSPEND_CURRENT_QUEUE_A_MOMENT for the first message. The next sequential message will not be consumed until the first message is in the dead letter queue. All messages except the first message are expected to be received")
public void testOrderMessageRetryTimesWithMaxReconsumeimes() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String groupId = getGroupId(methodName);
String topic = getTopic(MessageType.FIFO, methodName);
String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);
RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook);

Map<String, Integer> msgsReConsumeTimes = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -502,6 +512,7 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
pushConsumer.shutdown();
}

@Disabled
@Test
@Timeout(value = 180, unit = TimeUnit.SECONDS)
@DisplayName("Simulate pushconsumer consumption fail, expect that the original message was not received, and capture all messages after message retry")
Expand Down Expand Up @@ -566,6 +577,7 @@ public Boolean call() throws Exception {
pushConsumer.shutdown();
}

@Disabled
@Test
@Timeout(value = 180, unit = TimeUnit.SECONDS)
@DisplayName("Simulate pushconsumer consumption return null, expect that the original message was not received, and capture all messages after message retry")
Expand All @@ -591,15 +603,15 @@ public void testNullConsumption() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
if (msg.getReconsumeTimes() == 2) {
if (msg.getReconsumeTimes() == 1) {
retryMsgs.putIfAbsent(msg.getMsgId(), msg);
log.info("consume success: {}", msg);
} else {
// Simulate consuming operations
log.info("{}", "Simulate consuming operations return null");
firstMsgs.putIfAbsent(msg.getMsgId(), msg);
log.info(String.format("recv msg(null) %s ", msg));
return null;
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

}
Expand All @@ -608,18 +620,21 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
});
pushConsumer.start();
} catch (MQClientException e) {
System.out.printf("exception: %s%n", e);
Assertions.fail(e.getMessage());
}
Assertions.assertNotNull(producer, "Get Producer Failed");
for (int i = 0; i < SEND_NUM; i++) {
Message message = MessageFactory.buildNormalMessage(topic, tag, String.valueOf(i));
producer.send(message);
System.out.printf("send message: %s%n", message);
}
Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed");

await().atMost(120, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
System.out.printf("retryMsgs size: %s; firstMsgs size: %s", retryMsgs.size(), firstMsgs.size());
return retryMsgs.size() == SEND_NUM && firstMsgs.size() == SEND_NUM;
}
});
Expand Down Expand Up @@ -693,6 +708,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
pushConsumer.shutdown();
}

@Disabled
@Test
@Timeout(value = 300, unit = TimeUnit.SECONDS)
@DisplayName("The normal message is sent, and after the PushConsumer retry, the retry message is expected to be consumed")
Expand Down Expand Up @@ -759,9 +775,8 @@ public Boolean call() throws Exception {
@DisplayName("The send order message, after the PushConsumer retry, is expected to consume the retry message, and the message consumption order and send order")
public void testFiFoTopicPushConsumerRetry() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String groupId = getGroupId(methodName);

String topic = getTopic(MessageType.FIFO, methodName);
String groupId = getOrderlyGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);
RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook);
Assertions.assertNotNull(producer, "Get producer failed");
Vector<MessageExt> recvMessages = new Vector<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public void testBatchProducer() {
try {
producer.getProducer().send(messages);
} catch (Exception e) {
System.out.printf("send message error: %s", e.getMessage());
Assertions.fail(e.getMessage());
}

Expand Down Expand Up @@ -114,6 +115,7 @@ public void testBatchProducer_queue() {
try {
producer.getProducer().send(messages, msgQueues.get(0));
} catch (Exception e) {
System.out.printf("send message error: %s", e.getMessage());
Assertions.fail(e.getMessage());
}

Expand Down
Loading

0 comments on commit 93462bd

Please sign in to comment.