Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adaption with RoS #1

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions java/e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
<artifactId>rocketmq-java-e2e</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<rocketmqV5.client.version>5.0.3</rocketmqV5.client.version>
<junit.jupiter.version>5.7.2</junit.jupiter.version>
</properties>
Expand All @@ -45,6 +45,16 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
<exclusion>
<artifactId>protobuf-java-util</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
Expand Down Expand Up @@ -93,7 +103,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.0-beta1</version>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
Expand All @@ -105,6 +115,16 @@
<artifactId>logback-core</artifactId>
<version>1.3.0-beta0</version>
</dependency>
<dependency>
<groupId>com.automq.rocketmq</groupId>
<artifactId>rocketmq-controller</artifactId>
<version>5.1.3-automq-0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -159,28 +179,20 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<version>3.1.2</version>
<configuration>
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
<skipAfterFailureCount>1</skipAfterFailureCount>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<argLine>-Xmx1024m -XX:MaxMetaspaceSize=256m</argLine>
<trimStackTrace>false</trimStackTrace>
<!-- <rerunFailingTestsCount>1</rerunFailingTestsCount>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.22.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
Expand All @@ -200,6 +212,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>9</source>
<target>9</target>
</configuration>
</plugin>

</plugins>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.rocketmq.factory;

import com.automq.rocketmq.controller.metadata.GrpcControllerClient;
import org.apache.rocketmq.client.apis.ClientServiceProvider;

public class BaseFactory {

protected static ClientServiceProvider provider = ClientServiceProvider.loadService();
protected static GrpcControllerClient client = new GrpcControllerClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import org.apache.rocketmq.listener.rmq.RMQNormalListener;
import org.apache.rocketmq.util.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerFactory extends BaseFactory {

private static final Logger log = LoggerFactory.getLogger(ConsumerFactory.class);

public static RMQNormalConsumer getRMQPushConsumer(Account account, String topic, String consumerGroup,
FilterExpression filterExpression, RMQNormalListener messageListener) {
PushConsumer pushConsumer = null;
Expand Down Expand Up @@ -109,10 +113,9 @@ public static SimpleConsumer getSimpleConsumer(Account account, String topic, St
.build();
TestUtils.waitForSeconds(1);
} catch (Exception e) {
e.printStackTrace();
log.error("start [SimpleConsumer] failed, message: {}", e.getMessage());
Assertions.fail("start [SimpleConsumer] failed, message: " + e.getMessage());
}

return simpleConsumer;
}
}
76 changes: 68 additions & 8 deletions java/e2e/src/main/java/org/apache/rocketmq/frame/BaseOperate.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

package org.apache.rocketmq.frame;

import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.MessageType;
import com.automq.rocketmq.controller.metadata.GrpcControllerClient;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.util.MQAdmin;
Expand All @@ -32,6 +40,8 @@ public class BaseOperate extends ResourceInit {

protected static RPCHook rpcHook;

protected static GrpcControllerClient client;

static {
if (aclEnable) {
log.info("acl enable");
Expand All @@ -43,6 +53,7 @@ public void run() {
log.info("Shutdown Hook is running !");
}
});
client = new GrpcControllerClient();
}

// //
Expand All @@ -69,9 +80,34 @@ public void run() {
protected static String getTopic(String messageType, String methodName) {
String topic = String.format("topic_%s_%s_%s", messageType, methodName, RandomUtils.getStringWithCharacter(6));
log.info("[Topic] topic:{}, messageType:{}, methodName:{}", topic, messageType, methodName);
boolean result = MQAdmin.createTopic(cluster, topic, 8, messageType);
Assertions.assertTrue(result, String.format("Create topic:%s failed", topic));
return topic;
try {
CreateTopicRequest request = CreateTopicRequest.newBuilder()
.setTopic(topic)
.setCount(8)
.addAcceptMessageTypes(convertMessageType(messageType))
.build();
Long topicId = client.createTopic(endPoint, request).join();
log.info("create topic: {} , topicId:{}", topic, topicId);
return topic;
} catch (Exception e) {
log.error("create topic error", e);
}
return null;
}

private static MessageType convertMessageType(String typeStr) {
switch (typeStr) {
case "NORMAL":
return MessageType.NORMAL;
case "FIFO":
return MessageType.FIFO;
case "DELAY":
return MessageType.DELAY;
case "TRANSACTION":
return MessageType.TRANSACTION;
default:
return MessageType.MESSAGE_TYPE_UNSPECIFIED;
}
}

protected static String resetOffsetByTimestamp(String consumerGroup, String topic, long timestamp) {
Expand Down Expand Up @@ -107,17 +143,41 @@ protected static String resetOffsetByTimestamp(String consumerGroup, String topi
//The synchronization consumption retry policy is DefaultRetryPolicy
protected static String getGroupId(String methodName) {
String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6));
log.info("[ConsumerGroupId] groupId:{}, methodName:{}", groupId, methodName);
// prepare consumer group
CreateGroupRequest request = CreateGroupRequest.newBuilder()
.setName(groupId)
.setMaxRetryAttempt(16)
.setGroupType(GroupType.GROUP_TYPE_STANDARD)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);
return groupId;
}

//
// //The sequential consumption retry policy is FixedRetryPolicy
//The sequential consumption retry policy is FixedRetryPolicy
protected static String getOrderlyGroupId(String methodName) {
String groupId = String.format("GID_%s_%s", methodName, RandomUtils.getStringWithCharacter(6));
log.info("[ConsumerGroupId] groupId:{} methodName:{}", groupId, methodName);
MQAdmin.createConsumerGroup(cluster, groupId, 30);
CreateGroupRequest request = CreateGroupRequest.newBuilder()
.setName(groupId)
.setMaxRetryAttempt(16)
.setGroupType(GroupType.GROUP_TYPE_FIFO)
.build();
CreateGroupReply reply = createConsumerGroup(request).join();
log.info("[ConsumerGroupId] groupId:{} methodName:{} reply:{}", groupId, methodName, reply);
return groupId;
}

private static CompletableFuture<CreateGroupReply> createConsumerGroup(CreateGroupRequest request) {
try {
CompletableFuture<CreateGroupReply> groupCf = client.createGroup(account.getEndpoint(), request);
return groupCf.exceptionally(throwable -> {
log.error("Create group failed", throwable);
throw new CompletionException(throwable);
});
} catch (Exception e) {
log.error("Create group failed", e);
return CompletableFuture.failedFuture(e);
}
}

}
2 changes: 1 addition & 1 deletion java/e2e/src/test/resources/env/daily/daily.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

aclEnable = false

namesrvAddr = 127.0.0.1:9876
namesrvAddr = 127.0.0.1:8081
endPoint = 127.0.0.1:8081
# cluster
cluster = DefaultCluster