diff --git a/java/e2e/src/main/java/org/apache/rocketmq/factory/ClientConfigurationFactory.java b/java/e2e/src/main/java/org/apache/rocketmq/factory/ClientConfigurationFactory.java index be69114..dc0f529 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/factory/ClientConfigurationFactory.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/factory/ClientConfigurationFactory.java @@ -25,18 +25,18 @@ public class ClientConfigurationFactory { public static ClientConfiguration build(Account account) { ClientConfiguration clientConfiguration; - if(account.getAclEnable()) { + if (account.getAclEnable()) { StaticSessionCredentialsProvider staticSessionCredentialsProvider = new StaticSessionCredentialsProvider(account.getAccessKey(), account.getSecretKey()); clientConfiguration = ClientConfiguration.newBuilder() - .setEndpoints(account.getEndpoint()) - .setRequestTimeout(Duration.ofSeconds(10)) - .setCredentialProvider(staticSessionCredentialsProvider) - .build(); - }else { + .setEndpoints(account.getEndpoint()) + .setRequestTimeout(Duration.ofSeconds(10)) + .setCredentialProvider(staticSessionCredentialsProvider) + .build(); + } else { clientConfiguration = ClientConfiguration.newBuilder() - .setEndpoints(account.getEndpoint()) - .setRequestTimeout(Duration.ofSeconds(10)) - .build(); + .setEndpoints(account.getEndpoint()) + .setRequestTimeout(Duration.ofSeconds(10)) + .build(); } return clientConfiguration; } diff --git a/java/e2e/src/main/java/org/apache/rocketmq/frame/ResourceInit.java b/java/e2e/src/main/java/org/apache/rocketmq/frame/ResourceInit.java index bf56c2b..0d1ec90 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/frame/ResourceInit.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/frame/ResourceInit.java @@ -47,7 +47,7 @@ public class ResourceInit { protected static String endPoint = null; protected static String namesrvAddr = null; protected static Boolean aclEnable = null; - protected static String ALL_IP; + protected static String allIp; protected static String cluster; protected static List nameserverIpList = new ArrayList<>(); protected static List brokerIpList = new ArrayList<>(); @@ -102,9 +102,9 @@ private static void initAcl() { } private static void initConnectionInfo() { - ALL_IP = System.getenv("ALL_IP"); - if (ALL_IP != null) { - String[] allPodInfos = ALL_IP.split(","); + allIp = System.getenv("ALL_IP"); + if (allIp != null) { + String[] allPodInfos = allIp.split(","); for (String podInfo : allPodInfos) { if (podInfo.contains("nameserver")) { nameserverIpList.add(podInfo.substring(podInfo.indexOf(":") + 1)); diff --git a/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java b/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java index 5d3a7ca..045bb9a 100644 --- a/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java +++ b/java/e2e/src/main/java/org/apache/rocketmq/util/VerifyUtils.java @@ -94,7 +94,7 @@ public static boolean checkOrderMessage(ConcurrentHashMap enqueueMessages, - DataCollector dequeueMessages) { + DataCollector dequeueMessages) { Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1); if (unConsumedMessages.size() > 0) { Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages)); @@ -102,7 +102,7 @@ public static void verifyNormalMessage(DataCollector enqueueMessages, } public static void verifyNormalMessage(DataCollector enqueueMessages, - DataCollector dequeueMessages, Set unconsumedMsgIds, int timeout) { + DataCollector dequeueMessages, Set unconsumedMsgIds, int timeout) { Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, timeout * 1000L, 1); // if (unConsumedMessages.size() == 0) { // Assertions.fail(String.format("Messages are all consumed")); @@ -130,7 +130,7 @@ public static void verifyNormalMessage(DataCollector enqueueMessages, } public static void verifyNormalMessage(DataCollector enqueueMessages, - DataCollector dequeueMessages, int timeout) { + DataCollector dequeueMessages, int timeout) { Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, timeout * 1000L, 1); if (unConsumedMessages.size() > 0) { Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages)); @@ -145,7 +145,7 @@ public static void verifyNormalMessage(DataCollector enqueueMessages, * @param messageBody Body of message */ public static void verifyNormalMessageWithBody(DataCollector enqueueMessages, - DataCollector dequeueMessages, String messageBody) { + DataCollector dequeueMessages, String messageBody) { Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1); if (unConsumedMessages.size() > 0) { Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages)); @@ -165,7 +165,7 @@ public static void verifyNormalMessageWithBody(DataCollector enqueueMess * @param dequeueMessages Consume the outgoing message set */ public static void verifyOrderMessage(DataCollector enqueueMessages, - DataCollector dequeueMessages) { + DataCollector dequeueMessages) { //Check whether the consumption is complete Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1); if (unConsumedMessages.size() > 0) { @@ -183,10 +183,10 @@ public static void verifyOrderMessage(DataCollector enqueueMessages, * @param delayTime Estimated consumption time required */ public static void verifyDelayMessage(DataCollector enqueueMessages, - DataCollector dequeueMessages, int delayTime) { + DataCollector dequeueMessages, int delayTime) { //Check whether the consumption is complete Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, - (TIMEOUT + delayTime) * 1000L, 1); + (TIMEOUT + delayTime) * 1000L, 1); if (unConsumedMessages.size() > 0) { Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages)); } @@ -207,8 +207,8 @@ public static void verifyDelayMessage(DataCollector enqueueMessages, * @param count The amount that is not consumed */ public static void verifyDelayMessageWithUnConsumeCount(DataCollector enqueueMessages, - DataCollector dequeueMessages, int delayTime, - int count) { + DataCollector dequeueMessages, int delayTime, + int count) { //Check whether the consumption is complete Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, (TIMEOUT + delayTime) * 1000L, 1); if (unConsumedMessages.size() > count) { @@ -222,7 +222,7 @@ public static void verifyDelayMessageWithUnConsumeCount(DataCollector en SimpleDateFormat date = new SimpleDateFormat("ss"); for (String msg : delayUnExcept.keySet()) { sb.append(msg).append(" , interval:").append("Difference between" + date.format(new Date(Long.parseLong(String.valueOf(delayUnExcept.get(msg))))) + "s").append( - "\n"); + "\n"); } Assertions.assertEquals(0, delayUnExcept.size(), sb.toString()); } @@ -234,8 +234,8 @@ public static void verifyDelayMessageWithUnConsumeCount(DataCollector en * @param reconsumeTime Number of retries */ public static void verifyDelayMessageWithReconsumeTimes(DataCollector enqueueMessages, - DataCollector dequeueMessages, int delayTime, - int reconsumeTime) { + DataCollector dequeueMessages, int delayTime, + int reconsumeTime) { int flexibleTime = TIMEOUT; if (reconsumeTime == 1) { flexibleTime = flexibleTime + 10; @@ -262,8 +262,8 @@ public static void verifyDelayMessageWithReconsumeTimes(DataCollector en } public static void verifyNormalMessageWithReconsumeTimes(DataCollector enqueueMessages, - DataCollector dequeueMessages, - int reconsumeTime) { + DataCollector dequeueMessages, + int reconsumeTime) { int flexibleTime = TIMEOUT; if (reconsumeTime == 1) { flexibleTime = flexibleTime + 10; @@ -289,7 +289,7 @@ public static void verifyNormalMessageWithReconsumeTimes(DataCollector e * @param consumedTimes The number of repeated purchases */ public static void verifyRetryConsume(DataCollector enqueueMessages, - DataCollector dequeueAllMessages, int consumedTimes) { + DataCollector dequeueAllMessages, int consumedTimes) { Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueAllMessages, TIMEOUT * 1000L, consumedTimes); if (unConsumedMessages.size() > 0) { Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages)); @@ -303,7 +303,7 @@ public static void verifyRetryConsume(DataCollector enqueueMessages, * @param dequeueMessages Consume the outgoing message set */ public static void checkTransactionMessage(DataCollector enqueueMessages, - DataCollector dequeueMessages) { + DataCollector dequeueMessages) { Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1); if (unConsumedMessages.size() > 0) { Assertions.fail(String.format("The following %s messages are not consumed: %s", unConsumedMessages.size(), unConsumedMessages)); @@ -316,7 +316,7 @@ private static HashMap checkDelay(DataCollector dequeueMes long consumeTime = System.currentTimeMillis(); for (Object receivedMessage : receivedMessages) { MessageView messageView = (MessageView) receivedMessage; - Assertions.assertTrue(messageView.getDeliveryTimestamp().isPresent(),"DeliveryTimestamp is empty"); + Assertions.assertTrue(messageView.getDeliveryTimestamp().isPresent(), "DeliveryTimestamp is empty"); //Check the current time and the distribution time. If the difference is within 5s, the requirements are met long bornTimestamp = messageView.getBornTimestamp(); if (Math.abs((consumeTime - bornTimestamp) / 1000 - delayTimeSec) > 5) { @@ -361,8 +361,8 @@ private static boolean checkOrder(DataCollector dequeueMessages) { * @return A collection of messages that are not consumed */ private static Collection waitForMessageConsume(DataCollector enqueueMessages, - DataCollector dequeueMessages, - Long timeoutMills, Integer consumedTimes) { + DataCollector dequeueMessages, + Long timeoutMills, Integer consumedTimes) { log.info("Set timeout: {}ms", timeoutMills); Collection sendMessages = new ArrayList<>(enqueueMessages.getAllData()); @@ -378,16 +378,16 @@ private static Collection waitForMessageConsume(DataCollector en // String messageId = (MessageView) message; long msgCount = receivedMessagesCopy.stream().filter( - msg -> { - MessageView messageView = (MessageView) msg; - return messageView.getMessageId().toString().equals(enqueueMessageId); - }).count(); + msg -> { + MessageView messageView = (MessageView) msg; + return messageView.getMessageId().toString().equals(enqueueMessageId); + }).count(); if (msgCount > 0 && getRepeatedTimes(receivedMessagesCopy, enqueueMessageId) == consumedTimes) { iter.remove(); } else if (getRepeatedTimes(receivedMessagesCopy, enqueueMessageId) > consumedTimes) { Assertions.fail( - String.format("More retry messages were consumed than expected (including one original message) Except:%s, Actual:%s, MsgId:%s", consumedTimes, getRepeatedTimes(receivedMessagesCopy, enqueueMessageId), - enqueueMessageId)); + String.format("More retry messages were consumed than expected (including one original message) Except:%s, Actual:%s, MsgId:%s", consumedTimes, getRepeatedTimes(receivedMessagesCopy, enqueueMessageId), + enqueueMessageId)); //log.error("More retry messages were consumed than expected, Except:{}, Actual:{}", consumedTimes, getRepeatedTimes(receivedMessagesCopy, message)); } } @@ -397,7 +397,7 @@ private static Collection waitForMessageConsume(DataCollector en if (System.currentTimeMillis() - currentTime >= timeoutMills) { log.error("Timeout but not received all send messages,topic:{}, send {} , recv {}, not received msg: {}\n received msg:{}\n", - dequeueMessages.getDataSize() > 0 ? ((MessageView) dequeueMessages.getFirstElement()).getTopic() : null, enqueueMessages.getDataSize(), receivedMessagesCopy.size(), sendMessages, receivedMessagesCopy); + dequeueMessages.getDataSize() > 0 ? ((MessageView) dequeueMessages.getFirstElement()).getTopic() : null, enqueueMessages.getDataSize(), receivedMessagesCopy.size(), sendMessages, receivedMessagesCopy); break; } TestUtils.waitForMoment(500L); @@ -424,7 +424,7 @@ private static synchronized int getRepeatedTimes(Collection recvMsgs, St * @param props The desired attribute condition is not met */ public static void verifyNormalMessageWithUserProperties(DataCollector enqueueMessages, - DataCollector dequeueMessages, HashMap props, int expectedUnrecvMsgNum) { + DataCollector dequeueMessages, HashMap props, int expectedUnrecvMsgNum) { Collection unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, TIMEOUT * 1000L, 1); Collection recvMsgs = dequeueMessages.getAllData(); for (Object unConsumedMessage : recvMsgs) { @@ -451,7 +451,7 @@ public static void verifyNormalMessageWithUserProperties(DataCollector e */ @SafeVarargs public static void verifyClusterConsume(DataCollector enqueueMessages, - DataCollector... dequeueAllMessages) { + DataCollector... dequeueAllMessages) { long currentTime = System.currentTimeMillis(); List sendMessagesCopy = new ArrayList<>(enqueueMessages.getAllData()); @@ -528,7 +528,7 @@ public static void tryReceiveOnce(SimpleConsumer consumer, Boolean useExistTopic for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); try { consumer.ack(messageView); } catch (ClientException e) { @@ -549,7 +549,7 @@ public static void tryReceiveOnce(SimpleConsumer consumer, Boolean useExistTopic } public static void waitReceiveThenAckAsync(RMQNormalProducer producer, SimpleConsumer consumer, int maxMessageNum, - Duration invisibleDuration) { + Duration invisibleDuration) { long endTime = System.currentTimeMillis() + TIMEOUT * 1000; List runnables = new ArrayList<>(); Collection sendCollection = Collections.synchronizedCollection(producer.getEnqueueMessages().getAllData()); @@ -564,7 +564,7 @@ public void run() { for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()).toString(), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()).toString(), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); CompletableFuture future = consumer.ackAsync(messageView); future.thenAccept(new Consumer() { @Override @@ -608,7 +608,7 @@ public Void apply(Throwable throwable) { } public static void waitReceiveThenAck(RMQNormalProducer producer, SimpleConsumer consumer, int maxMessageNum, - Duration invisibleDuration) { + Duration invisibleDuration) { long endTime = System.currentTimeMillis() + TIMEOUT * 1000; List runnables = new ArrayList<>(8); @@ -624,7 +624,7 @@ public void run() { for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); consumer.ack(messageView); sendCollection.removeIf(sendMessageId -> sendMessageId.equals(messageView.getMessageId().toString())); } @@ -650,8 +650,8 @@ public void run() { } public static void waitFIFOReceiveThenAck(RMQNormalProducer producer, SimpleConsumer consumer, - int maxMessageNum, - Duration invisibleDuration) { + int maxMessageNum, + Duration invisibleDuration) { long endTime = System.currentTimeMillis() + TIMEOUT * 1000; Collection sendCollection = producer.getEnqueueMessages().getAllData(); ConcurrentHashMap> map = new ConcurrentHashMap<>(); @@ -662,7 +662,7 @@ public static void waitFIFOReceiveThenAck(RMQNormalProducer producer, SimpleCons for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); consumer.ack(messageView); sendCollection.removeIf(sendMessageId -> sendMessageId.equals(messageView.getMessageId().toString())); String shardingKey = String.valueOf(messageView.getMessageGroup()); @@ -689,8 +689,8 @@ public static void waitFIFOReceiveThenAck(RMQNormalProducer producer, SimpleCons } public static void waitReceiveAsyncThenAck(RMQNormalProducer producer, SimpleConsumer consumer, - int maxMessageNum, - Duration invisibleDuration) { + int maxMessageNum, + Duration invisibleDuration) { long endTime = System.currentTimeMillis() + TIMEOUT * 1000; Collection sendCollection = Collections.synchronizedCollection(producer.getEnqueueMessages().getAllData()); ExecutorService executorService = Executors.newFixedThreadPool(4); @@ -707,7 +707,7 @@ public void accept(List messageViews) { for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); try { consumer.ack(messageView); sendCollection.removeIf(sendMessageId -> sendMessageId.equals(messageView.getMessageId().toString())); @@ -738,7 +738,7 @@ public List apply(Throwable throwable) { } public static void waitReceiveAsyncThenAckAsync(RMQNormalProducer producer, SimpleConsumer consumer, - int maxMessageNum, Duration invisibleDuration) { + int maxMessageNum, Duration invisibleDuration) { long endTime = System.currentTimeMillis() + TIMEOUT * 1000; Collection sendCollection = Collections.synchronizedCollection(producer.getEnqueueMessages().getAllData()); @@ -756,7 +756,7 @@ public void accept(List messageViews) { for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); CompletableFuture ackFuture = consumer.ackAsync(messageView); ackFuture.thenAccept(new Consumer() { @Override @@ -802,7 +802,7 @@ public List apply(Throwable throwable) { } public static void waitReceiveThenNack(RMQNormalProducer producer, SimpleConsumer consumer, int maxMessageNum, - Duration receiveInvisibleDuration, Duration changeInvisibleDuration) { + Duration receiveInvisibleDuration, Duration changeInvisibleDuration) { long endTime = System.currentTimeMillis() + TIMEOUT * 1000; Collection sendCollection = producer.getEnqueueMessages().getAllData(); @@ -813,7 +813,7 @@ public static void waitReceiveThenNack(RMQNormalProducer producer, SimpleConsume for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()).toString(), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()).toString(), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); if (changeInvisibleDuration != null) { consumer.changeInvisibleDuration(messageView, changeInvisibleDuration); log.info("Change the invisibility duration of [{}] to (changeInvisibleDuration): {}", messageView.getMessageId().toString(), changeInvisibleDuration); @@ -833,7 +833,7 @@ public static void waitReceiveThenNack(RMQNormalProducer producer, SimpleConsume } public static void waitReceiveThenAck(RMQNormalProducer producer, SimpleConsumer consumer, int maxMessageNum, - Duration invisibleDuration, int consumeTimes) { + Duration invisibleDuration, int consumeTimes) { ConcurrentHashMap msgMap = new ConcurrentHashMap<>(); ConcurrentHashMap msgTimeMap = new ConcurrentHashMap<>(); List runnables = new ArrayList<>(4); @@ -851,7 +851,7 @@ public void run() { for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); String receivedMessageId = messageView.getMessageId().toString(); //Processing repeated consuming messages @@ -903,7 +903,7 @@ public void run() { } public static void waitReceiveThenAck(RMQNormalProducer producer, SimpleConsumer consumer, int maxMessageNum, - Duration invisibleDuration, int consumeTimes, int waitTime, boolean needAck) { + Duration invisibleDuration, int consumeTimes, int waitTime, boolean needAck) { ConcurrentHashMap msgMap = new ConcurrentHashMap<>(); ConcurrentHashMap msgTimeMap = new ConcurrentHashMap<>(); List runnables = new ArrayList<>(4); @@ -921,7 +921,7 @@ public void run() { for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); String receivedMessageId = messageView.getMessageId().toString(); //Processing repeated consuming messages @@ -989,7 +989,7 @@ public void run() { } public static void waitDelayReceiveThenAck(RMQNormalProducer producer, SimpleConsumer consumer, - int maxMessageNum, long delayTimeMillis) { + int maxMessageNum, long delayTimeMillis) { long endTime = System.currentTimeMillis() + TIMEOUT * 1000; Collection sendCollection = Collections.synchronizedCollection(producer.getEnqueueMessages().getAllData()); List runnables = new ArrayList<>(); @@ -1004,7 +1004,7 @@ public void run() { for (MessageView messageView : messageViews) { receivedIndex.getAndIncrement(); log.info("MessageId:{}, Body:{}, tag:{}, Property:{}, Index:{}, Retry:{}", messageView.getMessageId(), - StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); + StandardCharsets.UTF_8.decode(messageView.getBody()), messageView.getTag().get(), messageView.getProperties(), receivedIndex.get(), messageView.getDeliveryAttempt()); consumer.ack(messageView); long bornTimestamp = messageView.getBornTimestamp(); long startDeliverTime = messageView.getDeliveryTimestamp().get();