diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/factory/ProducerFactory.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/factory/ProducerFactory.java index cdf53f1..ab8ee5b 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/factory/ProducerFactory.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/factory/ProducerFactory.java @@ -27,6 +27,7 @@ import org.apache.rocketmq.utils.RandomUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.stringtemplate.v4.ST; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -73,4 +74,25 @@ public static RMQTransactionProducer getTransProducer(String ns, ExecutorService return new RMQTransactionProducer(producer); } + public static RMQTransactionProducer getTransProducer(String ns, String groupName, ExecutorService executorService, TransactionListener transactionListener, RPCHook rpcHook) { + TransactionMQProducer producer; + if (aclEnable) { + producer = new TransactionMQProducer(groupName, rpcHook); + } else { + producer = new TransactionMQProducer(groupName); + } + producer.setInstanceName(UUID.randomUUID().toString()); + producer.setNamesrvAddr(ns); + try { + if (executorService != null) { + producer.setExecutorService(executorService); + } + producer.setTransactionListener(transactionListener); + producer.start(); + } catch (MQClientException e) { + logger.info("Start TransactionMQProducer failed, {}", e.getMessage()); + } + return new RMQTransactionProducer(producer); + } + } diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index c106a7c..e78a33e 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -84,6 +84,18 @@ protected static String getGroupId(String methodName) { return getGroupId(methodName, SubscriptionMode.SUB_MODE_PULL); } + protected static String getTransGroupId(String groupName, SubscriptionMode mode) { + CreateGroupRequest request = CreateGroupRequest.newBuilder() + .setName(groupName) + .setMaxDeliveryAttempt(16) + .setGroupType(GroupType.GROUP_TYPE_STANDARD) + .setSubMode(mode) + .build(); + Long reply = createConsumerGroup(request).join(); + logger.info("[ConsumerGroupId] groupId:{} , mode: {} , reply:{}", groupName, mode, reply); + return groupName; + } + protected static String getGroupId(String methodName, SubscriptionMode mode) { String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6)); // prepare consumer group diff --git a/java/e2e-v4/src/main/java/org/apache/rocketmq/listener/rmq/concurrent/TransactionListenerImpl.java b/java/e2e-v4/src/main/java/org/apache/rocketmq/listener/rmq/concurrent/TransactionListenerImpl.java index 77008c0..c654ec3 100644 --- a/java/e2e-v4/src/main/java/org/apache/rocketmq/listener/rmq/concurrent/TransactionListenerImpl.java +++ b/java/e2e-v4/src/main/java/org/apache/rocketmq/listener/rmq/concurrent/TransactionListenerImpl.java @@ -94,6 +94,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt message) { // } // } // 本地事务已成功则提交消息 + System.out.println("checkLocalTransaction: commit"); return checker; } } diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/client/message/NormalMessageSizeTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/client/message/NormalMessageSizeTest.java index 57d577b..c586b72 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/client/message/NormalMessageSizeTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/client/message/NormalMessageSizeTest.java @@ -176,7 +176,6 @@ public void testDelayMsgSize4M() { } } - @Disabled @Test @DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed") public void testTransMsgSize4MAdd1() { @@ -212,7 +211,6 @@ public Thread newThread(Runnable r) { producer.shutdown(); } - @Disabled @Test @DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success") public void testTransMsgSize4M() { diff --git a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java index 65a9430..af7c445 100644 --- a/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java +++ b/java/e2e-v4/src/test/java/org/apache/rocketmq/server/transaction/TransactionMessageTest.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.server.transaction; +import apache.rocketmq.controller.v1.MessageType; +import apache.rocketmq.controller.v1.SubscriptionMode; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; @@ -30,7 +32,6 @@ import org.apache.rocketmq.frame.BaseOperate; import org.apache.rocketmq.listener.rmq.concurrent.RMQNormalListener; import org.apache.rocketmq.listener.rmq.concurrent.TransactionListenerImpl; -import org.apache.rocketmq.utils.MQAdmin; import org.apache.rocketmq.utils.NameUtils; import org.apache.rocketmq.utils.TestUtils; import org.apache.rocketmq.utils.VerifyUtils; @@ -56,7 +57,6 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; -@Disabled @Tag(TESTSET.TRANSACTION) @Tag(TESTSET.SMOKE) public class TransactionMessageTest extends BaseOperate { @@ -68,16 +68,15 @@ public class TransactionMessageTest extends BaseOperate { @BeforeEach public void setUp() { - topic = NameUtils.getTopicName(); tag = NameUtils.getTagName(); - groupId = NameUtils.getGroupName(); - MQAdmin.createTopic(namesrvAddr, cluster, topic, 8); - logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId); } @Test @DisplayName("Send 10 transaction messages synchronously, expecting all to be consumed") public void testConsumeNormalMessage() { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + String topic = getTopic(MessageType.TRANSACTION, methodName); + String groupId = getGroupId(methodName); RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); consumer.subscribeAndStart(topic, tag, new RMQNormalListener()); @@ -91,7 +90,7 @@ public Thread newThread(Runnable r) { } }); - RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService, + RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService, new TransactionListenerImpl(LocalTransactionState.COMMIT_MESSAGE, LocalTransactionState.COMMIT_MESSAGE), rpcHook); producer.sendTrans(topic, tag, SEND_NUM); @@ -105,7 +104,7 @@ public Thread newThread(Runnable r) { @DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer") public void testTrans_SendRollback_PushConsume() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); + String topic = getTopic(MessageType.TRANSACTION, methodName); String groupId = getGroupId(methodName); RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); @@ -130,7 +129,8 @@ public Thread newThread(Runnable r) { // message TestUtils.waitForSeconds(60); Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed"); - Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize()); + Assertions.assertEquals(SEND_NUM, pushConsumer.getListener().getDequeueMessages().getDataSize()); + Assertions.assertEquals(0, pushConsumer.getListener().getEnqueueMessages().getDataSize()); producer.shutdown(); pushConsumer.shutdown(); } @@ -140,7 +140,7 @@ public Thread newThread(Runnable r) { public void testTrans_SendCheckerCommit_PushConsume() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); + String topic = getTopic(MessageType.TRANSACTION, methodName); String groupId = getGroupId(methodName); RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); @@ -156,7 +156,7 @@ public Thread newThread(Runnable r) { } }); - RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService, + RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService, new TransactionListenerImpl(LocalTransactionState.COMMIT_MESSAGE, LocalTransactionState.UNKNOW), rpcHook); producer.sendTrans(topic, tag, SEND_NUM); @@ -173,7 +173,7 @@ public Thread newThread(Runnable r) { @DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer") public void testTrans_CheckerRollback() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); + String topic = getTopic(MessageType.TRANSACTION, methodName); String groupId = getGroupId(methodName); RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); @@ -206,8 +206,8 @@ public Thread newThread(Runnable r) { public void testTrans_SendCheckerPartionCommit() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); - String topic = getTopic(methodName); - String groupId = getGroupId(methodName); + String topic = getTopic(MessageType.TRANSACTION, methodName); + String groupId = getGroupId(methodName); RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook); pushConsumer.subscribeAndStart(topic, MessageSelector.byTag(tag), new RMQNormalListener()); @@ -225,7 +225,7 @@ public Thread newThread(Runnable r) { } }); - RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService, + RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService, new TransactionListener() { @Override @@ -253,6 +253,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { await().atMost(90, SECONDS).until(new Callable() { @Override public Boolean call() { + System.out.printf("rollbackMsg: %d, commitMsg: %d \n", rollbackMsgNum.get(), commitMsgNum.get()); return rollbackMsgNum.get() == commitMsgNum.get() && commitMsgNum.get() == SEND_NUM / 2; } }); diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/NormalMessageSizeTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/NormalMessageSizeTest.java index 330726c..412c548 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/NormalMessageSizeTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/client/message/NormalMessageSizeTest.java @@ -57,14 +57,14 @@ public class NormalMessageSizeTest extends BaseOperate { public static void setUpAll() { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); normalTopic = getTopic(TopicMessageType.NORMAL.getValue(), methodName); -// transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); + transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName); delayTopic = getTopic(TopicMessageType.DELAY.getValue(), methodName); fifoTopic = getTopic(TopicMessageType.FIFO.getValue(), methodName); try { producer = provider.newProducerBuilder() .setTransactionChecker(messageView -> TransactionResolution.COMMIT) .setClientConfiguration(ClientConfigurationFactory.build(account)) - .setTopics(normalTopic, delayTopic, fifoTopic) + .setTopics(normalTopic, delayTopic, transTopic, fifoTopic) .build(); } catch (ClientException e) { Assertions.fail("create producer failed"); @@ -133,7 +133,6 @@ public void testDelayMsgSize4M() { } } - @Disabled @Test @DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed") public void testTransMsgSize4MAdd1() { @@ -147,7 +146,6 @@ public void testTransMsgSize4MAdd1() { }); } - @Disabled @Test @DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success") public void testTransMsgSize4M() { diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java index 6ee3f98..b2e5087 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/server/TransactionMessageTest.java @@ -81,7 +81,6 @@ public void tearDown() { } } - @Disabled @Test @DisplayName("Send 10 transaction messages and synchronously commit the transaction (Checker performs rollback), expecting those 10 messages to be consumed via PushConsumer") public void testTrans_SendCommit_PushConsume() { @@ -91,8 +90,6 @@ public void testTrans_SendCommit_PushConsume() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK)); Assertions.assertNotNull(producer); @@ -104,7 +101,6 @@ public void testTrans_SendCommit_PushConsume() { VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages()); } - @Disabled @Test @DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer") public void testTrans_SendRollback_PushConsume() { @@ -113,8 +109,6 @@ public void testTrans_SendRollback_PushConsume() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT)); Assertions.assertNotNull(producer); @@ -128,7 +122,6 @@ public void testTrans_SendRollback_PushConsume() { Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize()); } - @Disabled @Test @DisplayName("Send 10 transaction messages and COMMIT the transaction by Checker (perform COMMIT), expecting the 10 messages to be consumed by PushConsumer") public void testTrans_SendCheckerCommit_PushConsume() { @@ -138,8 +131,6 @@ public void testTrans_SendCheckerCommit_PushConsume() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT)); Assertions.assertNotNull(producer); @@ -153,7 +144,6 @@ public void testTrans_SendCheckerCommit_PushConsume() { VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages()); } - @Disabled @Test @DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer") public void testTrans_CheckerRollback() { @@ -162,8 +152,6 @@ public void testTrans_CheckerRollback() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK)); Assertions.assertNotNull(producer); @@ -177,7 +165,6 @@ public void testTrans_CheckerRollback() { Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize()); } - @Disabled @Test @DisplayName("Send 10 transactional messages and commit them by checking back (Checker commits for partial messages), and the expected committed messages can be consumed by PushConsumer") public void testTrans_SendCheckerPartionCommit() { @@ -187,8 +174,6 @@ public void testTrans_SendCheckerPartionCommit() { String groupId = getGroupId(methodName); pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener()); - simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer()); AtomicInteger commitMsgNum = new AtomicInteger(0); AtomicInteger rollbackMsgNum = new AtomicInteger(0); diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleTopicTypeTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleTopicTypeTest.java index df1dcf4..dc8d1f4 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleTopicTypeTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleTopicTypeTest.java @@ -83,7 +83,6 @@ public void testDelay_simple_receive_ack() { VerifyUtils.waitDelayReceiveThenAck(producer, consumer, 1, 10000); } - @Disabled @Test @DisplayName("Send 10 transaction messages synchronously and expect SimpleConsumer to receive() and ack() messages properly") public void testTrans_simple_receive_ackAsync() { @@ -93,7 +92,6 @@ public void testTrans_simple_receive_ackAsync() { String groupId = getGroupId(methodName); SimpleConsumer consumer = ConsumerFactory.getSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10)); - VerifyUtils.tryReceiveOnce(consumer); RMQNormalProducer producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT)); Assertions.assertNotNull(producer, "Get Producer failed"); for (int i = 0; i < SEND_NUM; i++) {