Skip to content

Commit

Permalink
[ISSUE#43]Add 4 features under simple directory (#44)
Browse files Browse the repository at this point in the history
* features

* Delete ConsumerGroup.feature

* features

* Delete bdd/src/main/resources/consumer directory

* features

* features

* finish consumer and producer features in client, 2 sql filter features and a tag filter feature

* fix a problem in SimpleConsumerInitTest.java

* fix some problems in features

* update

* Add 4 features of SimpleConsumers scenarios

* Add 4 features of SimpleConsumers scenarios

---------

Co-authored-by: alani <[email protected]>
  • Loading branch information
nannananananana and alani authored Jun 20, 2023
1 parent caacf39 commit 364fc9a
Show file tree
Hide file tree
Showing 7 changed files with 471 additions and 16 deletions.
174 changes: 158 additions & 16 deletions bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public void createAMessageIncludingTheTopicTagAndBody(String arg0, String arg1,

@And("Create a message, including the Topic\\({string}), Tag\\({string}), Body\\({string}), deliveryTimestamp\\({string})")
public void createAMessageIncludingTheTopicTagBodyDeliveryTimestamp(String arg0, String arg1, String arg2, String arg3) {



}

@And("Create a message, including the Topic\\({string}), Tag\\({string}), Body\\({string}), messageGroup\\({string})")
Expand Down Expand Up @@ -269,21 +270,6 @@ public void createASimpleConsumerSetTheEndpointTopicConsumerGroupFilterExpressio

}

@Then("Check SimpleConsumer pull a message once")
public void checkSimpleConsumerPullAMessageOnce() {

}

@And("SimpleConsumer invokes receive method {string} and returns acks {string}")
public void simpleconsumerInvokesReceiveMethodAndReturnsAcks(String arg0, String arg1) {

}

@Then("Check all messages are pulled by SimpleConsumer {string}")
public void checkAllMessagesArePulledBySimpleConsumer(String arg0) {

}

@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), AwaitDuration\\({string}), SubscriptionExpressions\\(NULL)")
public void createASetTheClientConfigurationEndpointConsumerGroupAwaitDurationSubscriptionExpressionsNULL(String arg0, String arg1, String arg2, String arg3) {

Expand Down Expand Up @@ -417,4 +403,160 @@ public void createASetTheTopicsAndMaxAttempts(String arg0, String arg1, int arg2
@When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), and MaxAttempts\\({int})")
public void createASetTheClientConfigurationEndpointAndMaxAttempts(String arg0, String arg1, int arg2) {
}

@Then("Check only the first message that can be pulled by SimpleConsumer")
public void checkOnlyTheFirstMessageThatCanBePulledBySimpleConsumer() {
}

@And("Create {string} messages, including the Topic\\({string}), Tag\\({string}), Key\\({string}), MessageGroup\\({string}), and Body\\({string})")
public void createMessagesIncludingTheTopicTagKeyMessageGroupAndBody(String arg0, String arg1, String arg2, String arg3, String arg4, String arg5) {

}

@And("Check the pulled messages that can be retried except the first one")
public void checkThePulledMessagesThatCanBeRetriedExceptTheFirstOne() {

}

@And("Check the number of retried messages equals to {int}")
public void checkTheNumberOfRetriedMessagesEqualsTo(int arg0) {
}

@And("SimpleConsumer invokes the method receive\\(maxMessageNum:{string}, invisibleDuration:{string}) {string}")
public void simpleconsumerInvokesReceiveMaxMessageNumInvisibleDuration(String arg0, String arg1, String arg2) {

}


@Then("Check the duration between each two retrying consumptions equals to {int}s")
public void checkTheDurationBetweenEachTwoRetryingConsumptionsEqualsToS(int arg0) {

}

@And("SimpleConsumer returns an ack within {string}")
public void simpleconsumerReturnsAnAckWithin(String arg0) {

}

@And("Set a failed assertion with {string}")
public void setAFailedAssertionWith(String arg0) {

}

@And("Check a new {string} messages are pulled {string} and acked {string} by SimpleConsumer within {int}s {string}")
public void checkANewMessagesArePulledAndAckedBySimpleConsumerWithinS(String arg0, String arg1, String arg2, int arg3, String arg4) {

}

@And("SimpleConsumer invokes receive\\(maxMessageNum:{string}, invisibleDuration:{string}, changeInvisibleDuration:{string}) {string} without returning ack")
public void simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationChangeInvisibleDurationWithoutReturningAck(String arg0, String arg1, String arg2, String arg3) {

}

@And("SimpleConsumer invokes receive\\(maxMessageNum:{string}, invisibleDuration:{string}) during next consumption and returns an ack")
public void simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationDuringNextConsumptionAndReturnsAnAck(String arg0, String arg1) {

}

@Then("Check SimpleConsumer pulls the message and returns an ack within {int}s but over {int}s")
public void checkSimpleConsumerPullsTheMessageAndReturnsAnAckWithinSButOverS(int arg0, int arg1) {

}

@And("SimpleConsumer invokes receive\\(maxMessageNum:{string}, invisibleDuration:{string}) during next consumption")
public void simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationDuringNextConsumption(String arg0, String arg1) {

}

@Then("Set SimpleConsumer changeInvisibleDuration\\({string})")
public void setSimpleConsumerChangeInvisibleDuration(String arg0) {

}

@And("Check changeInvisibleDuration after the ack is failed")
public void checkChangeInvisibleDurationAfterTheAckIsFailed() {

}

@And("Check the consumption is failed")
public void checkTheConsumptionIsFailed() {

}

@And("SimpleConsumer invokes the method receive\\(maxMessageNum:{string}, invisibleDuration:{string}) {string}")
public void simpleconsumerInvokesTheMethodReceiveMaxMessageNumInvisibleDuration(String arg0, String arg1, String arg2) {

}

@Then("SimpleConsumer returns acks {string}")
public void simpleconsumerReturnsAcks(String arg0) {
}

@Then("Check all {string} messages are received and acked within {int}s {string}")
public void checkAllMessagesAreReceivedAndAckedWithinS(String arg0, int arg1, String arg2) {
}

@Then("SimpleConsumer returns acks for all received messages except the first one with Body\\({string})")
public void simpleconsumerReturnsAcksForAllReceivedMessagesExceptTheFirstOneWithBody(String arg0) {

}

@Then("Check only the first {string} messages with Body\\({string}) that can be received")
public void checkOnlyTheFirstMessagesWithBodyThatCanBeReceived(String arg0, String arg1) {
}

@And("Check all received messages that can be consumed again {string}")
public void checkAllReceivedMessagesThatCanBeConsumedAgain(String arg0) {
}

@And("Check the number of consumptions equals to {int}")
public void checkTheNumberOfConsumptionsEqualsTo(int arg0) {
}

@And("SimpleConsumer returns an ack after DeliveryAttempt\\({int})")
public void simpleconsumerReturnsAnAckAfterDeliveryAttempt(int arg0) {
}

@And("Check all messages that can be consumed and acked within {int}s")
public void checkAllMessagesThatCanBeConsumedAndAckedWithinS(int arg0) {

}

@Then("SimpleConsumer returns acks for all received messages except the first one")
public void simpleconsumerReturnsAcksForAllReceivedMessagesExceptTheFirstOne() {
}

@And("Check only the first message is not acked")
public void checkOnlyTheFirstMessageIsNotAcked() {
}

@And("SimpleConsumer returns acks for all received messages")
public void simpleconsumerReturnsAcksForAllReceivedMessages() {
}

@And("SimpleConsumer returns an ack when DeliveryAttempt value equals {int}")
public void simpleconsumerReturnsAnAckWhenDeliveryAttemptValueEquals(int arg0) {
}

@Then("Check SimpleConsumer receives only up to {int} messages once")
public void checkSimpleConsumerReceivesOnlyUpToMessagesOnce(int arg0) {
}

@And("Check no acked messages that can be consumed again")
public void checkNoAckedMessagesThatCanBeConsumedAgain() {

}

@Then("SimpleConsumer returns ack for all received messages except the first one")
public void simpleconsumerReturnsAckForAllReceivedMessagesExceptTheFirstOne() {
}

@And("SimpleConsumer waits for {int}s after receiving the messages")
public void simpleconsumerWaitsForSAfterReceivingTheMessages(int arg0) {

}

@Then("SimpleConsumer returns an ack")
public void simpleconsumerReturnsAnAck() {
}
}
52 changes: 52 additions & 0 deletions bdd/src/main/resources/server/abnormal/PushConsumerRetry.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Feature: Test retry of PushConsumer

