Skip to content

Commit

Permalink
[ISSUE #8974] Add feature switch of recalling, disable by default (#9067
Browse files Browse the repository at this point in the history
)
  • Loading branch information
imzs authored Dec 27, 2024
1 parent 2089abd commit 19393e0
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
final RecallMessageRequestHeader requestHeader =
request.decodeCommandCustomHeader(RecallMessageRequestHeader.class);

if (!brokerController.getBrokerConfig().isRecallMessageEnable()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("recall failed, operation is forbidden");
return response;
}

if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
response.setRemark("recall failed, broker service not available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void init() throws IllegalAccessException, NoSuchFieldException {
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerConfig.getBrokerName()).thenReturn(BROKER_NAME);
when(brokerConfig.isRecallMessageEnable()).thenReturn(true);
when(brokerController.getBrokerStatsManager()).thenReturn(brokerStatsManager);
when(handlerContext.channel()).thenReturn(channel);
recallMessageProcessor = new RecallMessageProcessor(brokerController);
Expand Down Expand Up @@ -134,6 +135,14 @@ public void testHandlePutMessageResult() {
}
}

@Test
public void testProcessRequest_notEnable() throws RemotingCommandException {
when(brokerConfig.isRecallMessageEnable()).thenReturn(false);
RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request);
Assert.assertEquals(ResponseCode.NO_PERMISSION, response.getCode());
}

@Test
public void testProcessRequest_invalidStatus() throws RemotingCommandException {
RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ public class BrokerConfig extends BrokerIdentity {

private boolean allowRecallWhenBrokerNotWriteable = true;

private boolean recallMessageEnable = false;

public String getConfigBlackList() {
return configBlackList;
}
Expand Down Expand Up @@ -1996,4 +1998,12 @@ public boolean isAllowRecallWhenBrokerNotWriteable() {
public void setAllowRecallWhenBrokerNotWriteable(boolean allowRecallWhenBrokerNotWriteable) {
this.allowRecallWhenBrokerNotWriteable = allowRecallWhenBrokerNotWriteable;
}

public boolean isRecallMessageEnable() {
return recallMessageEnable;
}

public void setRecallMessageEnable(boolean recallMessageEnable) {
this.recallMessageEnable = recallMessageEnable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public static BrokerController createAndStartBroker(String nsAddr) {
brokerConfig.setEnableCalcFilterBitMap(true);
brokerConfig.setAppendAckAsync(true);
brokerConfig.setAppendCkAsync(true);
brokerConfig.setRecallMessageEnable(true);
storeConfig.setEnableConsumeQueueExt(true);
brokerConfig.setLoadBalancePollNameServerInterval(500);
storeConfig.setStorePathRootDir(baseDir);
Expand Down

0 comments on commit 19393e0

Please sign in to comment.