From 364fc9a2fdd4d7de6b7abb857f889f7069299ff2 Mon Sep 17 00:00:00 2001 From: alani <97888618+nannananananana@users.noreply.github.com> Date: Tue, 20 Jun 2023 10:46:08 +0800 Subject: [PATCH] [ISSUE#43]Add 4 features under simple directory (#44) * features * Delete ConsumerGroup.feature * features * Delete bdd/src/main/resources/consumer directory * features * features * finish consumer and producer features in client, 2 sql filter features and a tag filter feature * fix a problem in SimpleConsumerInitTest.java * fix some problems in features * update * Add 4 features of SimpleConsumers scenarios * Add 4 features of SimpleConsumers scenarios --------- Co-authored-by: alani --- .../apache/rocketmq/ClientInitStepdefs.java | 174 ++++++++++++++++-- .../server/abnormal/PushConsumerRetry.feature | 52 ++++++ .../main/resources/simple/SimpleAck.feature | 38 ++++ .../resources/simple/SimpleMsgType.feature | 52 ++++++ .../resources/simple/SimpleOrderParam.feature | 67 +++++++ .../resources/simple/SimpleParameter.feature | 103 +++++++++++ .../broker/simple/SimpleOrderParamTest.java | 1 + 7 files changed, 471 insertions(+), 16 deletions(-) create mode 100644 bdd/src/main/resources/server/abnormal/PushConsumerRetry.feature create mode 100644 bdd/src/main/resources/simple/SimpleAck.feature create mode 100644 bdd/src/main/resources/simple/SimpleMsgType.feature create mode 100644 bdd/src/main/resources/simple/SimpleOrderParam.feature create mode 100644 bdd/src/main/resources/simple/SimpleParameter.feature diff --git a/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java b/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java index 1e46a77..972004f 100644 --- a/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java +++ b/bdd/src/main/java/org/apache/rocketmq/ClientInitStepdefs.java @@ -179,7 +179,8 @@ public void createAMessageIncludingTheTopicTagAndBody(String arg0, String arg1, @And("Create a message, including the Topic\\({string}), Tag\\({string}), Body\\({string}), deliveryTimestamp\\({string})") public void createAMessageIncludingTheTopicTagBodyDeliveryTimestamp(String arg0, String arg1, String arg2, String arg3) { - + + } @And("Create a message, including the Topic\\({string}), Tag\\({string}), Body\\({string}), messageGroup\\({string})") @@ -269,21 +270,6 @@ public void createASimpleConsumerSetTheEndpointTopicConsumerGroupFilterExpressio } - @Then("Check SimpleConsumer pull a message once") - public void checkSimpleConsumerPullAMessageOnce() { - - } - - @And("SimpleConsumer invokes receive method {string} and returns acks {string}") - public void simpleconsumerInvokesReceiveMethodAndReturnsAcks(String arg0, String arg1) { - - } - - @Then("Check all messages are pulled by SimpleConsumer {string}") - public void checkAllMessagesArePulledBySimpleConsumer(String arg0) { - - } - @When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), ConsumerGroup\\({string}), AwaitDuration\\({string}), SubscriptionExpressions\\(NULL)") public void createASetTheClientConfigurationEndpointConsumerGroupAwaitDurationSubscriptionExpressionsNULL(String arg0, String arg1, String arg2, String arg3) { @@ -417,4 +403,160 @@ public void createASetTheTopicsAndMaxAttempts(String arg0, String arg1, int arg2 @When("Create a {string}, set the ClientConfiguration\\(Endpoint:{string}), and MaxAttempts\\({int})") public void createASetTheClientConfigurationEndpointAndMaxAttempts(String arg0, String arg1, int arg2) { } + + @Then("Check only the first message that can be pulled by SimpleConsumer") + public void checkOnlyTheFirstMessageThatCanBePulledBySimpleConsumer() { + } + + @And("Create {string} messages, including the Topic\\({string}), Tag\\({string}), Key\\({string}), MessageGroup\\({string}), and Body\\({string})") + public void createMessagesIncludingTheTopicTagKeyMessageGroupAndBody(String arg0, String arg1, String arg2, String arg3, String arg4, String arg5) { + + } + + @And("Check the pulled messages that can be retried except the first one") + public void checkThePulledMessagesThatCanBeRetriedExceptTheFirstOne() { + + } + + @And("Check the number of retried messages equals to {int}") + public void checkTheNumberOfRetriedMessagesEqualsTo(int arg0) { + } + + @And("SimpleConsumer invokes the method receive\\(maxMessageNum:{string}, invisibleDuration:{string}) {string}") + public void simpleconsumerInvokesReceiveMaxMessageNumInvisibleDuration(String arg0, String arg1, String arg2) { + + } + + + @Then("Check the duration between each two retrying consumptions equals to {int}s") + public void checkTheDurationBetweenEachTwoRetryingConsumptionsEqualsToS(int arg0) { + + } + + @And("SimpleConsumer returns an ack within {string}") + public void simpleconsumerReturnsAnAckWithin(String arg0) { + + } + + @And("Set a failed assertion with {string}") + public void setAFailedAssertionWith(String arg0) { + + } + + @And("Check a new {string} messages are pulled {string} and acked {string} by SimpleConsumer within {int}s {string}") + public void checkANewMessagesArePulledAndAckedBySimpleConsumerWithinS(String arg0, String arg1, String arg2, int arg3, String arg4) { + + } + + @And("SimpleConsumer invokes receive\\(maxMessageNum:{string}, invisibleDuration:{string}, changeInvisibleDuration:{string}) {string} without returning ack") + public void simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationChangeInvisibleDurationWithoutReturningAck(String arg0, String arg1, String arg2, String arg3) { + + } + + @And("SimpleConsumer invokes receive\\(maxMessageNum:{string}, invisibleDuration:{string}) during next consumption and returns an ack") + public void simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationDuringNextConsumptionAndReturnsAnAck(String arg0, String arg1) { + + } + + @Then("Check SimpleConsumer pulls the message and returns an ack within {int}s but over {int}s") + public void checkSimpleConsumerPullsTheMessageAndReturnsAnAckWithinSButOverS(int arg0, int arg1) { + + } + + @And("SimpleConsumer invokes receive\\(maxMessageNum:{string}, invisibleDuration:{string}) during next consumption") + public void simpleconsumerInvokesReceiveMaxMessageNumInvisibleDurationDuringNextConsumption(String arg0, String arg1) { + + } + + @Then("Set SimpleConsumer changeInvisibleDuration\\({string})") + public void setSimpleConsumerChangeInvisibleDuration(String arg0) { + + } + + @And("Check changeInvisibleDuration after the ack is failed") + public void checkChangeInvisibleDurationAfterTheAckIsFailed() { + + } + + @And("Check the consumption is failed") + public void checkTheConsumptionIsFailed() { + + } + + @And("SimpleConsumer invokes the method receive\\(maxMessageNum:{string}, invisibleDuration:{string}) {string}") + public void simpleconsumerInvokesTheMethodReceiveMaxMessageNumInvisibleDuration(String arg0, String arg1, String arg2) { + + } + + @Then("SimpleConsumer returns acks {string}") + public void simpleconsumerReturnsAcks(String arg0) { + } + + @Then("Check all {string} messages are received and acked within {int}s {string}") + public void checkAllMessagesAreReceivedAndAckedWithinS(String arg0, int arg1, String arg2) { + } + + @Then("SimpleConsumer returns acks for all received messages except the first one with Body\\({string})") + public void simpleconsumerReturnsAcksForAllReceivedMessagesExceptTheFirstOneWithBody(String arg0) { + + } + + @Then("Check only the first {string} messages with Body\\({string}) that can be received") + public void checkOnlyTheFirstMessagesWithBodyThatCanBeReceived(String arg0, String arg1) { + } + + @And("Check all received messages that can be consumed again {string}") + public void checkAllReceivedMessagesThatCanBeConsumedAgain(String arg0) { + } + + @And("Check the number of consumptions equals to {int}") + public void checkTheNumberOfConsumptionsEqualsTo(int arg0) { + } + + @And("SimpleConsumer returns an ack after DeliveryAttempt\\({int})") + public void simpleconsumerReturnsAnAckAfterDeliveryAttempt(int arg0) { + } + + @And("Check all messages that can be consumed and acked within {int}s") + public void checkAllMessagesThatCanBeConsumedAndAckedWithinS(int arg0) { + + } + + @Then("SimpleConsumer returns acks for all received messages except the first one") + public void simpleconsumerReturnsAcksForAllReceivedMessagesExceptTheFirstOne() { + } + + @And("Check only the first message is not acked") + public void checkOnlyTheFirstMessageIsNotAcked() { + } + + @And("SimpleConsumer returns acks for all received messages") + public void simpleconsumerReturnsAcksForAllReceivedMessages() { + } + + @And("SimpleConsumer returns an ack when DeliveryAttempt value equals {int}") + public void simpleconsumerReturnsAnAckWhenDeliveryAttemptValueEquals(int arg0) { + } + + @Then("Check SimpleConsumer receives only up to {int} messages once") + public void checkSimpleConsumerReceivesOnlyUpToMessagesOnce(int arg0) { + } + + @And("Check no acked messages that can be consumed again") + public void checkNoAckedMessagesThatCanBeConsumedAgain() { + + } + + @Then("SimpleConsumer returns ack for all received messages except the first one") + public void simpleconsumerReturnsAckForAllReceivedMessagesExceptTheFirstOne() { + } + + @And("SimpleConsumer waits for {int}s after receiving the messages") + public void simpleconsumerWaitsForSAfterReceivingTheMessages(int arg0) { + + } + + @Then("SimpleConsumer returns an ack") + public void simpleconsumerReturnsAnAck() { + } } diff --git a/bdd/src/main/resources/server/abnormal/PushConsumerRetry.feature b/bdd/src/main/resources/server/abnormal/PushConsumerRetry.feature new file mode 100644 index 0000000..be043fe --- /dev/null +++ b/bdd/src/main/resources/server/abnormal/PushConsumerRetry.feature @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Feature: Test retry of PushConsumer + + Scenario: The normal message is sent, and after the PushConsumer partial retry, the retry message is expected to be consumed + Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-FilterExpression"), ConsumptionThreadCount(20) + And Set PushConsumer Listener, “half” of the number of pre-sent messages are consumed successful and return SUCCESS, another "half" are consumed failed and return FAILURE + And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic") + Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") + And Send "10" messages "synchronous" + Then Check all messages send "successfully" + Then Check NoRetryMessages equals to RetryMessages and equals to "half" of the number of pre-sent messages + And Check all messages are contained in retryMessages or noRetryMessages + And Shutdown the producer and consumer if they are started + + Scenario: The send order message, after the PushConsumer partial retry, is expected to consume the retry message, and the message consumption order and send order + Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-FilterExpression"), ConsumptionThreadCount(20), and MessageListener("default") + And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic") + Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), Body("Body"), and MessageGroup("group") + And Send "10" messages "synchronous" + Then Check all messages send "successfully" + And Wait until all the messages that can be consumed + And Check the order of received messages consistent with the order of pre-sent messages + And Shutdown the producer and consumer if they are started + + Scenario: Send sequential messages, using three Shardingkeys, after partial retries, expect to consume retry messages, and the order of message consumption and the order of message delivery + Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a "PushConsumer", set the ClientConfiguration(Endpoint:"127.0.0.1:9876"), ConsumerGroup("random-group"), SubscriptionExpressions("random-topic", "random-FilterExpression"), ConsumptionThreadCount(20), and MessageListener("default") + And Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("10s"), Topic("random-topic") + Then Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), Body("Body"), and MessageGroup("group") + And Send "10" messages "synchronous" + Then Check all messages send "successfully" + And Wait until all the messages that can be consumed + Then Separate the messages into 3 ShardingKeyGroups according to messageGroup + And Check the message order in each ShardingKeyGroup + And Shutdown the producer and consumer if they are started + diff --git a/bdd/src/main/resources/simple/SimpleAck.feature b/bdd/src/main/resources/simple/SimpleAck.feature new file mode 100644 index 0000000..2c6999f --- /dev/null +++ b/bdd/src/main/resources/simple/SimpleAck.feature @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Feature: Test SimpleConsumer pulls normal messages and returns acks synchronously or asynchronously + + Scenario Outline: Send 20 normal messages synchronously and expect to consume with receive()/receiveAsync() and ack()/ackAsync() messages successfully + Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") + And Send "20" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "" + Then SimpleConsumer returns acks "" + Then Check all "Normal" messages are received and acked within 90s "successfully" + And Shutdown the producer and consumer if they are started + + Examples: + | TransmissionMode | AckMode | + | synchronously | synchronously | + | asynchronously | synchronously | + | synchronously | asynchronously | + | asynchronously | asynchronously | + + + diff --git a/bdd/src/main/resources/simple/SimpleMsgType.feature b/bdd/src/main/resources/simple/SimpleMsgType.feature new file mode 100644 index 0000000..4ce6002 --- /dev/null +++ b/bdd/src/main/resources/simple/SimpleMsgType.feature @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Feature: Test SimpleConsumer pulls and ack order/delay/transaction messages synchronously properly + + Scenario: Send 20 order messages synchronously, expect SimpleConsumer to receive() and ack() messages properly and the order to be maintained + Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create "a" messages, including the Topic("Topic"), Tag("Tag"), Key("Key"), MessageGroup("Group"), and Body("Body") + And Send "20" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously" + Then SimpleConsumer returns acks "synchronously" + Then Check all "FIFO" messages are received and acked within 90s "successfully" + And Shutdown the producer and consumer if they are started + + Scenario: Send 10 delay messages synchronously, expect SimpleConsumer to receive() and ack() messages properly + Given Create a "Delay" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create a message, including the Topic("random-topic"), Tag("TagA"), Body("Body"), deliveryTimestamp("10s") + And Send "10" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously" + Then SimpleConsumer returns acks "synchronously" + Then Check all "Delay" messages are received and acked within 90s "successfully" + And Shutdown the producer and consumer if they are started + + Scenario: Send 10 transaction messages synchronously, expect SimpleConsumer to receive() and ack() messages properly + Given Create a "Transaction" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") + And Send "10" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously" + Then SimpleConsumer returns acks "synchronously" + Then Check all "Transaction" messages are received and acked within 90s "successfully" + And Shutdown the producer and consumer if they are started diff --git a/bdd/src/main/resources/simple/SimpleOrderParam.feature b/bdd/src/main/resources/simple/SimpleOrderParam.feature new file mode 100644 index 0000000..0d509ec --- /dev/null +++ b/bdd/src/main/resources/simple/SimpleOrderParam.feature @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Feature: Test SimpleConsumer pulls, ack and retry order messages + + Scenario: Send 20 order messages synchronously with the same MessageGroup, then SimpleConsumer consumes messages orderly with receive() but not ack() messages, expect messages to be stuck at the first message + Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create "20" messages, including the Topic("random-topic"), Tag("TagA"), Key("Key"), MessageGroup("group1"), and Body("1-20") + And Send "20" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"3", invisibleDuration:"10s") "synchronously" + Then SimpleConsumer returns acks for all received messages except the first one with Body("1") + Then Check only the first "1" messages with Body("1") that can be received + And Shutdown the producer and consumer if they are started + +# Scenario: Send 20 order messages synchronously with eight different MessageGroups, then SimpleConsumer consumes messages orderly with receive() but not ack() messages, expect the order to be maintained +# Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" +# When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") +# And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") +# And Create "20" messages, including the Topic("random-topic"), Tag("TagA"), Key("Key"), MessageGroup("8 different groups"), and Body("Body") +# And Send "20" messages "synchronously" +# Then Check all messages send "successfully" +# And SimpleConsumer invokes receive(1) "synchronously" +# Then Check only the first message that can be pulled by SimpleConsumer +# And Shutdown the producer and consumer if they are started + + + Scenario: Send 20 order messages synchronously, then SimpleConsumer invokes receive(3) in batch, and all pulled messages return ack() except the first one, expect the order to be maintained and the messages to be consumed again after a certain time + Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create "20" messages, including the Topic("random-topic"), Tag("TagA"), Key("Key"), MessageGroup("group1"), and Body("1-20") + And Send "20" messages "synchronously" + And Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"3", invisibleDuration:"10s") "synchronously" + Then SimpleConsumer returns acks for all received messages except the first one with Body("1") + And Check only the first "3" messages with Body("1-3") that can be received + And Check all received messages that can be consumed again "successfully" +# 第一条没有返回ack,因此会被重新拉取,由于发送的是顺序消息,所以后面的消息也会被重新拉取 + And Shutdown the producer and consumer if they are started +# +# Scenario: Send 20 order messages synchronously, then SimpleConsumer invokes receive(3) in batch, only ack() the first one of pulled messages, expect the order to be maintained and other messages to be consumed again after a certain time +# Given Create a "FIFO" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" +# When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") +# And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") +# And Create "20" messages, including the Topic("random-topic"), Tag("TagA"), Key("Key"), MessageGroup("group1"), and Body("Body") +# And Send "20" messages "synchronously" +# And Check all messages send "successfully" +# Then SimpleConsumer invokes receive(3) "synchronously" and returns acks "synchronously" for "the first one" pulled messages +# And Check the pulled messages that can be retried except the first one +# And Check the number of retried messages equals to 2 +## 如果一起拉取三条,只有第一条返回ack,剩余两条会被重试,那么viewList.stream().filter(msg -> msg.getDeliveryAttempt() == 1).count() 应该是2,原来用例是等于1,而且getDeliveryAttenmpt是从0开始计数的 +# And Shutdown the producer and consumer if they are started diff --git a/bdd/src/main/resources/simple/SimpleParameter.feature b/bdd/src/main/resources/simple/SimpleParameter.feature new file mode 100644 index 0000000..3717c67 --- /dev/null +++ b/bdd/src/main/resources/simple/SimpleParameter.feature @@ -0,0 +1,103 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Feature: Test some complex scenarios about pulling and ack of SimpleConsumer + + Scenario: Send a normal message, then the SimpleConsumer invokes receive(1,10s), and an ack is returned after three retries, expect retry times to be 3 and retry interval to be 10s + Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") + And Send "a" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously" + And SimpleConsumer returns an ack when DeliveryAttempt value equals 4 + Then Check the duration between each two retrying consumptions equals to 10s + And Check the number of consumptions equals to 4 + And Check all messages that can be consumed and acked within 90s + And Shutdown the producer and consumer if they are started + + Scenario: Send a normal message, then the SimpleConsumer invokes receive(1,10s), and an ack is returned within 11s section. If an INVALID_RECEIPT_HANDLE error message is displayed, then receive a new message again and return an ack successfully + Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") + And Send "a" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously" + And SimpleConsumer waits for 11s after receiving the messages + Then SimpleConsumer returns an ack + Then Check exceptions can be thrown + And Check a new "Normal" messages are pulled "synchronously" and acked "synchronously" by SimpleConsumer within 90s "successfully" + And Shutdown the producer and consumer if they are started + +# Scenario: Send a normal message, then the SimpleConsumer invokes receive(1,10s), then delay the invisibleTime to 20s, and consume the messages again with receive(1,30s), expect to receive and ack in 30s +# Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" +# When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") +# And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") +# And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") +# And Send "a" messages "synchronously" +# Then Check all messages send "successfully" +# And SimpleConsumer invokes receive(maxMessageNum:"1", invisibleDuration:"10s", changeInvisibleDuration:"20s") "synchronously" without returning ack +# And Check all messages that can be consumed and acked within 90s +# And SimpleConsumer invokes receive(maxMessageNum:"1", invisibleDuration:"30s") during next consumption +# And SimpleConsumer ack "a" messages +# Then Check SimpleConsumer pulls the message and returns an ack within 30s but over 20s +## 用例描述里面写的是20s内,但判断的是大于20s,小于30s +# And Shutdown the producer and consumer if they are started + +# Scenario: Send a normal message, then the SimpleConsumer invokes receive(1,10s), acks and changeInvisibleDuration, expect to change indicating illegal ReceiptHandle +# Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" +# When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") +# And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") +# And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") +# And Send "a" messages "synchronously" +# Then Check all messages send "successfully" +# And SimpleConsumer invokes receive(maxMessageNum:"1", invisibleDuration:"10s") "synchronously" +# And SimpleConsumer ack "a" messages +# Then Set SimpleConsumer changeInvisibleDuration("10s") +# And Check changeInvisibleDuration after the ack is failed +# And Check the consumption is failed +# And Shutdown the producer and consumer if they are started + + Scenario: Send 300 normal messages synchronously, then the SimpleConsumer invokes receive(50,10s), expect only up to 32 messages are consumed and acked at the same time, and no ack messages are consumed again + Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") + And Send "300" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"50", invisibleDuration:"10s") "synchronously" + Then Check SimpleConsumer receives only up to 32 messages once + And SimpleConsumer returns acks for all received messages + And Check no acked messages that can be consumed again + And Check all messages that can be consumed within 60s + And Shutdown the producer and consumer if they are started + + Scenario: Send 20 normal messages synchronously, then SimpleConsumer invokes receive(50) in batch, and all pulled messages return ack() except the first one, expect the consumption not to be affected and the ack messages not to be consumed again + Given Create a "Normal" topic:"random-topic" if not exist, a "Concurrently" group:"random-group" + When Create a Producer, set the Endpoint("127.0.0.1:9876"), RequestTimeout:("random-group"), Topic("random-topic") + And Create a SimpleConsumer, set the Endpoint("127.0.0.1:9876"), Topic("random-topic"), ConsumerGroup("group"), FilterExpressions("tag"), Duration("10s") + And Create a message, including the Topic("random-topic"), Tag("TagA"), Key("Key"), and Body("Body") + And Send "20" messages "synchronously" + Then Check all messages send "successfully" + And SimpleConsumer invokes the method receive(maxMessageNum:"50", invisibleDuration:"10s") "synchronously" + Then SimpleConsumer returns ack for all received messages except the first one + And Check SimpleConsumer receives only up to 32 messages once + And Check all messages that can be consumed within 90s + And Check only the first message is not acked + And Shutdown the producer and consumer if they are started + + diff --git a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java index 2351a99..09587d7 100644 --- a/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java +++ b/java/e2e/src/test/java/org/apache/rocketmq/broker/simple/SimpleOrderParamTest.java @@ -328,6 +328,7 @@ public void run() { List viewList = entry.getValue(); long actual = viewList.stream().filter(msg -> msg.getDeliveryAttempt() == 2).count(); Assertions.assertEquals(1, actual, String.format("The number of message retries obtained was not expected, expect:%s, actual:%s", 1, actual)); +// DeliveryAttempt是从0开始计数的,判断重试应该让getDeliveryAttempt()==1, 一起拉取三条消息,第一条没重试,那重试的还剩两条,所以应该让actual==2 } } } catch (InterruptedException e) {