diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java index 26633c1..ea6f4bf 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java @@ -48,7 +48,7 @@ public class VerifyUtils { private static Logger logger = LoggerFactory.getLogger(VerifyUtils.class); private static AtomicInteger receivedIndex = new AtomicInteger(0); - private static final int TIMEOUT = 60; + private static final int TIMEOUT = 90; private static int defaultSimpleThreadNums = 4; /** @@ -220,7 +220,7 @@ public static void verifyDelayMessage(DataCollector enqueueMessages, DataCollector dequeueMessages, int delayLevel) { // Check whether the consumption is complete Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, - (TIMEOUT + DelayConf.DELAY_LEVEL[delayLevel - 1]) * 1000L, 1); + (TIMEOUT + DelayConf.DELAY_LEVEL[delayLevel + 1]) * 1000L, 1); if (unConsumedMessages.size() > 0) { Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages)); @@ -426,8 +426,8 @@ private static HashMap checkDelay(DataCollector dequeu // 5 seconds, the requirement is met. long bornTimestamp = receivedMessage.getBornTimestamp(); - if (Math.abs(startDeliverTime - bornTimestamp) - / 1000 > DelayConf.DELAY_LEVEL[receivedMessage.getDelayTimeLevel() - 1] + offset) { + if (Math.abs(System.currentTimeMillis() - startDeliverTime - bornTimestamp) + / 1000 > DelayConf.DELAY_LEVEL[receivedMessage.getDelayTimeLevel() + 1] + offset) { map.put(receivedMessage.getMsgId(), (startDeliverTime - bornTimestamp) / 1000); } } @@ -985,7 +985,7 @@ public static void waitForLoadBalance(String topic, RMQNormalConsumer... allCons } public static void waitForAllocateAvg(String topic, RMQNormalConsumer... allConsumers) { - Awaitility.await().atMost(120, TimeUnit.SECONDS) + Awaitility.await().atMost(180, TimeUnit.SECONDS) .pollInterval(100, TimeUnit.MILLISECONDS) .until(() -> { int size = 0; diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullAckTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullAckTest.java index 1360a7a..78e68e5 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullAckTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/pull/PullAckTest.java @@ -93,7 +93,7 @@ public void testNormal_pull_receive_ack() { RMQNormalConsumer consumer = ConsumerFactory.getRMQPullConsumer(namesrvAddr, groupId, rpcHook); consumer.startDefaultPull(); - VerifyUtils.tryReceiveOnce(consumer.getPullConsumer(), topic, tag, 32); +// VerifyUtils.tryReceiveOnce(consumer.getPullConsumer(), topic, tag, 32); producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); Assertions.assertNotNull(producer, "Get producer failed"); for (int i = 0; i < SEND_NUM; i++) { diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/abnormal/PushConsumerRetryTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/abnormal/PushConsumerRetryTest.java index 57a7918..1ff39b7 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/abnormal/PushConsumerRetryTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/abnormal/PushConsumerRetryTest.java @@ -56,7 +56,6 @@ import org.apache.rocketmq.utils.NameUtils; import org.apache.rocketmq.utils.RandomUtils; import org.apache.rocketmq.utils.TestUtils; -import org.apache.rocketmq.utils.VerifyUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -332,7 +331,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, TestUtils.waitForSeconds(15); Assertions.assertEquals(SEND_NUM, msgsRecv.size(), - "retry message size is not equal to send message size"); + "retry message size is not equal to send message size"); producer.shutdown(); pushConsumer.shutdown(); @@ -422,7 +421,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { log.info(String.format("recv msgid(reconsume later) %s, reconsume time is %s ", msg.getMsgId(), - msg.getReconsumeTimes())); + msg.getReconsumeTimes())); msgsReConsumeTimes.put(msg.getMsgId(), msg.getReconsumeTimes()); } return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; @@ -444,7 +443,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, TestUtils.waitForMinutes(2); Assertions.assertEquals(2, msgsReConsumeTimes.size(), - "retry message size is not equal to send message size"); + "retry message size is not equal to send message size"); for (Map.Entry entry : msgsReConsumeTimes.entrySet()) { Assertions.assertTrue(30 == entry.getValue(), "retry times is not equal to maxReconsumeTimes(30)"); } @@ -480,7 +479,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, String body = String.valueOf(StandardCharsets.UTF_8.decode(ByteBuffer.wrap(msg.getBody()))); if ("0".equals(body)) { log.info(String.format("recv msgid(first message) %s, reconsume time is %s ", - msg.getMsgId(), msg.getReconsumeTimes())); + msg.getMsgId(), msg.getReconsumeTimes())); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } else { msgsReConsumeTimes.putIfAbsent(msg.getMsgId(), msg.getReconsumeTimes()); @@ -506,7 +505,7 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, TestUtils.waitForSeconds(30); Assertions.assertEquals(SEND_NUM - 1, msgsReConsumeTimes.size(), - "retry message size is not equal to send message size"); + "retry message size is not equal to send message size"); producer.shutdown(); pushConsumer.shutdown(); @@ -764,7 +763,7 @@ public Boolean call() throws Exception { }); for (MessageExt messageExt : producer.getEnqueueMessages().getAllData()) { Assertions.assertTrue(firstMsgs.containsKey(messageExt.getMsgId().toString()) - || retryMsgs.containsKey(messageExt.getMsgId().toString())); + || retryMsgs.containsKey(messageExt.getMsgId().toString())); } producer.shutdown(); pushConsumer.shutdown(); @@ -825,9 +824,9 @@ public Boolean call() throws Exception { }); for (int i = 0; i < SEND_NUM; i++) { Assertions.assertEquals(i, - Integer.parseInt(String - .valueOf(StandardCharsets.UTF_8.decode(ByteBuffer.wrap(recvMessages.get(i).getBody())))), - "recv message failed"); + Integer.parseInt(String + .valueOf(StandardCharsets.UTF_8.decode(ByteBuffer.wrap(recvMessages.get(i).getBody())))), + "recv message failed"); } } } diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/delay/DelayMessageTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/delay/DelayMessageTest.java index d5a0ed6..a5bec9b 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/delay/DelayMessageTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/delay/DelayMessageTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.server.delay; +import apache.rocketmq.controller.v1.MessageType; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; @@ -57,13 +58,11 @@ public class DelayMessageTest extends BaseOperate { @BeforeEach public void setUp() { - topic = NameUtils.getTopicName(); +// topic = NameUtils.getTopicName(); tag = NameUtils.getTagName(); - groupId = NameUtils.getGroupName(); +// groupId = NameUtils.getGroupName(); // MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); - getTopic(topic); - getGroupId(groupId); - logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId); +// logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId); } @AfterEach @@ -74,6 +73,9 @@ public void tearDown() { @Test @DisplayName("Send 10 delay messages and set the delay test delay level=1 , expecting all to be consumed and latency is as expected") public void testDelayLevel1() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + String topic = getTopic(MessageType.DELAY, methodName); + String groupId = getGroupId(methodName); int delayLevel = 1; RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, "*", new RMQNormalListener()); @@ -90,6 +92,9 @@ public void testDelayLevel1() { @Test @DisplayName("Send 10 delay messages and set the delay test delay level=4 , expecting all to be consumed and latency is as expected") public void testDelayLevel4() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + String topic = getTopic(MessageType.DELAY, methodName); + String groupId = getGroupId(methodName); int delayLevel = 4; RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, "*", new RMQNormalListener()); @@ -103,35 +108,39 @@ public void testDelayLevel4() { consumer.shutdown(); } - @Test - @DisplayName("Send one delay message and set the delay test negative delay level, expecting message building wrong") - public void testNegativeDelayLevel() { - int delayLevel = -1; - RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); - - assertThrows(Exception.class, () -> { - Message msg = new Message(topic, "*", RandomUtils.getStringByUUID().getBytes()); - msg.setDelayTimeLevel(delayLevel); - SendResult sendResult = producer.getProducer().send(msg); - logger.info(sendResult.toString()); - }, "Send messages with a negative delay level, Expected send() to throw exception, but it didn't"); - - producer.shutdown(); - } - - @Test - @DisplayName("Send one delay message and set the delay test delay level=19, expecting message building wrong") - public void testDelayLevelWith19() { - int delayLevel = 19; - RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); - - assertThrows(Exception.class, () -> { - Message msg = new Message(topic, "*", RandomUtils.getStringByUUID().getBytes()); - msg.setDelayTimeLevel(delayLevel); - SendResult sendResult = producer.getProducer().send(msg); - logger.info(sendResult.toString()); - }, "Send messages with delay level=19, Expected send() to throw exception, but it didn't"); - - producer.shutdown(); - } +// @Test +// @DisplayName("Send one delay message and set the delay test negative delay level, expecting message building wrong") +// public void testNegativeDelayLevel() { +// String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); +// String topic = getTopic(MessageType.DELAY, methodName); +// int delayLevel = -1; +// RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); +// +// assertThrows(Exception.class, () -> { +// Message msg = new Message(topic, "*", RandomUtils.getStringByUUID().getBytes()); +// msg.setDelayTimeLevel(delayLevel); +// SendResult sendResult = producer.getProducer().send(msg); +// logger.info(sendResult.toString()); +// }, "Send messages with a negative delay level, Expected send() to throw exception, but it didn't"); +// +// producer.shutdown(); +// } +// +// @Test +// @DisplayName("Send one delay message and set the delay test delay level=19, expecting message building wrong") +// public void testDelayLevelWith19() { +// String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); +// String topic = getTopic(MessageType.DELAY, methodName); +// int delayLevel = 19; +// RMQNormalProducer producer = ProducerFactory.getRMQProducer(namesrvAddr, rpcHook); +// +// assertThrows(Exception.class, () -> { +// Message msg = new Message(topic, "*", RandomUtils.getStringByUUID().getBytes()); +// msg.setDelayTimeLevel(delayLevel); +// SendResult sendResult = producer.getProducer().send(msg); +// logger.info(sendResult.toString()); +// }, "Send messages with delay level=19, Expected send() to throw exception, but it didn't"); +// +// producer.shutdown(); +// } } diff --git a/java/e2e/pom.xml b/java/e2e/pom.xml index f3c9705..f17e3f2 100644 --- a/java/e2e/pom.xml +++ b/java/e2e/pom.xml @@ -195,6 +195,7 @@ 1 true -Xms2048m -Xmx2048m -XX:MetaspaceSize=512m -XX:MaxMetaspaceSize=512m + 1 false diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/cluster/ClusterTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/cluster/ClusterTest.java index 89ebe0a..15db9d9 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/cluster/ClusterTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/cluster/ClusterTest.java @@ -124,8 +124,6 @@ public void testClusterConsume() { pushConsumer02 = ConsumerFactory.getPushConsumer(account, topic, groupId, new FilterExpression(tag), listenerB); pushConsumer03 = ConsumerFactory.getPushConsumer(account, topic, groupId, new FilterExpression(tag), listenerC); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); RMQNormalProducer producer = ProducerFactory.getRMQProducer(account, topic); Assertions.assertNotNull(producer, "Get producer failed");