Scenario: The normal message is sent, and after the PushConsumer partial retry, the retry message is expected to be consumed
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-FilterExpression"), ConsumptionThreadCount(20)
And Set PushConsumer Listener, “half” of the number of pre-sent messages are consumed successful and return SUCCESS, another "half" are consumed failed and return FAILURE
And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
And Send "10" messages "synchronous"
Then Check all messages send "successfully"
Then Check NoRetryMessages equals to RetryMessages and equals to "half" of the number of pre-sent messages
And Check all messages are contained in retryMessages or noRetryMessages
And Shutdown the producer and consumer if they are started

Scenario: The send order message, after the PushConsumer partial retry, is expected to consume the retry message, and the message consumption order and send order
Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-FilterExpression"), ConsumptionThreadCount(20), and MessageListener("default")
And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), Body("Body"), and MessageGroup("group")
And Send "10" messages "synchronous"
Then Check all messages send "successfully"
And Wait until all the messages that can be consumed
And Check the order of received messages consistent with the order of pre-sent messages
And Shutdown the producer and consumer if they are started

Scenario: Send sequential messages, using three Shardingkeys, after partial retries, expect to consume retry messages, and the order of message consumption and the order of message delivery
Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-FilterExpression"), ConsumptionThreadCount(20), and MessageListener("default")
And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic")
Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), Body("Body"), and MessageGroup("group")
And Send "10" messages "synchronous"
Then Check all messages send "successfully"
And Wait until all the messages that can be consumed
Then Separate the messages into 3 ShardingKeyGroups according to messageGroup
And Check the message order in each ShardingKeyGroup
And Shutdown the producer and consumer if they are started

