From 63ad33cb2476eb0489e2d2429622de46e53a8cf1 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 18 Oct 2023 10:08:29 +0800 Subject: [PATCH] feat: Adaption with RoS 1. Adaption with RoS Signed-off-by: TheR1sing3un --- java/e2e/pom.xml | 52 +++++++++---- .../apache/rocketmq/factory/BaseFactory.java | 2 + .../rocketmq/factory/ConsumerFactory.java | 7 +- .../apache/rocketmq/frame/BaseOperate.java | 76 +++++++++++++++++-- .../src/test/resources/env/daily/daily.conf | 2 +- 5 files changed, 112 insertions(+), 27 deletions(-) diff --git a/java/e2e/pom.xml b/java/e2e/pom.xml index b692b40..833a47e 100644 --- a/java/e2e/pom.xml +++ b/java/e2e/pom.xml @@ -29,8 +29,8 @@ rocketmq-java-e2e - 8 - 8 + 17 + 17 5.0.3 5.7.2 @@ -45,6 +45,16 @@ org.apache.rocketmq rocketmq-acl 5.0.0 + + + protobuf-java + com.google.protobuf + + + protobuf-java-util + com.google.protobuf + + org.apache.rocketmq @@ -93,7 +103,7 @@ org.slf4j slf4j-api - 2.0.0-beta1 + 2.0.9 ch.qos.logback @@ -105,6 +115,16 @@ logback-core 1.3.0-beta0 + + com.automq.rocketmq + rocketmq-controller + 5.1.3-automq-0-SNAPSHOT + + + org.projectlombok + lombok + 1.18.28 + @@ -159,11 +179,13 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.1.2 - -Xmx1024m -XX:MaxPermSize=256m + 1 + 1 + true + -Xmx1024m -XX:MaxMetaspaceSize=256m false - @@ -171,16 +193,6 @@ maven-failsafe-plugin 2.22.2 - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.0 - - 1.8 - 1.8 - UTF-8 - - org.codehaus.mojo build-helper-maven-plugin @@ -200,6 +212,14 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 9 + 9 + + diff --git a/java/e2e/src/main/java/org/apache/rocketmq/factory/BaseFactory.java b/java/e2e/src/main/java/org/apache/rocketmq/factory/BaseFactory.java index 8db17f3..364b855 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/factory/BaseFactory.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/factory/BaseFactory.java @@ -17,9 +17,11 @@ package org.apache.rocketmq.factory; +import com.automq.rocketmq.controller.metadata.GrpcControllerClient; import org.apache.rocketmq.client.apis.ClientServiceProvider; public class BaseFactory { protected static ClientServiceProvider provider = ClientServiceProvider.loadService(); + protected static GrpcControllerClient client = new GrpcControllerClient(); } diff --git a/java/e2e/src/main/java/org/apache/rocketmq/factory/ConsumerFactory.java b/java/e2e/src/main/java/org/apache/rocketmq/factory/ConsumerFactory.java index 9c45261..86afd73 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/factory/ConsumerFactory.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/factory/ConsumerFactory.java @@ -28,9 +28,13 @@ import org.apache.rocketmq.listener.rmq.RMQNormalListener; import org.apache.rocketmq.util.TestUtils; import org.junit.jupiter.api.Assertions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConsumerFactory extends BaseFactory { + private static final Logger log = LoggerFactory.getLogger(ConsumerFactory.class); + public static RMQNormalConsumer getRMQPushConsumer(Account account, String topic, String consumerGroup, FilterExpression filterExpression, RMQNormalListener messageListener) { PushConsumer pushConsumer = null; @@ -109,10 +113,9 @@ public static SimpleConsumer getSimpleConsumer(Account account, String topic, St .build(); TestUtils.waitForSeconds(1); } catch (Exception e) { - e.printStackTrace(); + log.error("start [SimpleConsumer] failed, message: {}", e.getMessage()); Assertions.fail("start [SimpleConsumer] failed, message: " + e.getMessage()); } - return simpleConsumer; } } diff --git a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java index 416ffbe..dd76395 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java @@ -17,6 +17,14 @@ package org.apache.rocketmq.frame; +import apache.rocketmq.controller.v1.CreateGroupReply; +import apache.rocketmq.controller.v1.CreateGroupRequest; +import apache.rocketmq.controller.v1.CreateTopicRequest; +import apache.rocketmq.controller.v1.GroupType; +import apache.rocketmq.controller.v1.MessageType; +import com.automq.rocketmq.controller.metadata.GrpcControllerClient; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.util.MQAdmin; @@ -32,6 +40,8 @@ public class BaseOperate extends ResourceInit { protected static RPCHook rpcHook; + protected static GrpcControllerClient client; + static { if (aclEnable) { log.info("acl enable"); @@ -43,6 +53,7 @@ public void run() { log.info("Shutdown Hook is running !"); } }); + client = new GrpcControllerClient(); } // // @@ -69,9 +80,34 @@ public void run() { protected static String getTopic(String messageType, String methodName) { String topic = String.format("topic_%s_%s_%s", messageType, methodName, RandomUtils.getStringWithCharacter(6)); log.info("[Topic] topic:{}, messageType:{}, methodName:{}", topic, messageType, methodName); - boolean result = MQAdmin.createTopic(cluster, topic, 8, messageType); - Assertions.assertTrue(result, String.format("Create topic:%s failed", topic)); - return topic; + try { + CreateTopicRequest request = CreateTopicRequest.newBuilder() + .setTopic(topic) + .setCount(8) + .addAcceptMessageTypes(convertMessageType(messageType)) + .build(); + Long topicId = client.createTopic(endPoint, request).join(); + log.info("create topic: {} , topicId:{}", topic, topicId); + return topic; + } catch (Exception e) { + log.error("create topic error", e); + } + return null; + } + + private static MessageType convertMessageType(String typeStr) { + switch (typeStr) { + case "NORMAL": + return MessageType.NORMAL; + case "FIFO": + return MessageType.FIFO; + case "DELAY": + return MessageType.DELAY; + case "TRANSACTION": + return MessageType.TRANSACTION; + default: + return MessageType.MESSAGE_TYPE_UNSPECIFIED; + } } protected static String resetOffsetByTimestamp(String consumerGroup, String topic, long timestamp) { @@ -107,17 +143,41 @@ protected static String resetOffsetByTimestamp(String consumerGroup, String topi //The synchronization consumption retry policy is DefaultRetryPolicy protected static String getGroupId(String methodName) { String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6)); - log.info("[ConsumerGroupId] groupId:{}, methodName:{}", groupId, methodName); + // prepare consumer group + CreateGroupRequest request = CreateGroupRequest.newBuilder() + .setName(groupId) + .setMaxRetryAttempt(16) + .setGroupType(GroupType.GROUP_TYPE_STANDARD) + .build(); + CreateGroupReply reply = createConsumerGroup(request).join(); + log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply); return groupId; } - // -// //The sequential consumption retry policy is FixedRetryPolicy + //The sequential consumption retry policy is FixedRetryPolicy protected static String getOrderlyGroupId(String methodName) { String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6)); - log.info("[ConsumerGroupId] groupId:{} methodName:{}", groupId, methodName); - MQAdmin.createConsumerGroup(cluster, groupId, 30); + CreateGroupRequest request = CreateGroupRequest.newBuilder() + .setName(groupId) + .setMaxRetryAttempt(16) + .setGroupType(GroupType.GROUP_TYPE_FIFO) + .build(); + CreateGroupReply reply = createConsumerGroup(request).join(); + log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply); return groupId; } + private static CompletableFuture createConsumerGroup(CreateGroupRequest request) { + try { + CompletableFuture groupCf = client.createGroup(account.getEndpoint(), request); + return groupCf.exceptionally(throwable -> { + log.error("Create group failed", throwable); + throw new CompletionException(throwable); + }); + } catch (Exception e) { + log.error("Create group failed", e); + return CompletableFuture.failedFuture(e); + } + } + } diff --git a/java/e2e/src/test/resources/env/daily/daily.conf b/java/e2e/src/test/resources/env/daily/daily.conf index 6e0dc5e..2c5cdf1 100644 --- a/java/e2e/src/test/resources/env/daily/daily.conf +++ b/java/e2e/src/test/resources/env/daily/daily.conf @@ -15,7 +15,7 @@ aclEnable = false -namesrvAddr = 127.0.0.1:9876 +namesrvAddr = 127.0.0.1:8081 endPoint = 127.0.0.1:8081 # cluster cluster = DefaultCluster \ No newline at end of file