Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support trans message e2e test #19

Merged
merged 2 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.rocketmq.utils.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.stringtemplate.v4.ST;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -73,4 +74,25 @@ public static RMQTransactionProducer getTransProducer(String ns, ExecutorService
return new RMQTransactionProducer(producer);
}

public static RMQTransactionProducer getTransProducer(String ns, String groupName, ExecutorService executorService, TransactionListener transactionListener, RPCHook rpcHook) {
TransactionMQProducer producer;
if (aclEnable) {
producer = new TransactionMQProducer(groupName, rpcHook);
} else {
producer = new TransactionMQProducer(groupName);
}
producer.setInstanceName(UUID.randomUUID().toString());
producer.setNamesrvAddr(ns);
try {
if (executorService != null) {
producer.setExecutorService(executorService);
}
producer.setTransactionListener(transactionListener);
producer.start();
} catch (MQClientException e) {
logger.info("Start TransactionMQProducer failed, {}", e.getMessage());
}
return new RMQTransactionProducer(producer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ protected static String getGroupId(String methodName) {
return getGroupId(methodName, SubscriptionMode.SUB_MODE_PULL);
}

protected static String getTransGroupId(String groupName, SubscriptionMode mode) {
CreateGroupRequest request = CreateGroupRequest.newBuilder()
.setName(groupName)
.setMaxDeliveryAttempt(16)
.setGroupType(GroupType.GROUP_TYPE_STANDARD)
.setSubMode(mode)
.build();
Long reply = createConsumerGroup(request).join();
logger.info("[ConsumerGroupId] groupId:{} , mode: {} , reply:{}", groupName, mode, reply);
return groupName;
}

protected static String getGroupId(String methodName, SubscriptionMode mode) {
String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6));
// prepare consumer group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt message) {
// }
// }
// 本地事务已成功则提交消息
System.out.println("checkLocalTransaction: commit");
return checker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public void testDelayMsgSize4M() {
}
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed")
public void testTransMsgSize4MAdd1() {
Expand Down Expand Up @@ -212,7 +211,6 @@ public Thread newThread(Runnable r) {
producer.shutdown();
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success")
public void testTransMsgSize4M() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.server.transaction;

import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.SubscriptionMode;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
Expand All @@ -30,7 +32,6 @@
import org.apache.rocketmq.frame.BaseOperate;
import org.apache.rocketmq.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.listener.rmq.concurrent.TransactionListenerImpl;
import org.apache.rocketmq.utils.MQAdmin;
import org.apache.rocketmq.utils.NameUtils;
import org.apache.rocketmq.utils.TestUtils;
import org.apache.rocketmq.utils.VerifyUtils;
Expand All @@ -56,7 +57,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@Disabled
@Tag(TESTSET.TRANSACTION)
@Tag(TESTSET.SMOKE)
public class TransactionMessageTest extends BaseOperate {
Expand All @@ -68,16 +68,15 @@ public class TransactionMessageTest extends BaseOperate {

@BeforeEach
public void setUp() {
topic = NameUtils.getTopicName();
tag = NameUtils.getTagName();
groupId = NameUtils.getGroupName();
MQAdmin.createTopic(namesrvAddr, cluster, topic, 8);
logger.info("topic:{}, tag:{}, groupId:{}", topic, tag, groupId);
}

@Test
@DisplayName("Send 10 transaction messages synchronously, expecting all to be consumed")
public void testConsumeNormalMessage() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
consumer.subscribeAndStart(topic, tag, new RMQNormalListener());

Expand All @@ -91,7 +90,7 @@ public Thread newThread(Runnable r) {
}
});

RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService,
RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService,
new TransactionListenerImpl(LocalTransactionState.COMMIT_MESSAGE, LocalTransactionState.COMMIT_MESSAGE),
rpcHook);
producer.sendTrans(topic, tag, SEND_NUM);
Expand All @@ -105,7 +104,7 @@ public Thread newThread(Runnable r) {
@DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer")
public void testTrans_SendRollback_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand All @@ -130,7 +129,8 @@ public Thread newThread(Runnable r) {
// message
TestUtils.waitForSeconds(60);
Assertions.assertEquals(SEND_NUM, producer.getEnqueueMessages().getDataSize(), "send message failed");
Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize());
Assertions.assertEquals(SEND_NUM, pushConsumer.getListener().getDequeueMessages().getDataSize());
Assertions.assertEquals(0, pushConsumer.getListener().getEnqueueMessages().getDataSize());
producer.shutdown();
pushConsumer.shutdown();
}
Expand All @@ -140,7 +140,7 @@ public Thread newThread(Runnable r) {
public void testTrans_SendCheckerCommit_PushConsume() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand All @@ -156,7 +156,7 @@ public Thread newThread(Runnable r) {
}
});

RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService,
RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService,
new TransactionListenerImpl(LocalTransactionState.COMMIT_MESSAGE, LocalTransactionState.UNKNOW),
rpcHook);
producer.sendTrans(topic, tag, SEND_NUM);
Expand All @@ -173,7 +173,7 @@ public Thread newThread(Runnable r) {
@DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer")
public void testTrans_CheckerRollback() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
String topic = getTopic(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
Expand Down Expand Up @@ -206,8 +206,8 @@ public Thread newThread(Runnable r) {
public void testTrans_SendCheckerPartionCommit() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

String topic = getTopic(methodName);
String groupId = getGroupId(methodName);
String topic = getTopic(MessageType.TRANSACTION, methodName);
String groupId = getGroupId(methodName);

RMQNormalConsumer pushConsumer = ConsumerFactory.getRMQNormalConsumer(namesrvAddr, groupId, rpcHook);
pushConsumer.subscribeAndStart(topic, MessageSelector.byTag(tag), new RMQNormalListener());
Expand All @@ -225,7 +225,7 @@ public Thread newThread(Runnable r) {
}
});

RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, executorService,
RMQTransactionProducer producer = ProducerFactory.getTransProducer(namesrvAddr, topic, executorService,
new TransactionListener() {

@Override
Expand Down Expand Up @@ -253,6 +253,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
await().atMost(90, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() {
System.out.printf("rollbackMsg: %d, commitMsg: %d \n", rollbackMsgNum.get(), commitMsgNum.get());
return rollbackMsgNum.get() == commitMsgNum.get() && commitMsgNum.get() == SEND_NUM / 2;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ public class NormalMessageSizeTest extends BaseOperate {
public static void setUpAll() {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
normalTopic = getTopic(TopicMessageType.NORMAL.getValue(), methodName);
// transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
transTopic = getTopic(TopicMessageType.TRANSACTION.getValue(), methodName);
delayTopic = getTopic(TopicMessageType.DELAY.getValue(), methodName);
fifoTopic = getTopic(TopicMessageType.FIFO.getValue(), methodName);
try {
producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> TransactionResolution.COMMIT)
.setClientConfiguration(ClientConfigurationFactory.build(account))
.setTopics(normalTopic, delayTopic, fifoTopic)
.setTopics(normalTopic, delayTopic, transTopic, fifoTopic)
.build();
} catch (ClientException e) {
Assertions.fail("create producer failed");
Expand Down Expand Up @@ -133,7 +133,6 @@ public void testDelayMsgSize4M() {
}
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M+1, expect send failed")
public void testTransMsgSize4MAdd1() {
Expand All @@ -147,7 +146,6 @@ public void testTransMsgSize4MAdd1() {
});
}

@Disabled
@Test
@DisplayName("Send transaction messages synchronously with the body size of 4M, expect send success")
public void testTransMsgSize4M() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public void tearDown() {
}
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and synchronously commit the transaction (Checker performs rollback), expecting those 10 messages to be consumed via PushConsumer")
public void testTrans_SendCommit_PushConsume() {
Expand All @@ -91,8 +90,6 @@ public void testTrans_SendCommit_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK));
Assertions.assertNotNull(producer);
Expand All @@ -104,7 +101,6 @@ public void testTrans_SendCommit_PushConsume() {
VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and rollback directly (Checker does commit), expecting that these 10 messages cannot be consumed by PushConsumer")
public void testTrans_SendRollback_PushConsume() {
Expand All @@ -113,8 +109,6 @@ public void testTrans_SendRollback_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer);
Expand All @@ -128,7 +122,6 @@ public void testTrans_SendRollback_PushConsume() {
Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and COMMIT the transaction by Checker (perform COMMIT), expecting the 10 messages to be consumed by PushConsumer")
public void testTrans_SendCheckerCommit_PushConsume() {
Expand All @@ -138,8 +131,6 @@ public void testTrans_SendCheckerCommit_PushConsume() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer);
Expand All @@ -153,7 +144,6 @@ public void testTrans_SendCheckerCommit_PushConsume() {
VerifyUtils.verifyNormalMessage(producer.getEnqueueMessages(), pushConsumer.getListener().getDequeueMessages());
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages and roll back the transaction by Checker (performing ROLLBACK), expecting that the 10 messages will not be consumed by PushConsumer")
public void testTrans_CheckerRollback() {
Expand All @@ -162,8 +152,6 @@ public void testTrans_CheckerRollback() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.ROLLBACK));
Assertions.assertNotNull(producer);
Expand All @@ -177,7 +165,6 @@ public void testTrans_CheckerRollback() {
Assertions.assertEquals(0, pushConsumer.getListener().getDequeueMessages().getDataSize());
}

@Disabled
@Test
@DisplayName("Send 10 transactional messages and commit them by checking back (Checker commits for partial messages), and the expected committed messages can be consumed by PushConsumer")
public void testTrans_SendCheckerPartionCommit() {
Expand All @@ -187,8 +174,6 @@ public void testTrans_SendCheckerPartionCommit() {
String groupId = getGroupId(methodName);

pushConsumer = ConsumerFactory.getRMQPushConsumer(account, topic, groupId, new FilterExpression(tag), new RMQNormalListener());
simpleConsumer = ConsumerFactory.getRMQSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(simpleConsumer.getSimpleConsumer());

AtomicInteger commitMsgNum = new AtomicInteger(0);
AtomicInteger rollbackMsgNum = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void testDelay_simple_receive_ack() {
VerifyUtils.waitDelayReceiveThenAck(producer, consumer, 1, 10000);
}

@Disabled
@Test
@DisplayName("Send 10 transaction messages synchronously and expect SimpleConsumer to receive() and ack() messages properly")
public void testTrans_simple_receive_ackAsync() {
Expand All @@ -93,7 +92,6 @@ public void testTrans_simple_receive_ackAsync() {
String groupId = getGroupId(methodName);

SimpleConsumer consumer = ConsumerFactory.getSimpleConsumer(account, topic, groupId, new FilterExpression(tag), Duration.ofSeconds(10));
VerifyUtils.tryReceiveOnce(consumer);
RMQNormalProducer producer = ProducerFactory.getRMQTransProducer(account, topic, new LocalTransactionCheckerImpl(TransactionResolution.COMMIT));
Assertions.assertNotNull(producer, "Get Producer failed");
for (int i = 0; i < SEND_NUM; i++) {
Expand Down