diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java index 7f348f79e2..6d55835183 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java @@ -139,7 +139,7 @@ public synchronized boolean isRunning() { protected synchronized Object doReceive() { if (messageExtIterator == null) { List messageExtList = consumer.poll(); - if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) { + if (CollectionUtils.isEmpty(messageExtList)) { return null; } messageExtIterator = messageExtList.iterator(); diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java index 11edb2bf17..df9bedebc0 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java @@ -169,18 +169,23 @@ protected void handleMessageInternal(Message message) { } ((TransactionMQProducer) defaultMQProducer) .setTransactionListener(transactionListener); - log.info("send transaction message :" + mqMessage); + if(log.isDebugEnabled()){ + log.debug("send transaction message ->{}" , mqMessage); + } sendResult = defaultMQProducer.sendMessageInTransaction(mqMessage, message.getHeaders().get(RocketMQConst.USER_TRANSACTIONAL_ARGS)); } else { - log.info("send message :" + mqMessage); + if(log.isDebugEnabled()){ + log.debug("send message ->{}" , mqMessage); + } sendResult = this.send(mqMessage, this.messageQueueSelector, message.getHeaders(), message); } + log.info("the message has sent,message={},sendResult={}",mqMessage,sendResult); if (sendResult == null || !SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { - log.error("message send fail.SendStatus is not OK "); + log.error("message send fail.SendStatus is not OK.the message={}",mqMessage); this.doFail(message, new MessagingException( "message send fail.SendStatus is not OK.")); }