Skip to content

Commit

Permalink
Optimize the code of producer
Browse files Browse the repository at this point in the history
  • Loading branch information
zkzlx committed Sep 29, 2021
1 parent afdee28 commit 3e83807
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public synchronized boolean isRunning() {
protected synchronized Object doReceive() {
if (messageExtIterator == null) {
List<MessageExt> messageExtList = consumer.poll();
if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) {
if (CollectionUtils.isEmpty(messageExtList)) {
return null;
}
messageExtIterator = messageExtList.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,23 @@ protected void handleMessageInternal(Message<?> message) {
}
((TransactionMQProducer) defaultMQProducer)
.setTransactionListener(transactionListener);
log.info("send transaction message :" + mqMessage);
if(log.isDebugEnabled()){
log.debug("send transaction message ->{}" , mqMessage);
}
sendResult = defaultMQProducer.sendMessageInTransaction(mqMessage,
message.getHeaders().get(RocketMQConst.USER_TRANSACTIONAL_ARGS));
}
else {
log.info("send message :" + mqMessage);
if(log.isDebugEnabled()){
log.debug("send message ->{}" , mqMessage);
}
sendResult = this.send(mqMessage, this.messageQueueSelector,
message.getHeaders(), message);
}
log.info("the message has sent,message={},sendResult={}",mqMessage,sendResult);
if (sendResult == null
|| !SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.error("message send fail.SendStatus is not OK ");
log.error("message send fail.SendStatus is not OK.the message={}",mqMessage);
this.doFail(message, new MessagingException(
"message send fail.SendStatus is not OK."));
}
Expand Down

0 comments on commit 3e83807

Please sign in to comment.