Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7512] Simplify message sending at the basic layer and reduce redundant judgments #7513

Closed
wants to merge 15 commits into from
Closed
Prev Previous commit
Next Next commit
sendMessage Optimize
CLFutureX committed Oct 28, 2023
commit 2f1c31092260aaedf2b97c291c095be4eb6b2209
Original file line number Diff line number Diff line change
@@ -66,12 +66,13 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, Address
List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) {
CompletableFuture<List<SendResult>> future;
Message message;
if(requestHeader.isBatch()){
if (msgList.size() == 1) {
message = msgList.get(0);
} else {
requestHeader.setBatch(true);
message = MessageBatch.generateFromList(msgList);
MessageClientIDSetter.setUniqID(message);
((MessageBatch) message).fillBody();
} else {
message = msgList.get(0);
}
future = this.mqClientAPIFactory.getClient().sendMessageAsync(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个部分的改动似乎没有必要,原有的写法我觉得没什么问题

messageQueue.getBrokerAddr(),
@@ -143,7 +144,8 @@ public CompletableFuture<AckResult> ackMessage(ProxyContext ctx, ReceiptHandle h
}

@Override
public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList, String consumerGroup,
public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList,
String consumerGroup,
String topic, long timeoutMillis) {
List<String> extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList());
return this.mqClientAPIFactory.getClient().batchAckMessageAsync(
Original file line number Diff line number Diff line change
@@ -91,12 +91,13 @@ public LocalMessageService(BrokerController brokerController, ChannelManager cha
public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) {
Message message;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断条件变了一下,带来什么优化了吗

if (requestHeader.isBatch()) {
if (msgList.size() == 1) {
message = msgList.get(0);
} else {
requestHeader.setBatch(true);
message = MessageBatch.generateFromList(msgList);
MessageClientIDSetter.setUniqID(message);
((MessageBatch) message).fillBody();
} else {
message = msgList.get(0);
}
RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage());
request.setBody(message.getBody());
@@ -171,7 +172,8 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
}

@Override
public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName, EndTransactionRequestHeader requestHeader,
public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName,
EndTransactionRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createChannel(ctx);