38 changes: 38 additions & 0 deletions bdd/src/main/resources/simple/SimpleAck.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Feature: Test SimpleConsumer pulls normal messages and returns acks synchronously or asynchronously

Scenario Outline: Send 20 normal messages synchronously and expect to consume with receive()/receiveAsync() and ack()/ackAsync() messages successfully
Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic")
And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s")
And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
And Send "20" messages "synchronously"
Then Check all messages send "successfully"
And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "<TransmissionMode>"
Then SimpleConsumer returns acks "<AckMode>"
Then Check all "Normal" messages are received and acked within 90s "successfully"
And Shutdown the producer and consumer if they are started

Examples:
| TransmissionMode | AckMode |
| synchronously | synchronously |
| asynchronously | synchronously |
| synchronously | asynchronously |
| asynchronously | asynchronously |



52 changes: 52 additions & 0 deletions bdd/src/main/resources/simple/SimpleMsgType.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Feature: Test SimpleConsumer pulls and ack order/delay/transaction messages synchronously properly

Scenario: Send 20 order messages synchronously, expect SimpleConsumer to receive() and ack() messages properly and the order to be maintained
Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic")
And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s")
And Create "a" messages, including the Topic("Topic"), Tag("Tag"), Key("Key"), MessageGroup("Group"), and Body("Body")
And Send "20" messages "synchronously"
Then Check all messages send "successfully"
And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously"
Then SimpleConsumer returns acks "synchronously"
Then Check all "FIFO" messages are received and acked within 90s "successfully"
And Shutdown the producer and consumer if they are started

Scenario: Send 10 delay messages synchronously, expect SimpleConsumer to receive() and ack() messages properly
Given Create a "Delay" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic")
And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s")
And Create a message, including the Topic("random-topic"), Tag("TagA"), Body("Body"), deliveryTimestamp("10s")
And Send "10" messages "synchronously"
Then Check all messages send "successfully"
And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously"
Then SimpleConsumer returns acks "synchronously"
Then Check all "Delay" messages are received and acked within 90s "successfully"
And Shutdown the producer and consumer if they are started

Scenario: Send 10 transaction messages synchronously, expect SimpleConsumer to receive() and ack() messages properly
Given Create a "Transaction" topic:"random-topic" if not exist, a "Concurrently" group:"random-group"
When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic")
And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s")
And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body")
And Send "10" messages "synchronously"
Then Check all messages send "successfully"
And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously"
Then SimpleConsumer returns acks "synchronously"
Then Check all "Transaction" messages are received and acked within 90s "successfully"
And Shutdown the producer and consumer if they are started
Loading

0 comments on commit 364fc9a

Please sign in to comment.