Skip to content

Commit

Permalink
Add new command to check async task status in broker.
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthewAden committed Jan 27, 2025
1 parent e0db654 commit 2ed6bc5
Show file tree
Hide file tree
Showing 12 changed files with 1,577 additions and 1,113 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker;

import org.apache.rocketmq.common.AsyncTask;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class AdminAsyncTaskManager {

private static final Map<String, AsyncTask> ASYNC_TASK_MAP = new ConcurrentHashMap<>();

private static final Map<String, List<String>> TASK_NAME_TO_IDS_MAP = new ConcurrentHashMap<>();

public static String createTask(String taskName) {
String taskId = UUID.randomUUID().toString();
ASYNC_TASK_MAP.put(taskId, new AsyncTask(taskName, taskId));
TASK_NAME_TO_IDS_MAP.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId);
return taskId;
}

public static List<String> getTaskIdsByName(String taskName) {
return TASK_NAME_TO_IDS_MAP.getOrDefault(taskName, Collections.emptyList());
}

public static AsyncTask getTaskStatus(String taskId) {
return ASYNC_TASK_MAP.get(taskId);
}

public static void updateTaskStatus(String taskId, int status, String result) {
AsyncTask task = ASYNC_TASK_MAP.get(taskId);
if (task != null) {
task.setStatus(status);
task.setResult(result);
}
}

public static void removeTask(String taskId) {
AsyncTask task = ASYNC_TASK_MAP.remove(taskId);
if (task != null) {
TASK_NAME_TO_IDS_MAP.computeIfPresent(task.getTaskName(), (k, v) -> {
v.remove(taskId);
return v.isEmpty() ? null : v;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.rocketmq.auth.authorization.exception.AuthorizationException;
import org.apache.rocketmq.auth.authorization.model.Acl;
import org.apache.rocketmq.auth.authorization.model.Resource;
import org.apache.rocketmq.broker.AdminAsyncTaskManager;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.auth.converter.AclConverter;
import org.apache.rocketmq.broker.auth.converter.UserConverter;
Expand All @@ -68,6 +69,7 @@
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AsyncTask;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
Expand All @@ -77,6 +79,7 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TaskStatus;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
Expand Down Expand Up @@ -145,6 +148,8 @@
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
Expand Down Expand Up @@ -408,6 +413,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.listAcl(ctx, request);
case RequestCode.POP_ROLLBACK:
return this.transferPopToFsStore(ctx, request);
case RequestCode.CHECK_ASYNC_TASK_STATUS:
return this.checkAsyncTaskStatus(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
Expand Down Expand Up @@ -480,11 +487,14 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) {
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
String taskId = AdminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress");
Runnable runnable = () -> {
try {
CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request);
AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.SUCCESS.getValue(), JSON.toJSONString(checkResult));
LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult));
} catch (Exception e) {
AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.ERROR.getValue(), e.getMessage());
LOGGER.error("checkRocksdbCqWriteProgress error", e);
}
};
Expand Down Expand Up @@ -3545,4 +3555,32 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting
}
return response;
}

private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class);
try {
List<String> taskIds = AdminAsyncTaskManager.getTaskIdsByName(requestHeader.getTaskName());
if (taskIds == null || taskIds.isEmpty()) {
throw new RemotingCommandException("taskId not found");
}

List<AsyncTask> result = new ArrayList<>();
for (String taskId : taskIds) {
AsyncTask taskStatus = AdminAsyncTaskManager.getTaskStatus(taskId);
result.add(taskStatus);

if (taskStatus.getStatus() == TaskStatus.SUCCESS.getValue()) {
AdminAsyncTaskManager.removeTask(taskId);
}
}

RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setBody(JSON.toJSONBytes(result));
return response;
} catch (Exception e) {
LOGGER.error("checkAsyncTaskStatus error", e);
return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, e.getMessage());
}
}
}
Loading

0 comments on commit 2ed6bc5

Please sign in to comment.