Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix ScriptHistoryYarnStateRefreshJob and ScriptHistoryTimeoutJob get incorrect queue name #64

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/com/meiyou/bigwhale/common/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,12 @@ interface ErrorType {
*/
String DINGDING_ROBOT_URL = "https://oapi.dingtalk.com/robot/send?access_token=";

/**
* YARN resource manager scheduler type
*/
interface YarnResourcemanagerScheduler {

String CAPACITY_SCHEDULER = "capacityScheduler";
String FAIR_SCHEDULER = "fairScheduler";
}
}
124 changes: 124 additions & 0 deletions src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.meiyou.bigwhale.common.pojo;

import com.meiyou.bigwhale.common.Constant.YarnResourcemanagerScheduler;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
* YARN resource manager scheduler info
*
* <a href="https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Scheduler_API">schedulerInfo</a>
*
* Elements of the schedulerInfo object
*
* @author yangjie
*/
@Data
public class SchedulerInfo implements Serializable {

/**
* @see YarnResourcemanagerScheduler
*/
private String type;
private Float capacity;
private Float usedCapacity;
private Float maxCapacity;
private String queueName;
private List<Queue> queues;
private Health health;

/**
* Elements of the queues object for a Parent queue
*
* Elements of the queues object for a Leaf queue - contains all the elements in parent except ‘queues’ plus the following
*/
@Data
public static class Queue {
/**
* Elements of the queues object for a Parent queue
*/
private Float capacity;
private Float usedCapacity;
private Float maxCapacity;
private Float absoluteCapacity;
private Float absoluteMaxCapacity;
private Float absoluteUsedCapacity;
private Integer numApplications;
private String usedResources;
private String queueName;
private String state;
private List<Queue> queues;
private Resource resourcesUsed;
/**
* Elements of the queues object for a Leaf queue
* contains all the elements in parent except ‘queues’ plus the following
*/
private String type;
private Integer numActiveApplications;
private Integer numPendingApplications;
private Integer numContainers;
private Integer allocatedContainers;
private Integer reservedContainers;
private Integer pendingContainers;
private Integer maxApplications;
private Integer maxApplicationsPerUser;
private Integer maxActiveApplications;
private Integer maxActiveApplicationsPerUser;
private Integer userLimit;
private Float userLimitFactor;
private List<User> users;
}

/**
* Elements of the user object for users
*/
@Data
public static class User {
private String username;
private Resource resourcesUsed;
private Integer numActiveApplications;
private Integer numPendingApplications;
}

/**
* Elements of the resource object for resourcesUsed in user and queues
*/
@Data
public static class Resource {
private Integer memory;
private Integer vCores;
}

/**
* Elements of the health object in schedulerInfo
*/
@Data
public static class Health {
private Long lastrun;
private List<Operation> operationsInfo;
private List<LastRunDetail> lastRunDetails;
}

/**
* Elements of the operation object in health
*/
@Data
public static class Operation {
private String operation;
private String nodeId;
private String containerId;
private String queue;
}

/**
* Elements of the lastRunDetail object in health
*/
@Data
public static class LastRunDetail {
private String operation;
private Long count;
private Resource resources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.meiyou.bigwhale.common.Constant;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import com.meiyou.bigwhale.entity.Cluster;
import com.meiyou.bigwhale.entity.ScriptHistory;
import com.meiyou.bigwhale.service.ClusterService;
Expand Down Expand Up @@ -49,8 +50,15 @@ public void execute(JobExecutionContext jobExecutionContext) {
// Yarn资源不够时,客户端会长时间处于提交请求状态,平台无法中断此请求,故在此处再判断一次状态
if (scriptHistory.getClusterId() != null && scriptHistory.getState().equals(Constant.JobState.SUBMITTING)) {
Cluster cluster = clusterService.findById(scriptHistory.getClusterId());
// request Cluster Scheduler API for schedulerType
SchedulerInfo scheduler = YarnApiUtils.getYarnSchedulerInfo(cluster.getYarnUrl());
if (scheduler == null) {
// handle this next time.
continue;
}
String schedulerType = scheduler.getType();
String [] jobParams = scriptHistory.getJobParams().split(";");
HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), jobParams[0], jobParams[1], jobParams[2], 3);
HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), schedulerType, jobParams[0], jobParams[1], jobParams[2], 3);
if (httpYarnApp != null) {
retry = false;
scriptHistory.updateState(Constant.JobState.SUBMITTED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import com.meiyou.bigwhale.common.Constant;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import com.meiyou.bigwhale.entity.Cluster;
import com.meiyou.bigwhale.entity.ScriptHistory;
import com.meiyou.bigwhale.service.ClusterService;
import com.meiyou.bigwhale.service.ScriptHistoryService;
import com.meiyou.bigwhale.util.YarnApiUtils;
import com.meiyou.bigwhale.util.YarnUtil;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
Expand Down Expand Up @@ -43,21 +45,24 @@ public void execute(JobExecutionContext jobExecutionContext) {
if (scriptHistories.isEmpty()) {
continue;
}
// request Cluster Scheduler API for schedulerType
SchedulerInfo scheduler = YarnApiUtils.getYarnSchedulerInfo(cluster.getYarnUrl());
if (scheduler == null) {
continue;
}
// 请求yarn web url, 获取所有应用
List<HttpYarnApp> httpYarnApps = YarnApiUtils.getActiveApps(cluster.getYarnUrl());
// 请求出错,不清理数据
if (httpYarnApps == null) {
continue;
}
String schedulerType = scheduler.getType();
httpYarnApps.removeIf(httpYarnApp -> !httpYarnApp.getName().contains(".bw_instance_") && !httpYarnApp.getName().contains(".bw_test_instance_"));
Map<String, ScriptHistory> yarnParamsToScriptHistoryMap = new HashMap<>();
scriptHistories.forEach(scriptHistory -> {
String [] jobParams = scriptHistory.getJobParams().split(";");
String user = jobParams[0];
String queue = jobParams[1];
if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
queue = "root." + queue;
}
String queue = YarnUtil.getQueueName(schedulerType, jobParams[1]);
String app = jobParams[2];
String key = user + ";" + queue + ";" + app;
yarnParamsToScriptHistoryMap.put(key, scriptHistory);
Expand All @@ -82,7 +87,7 @@ public void execute(JobExecutionContext jobExecutionContext) {
}
scriptHistories.forEach(scriptHistory -> {
if (!matchIds.contains(scriptHistory.getId())) {
updateNoMatchScriptHistory(cluster.getYarnUrl(), scriptHistory);
updateNoMatchScriptHistory(cluster.getYarnUrl(), scriptHistory, schedulerType);
}
});
}
Expand All @@ -104,9 +109,9 @@ private void updateMatchScriptHistory(HttpYarnApp httpYarnApp, ScriptHistory scr
scriptHistoryService.save(scriptHistory);
}

private void updateNoMatchScriptHistory(String yarnUrl, ScriptHistory scriptHistory) {
private void updateNoMatchScriptHistory(String yarnUrl, ScriptHistory scriptHistory, String schedulerType) {
String [] jobParams = scriptHistory.getJobParams().split(";");
HttpYarnApp httpYarnApp = YarnApiUtils.getLastNoActiveApp(yarnUrl, jobParams[0], jobParams[1], jobParams[2], 3);
HttpYarnApp httpYarnApp = YarnApiUtils.getLastNoActiveApp(yarnUrl, schedulerType, jobParams[0], jobParams[1], jobParams[2], 3);
if (httpYarnApp != null) {
if ("FINISHED".equals(httpYarnApp.getState())) {
scriptHistory.updateState(httpYarnApp.getFinalStatus());
Expand Down
42 changes: 32 additions & 10 deletions src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package com.meiyou.bigwhale.util;

import com.alibaba.fastjson.*;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.meiyou.bigwhale.common.pojo.BackpressureInfo;
import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class YarnApiUtils {

Expand Down Expand Up @@ -45,10 +55,8 @@ public static List<HttpYarnApp> getActiveApps(String yarnUrl) {
* @param retries
* @return
*/
public static HttpYarnApp getActiveApp(String yarnUrl, String user, String queue, String name, int retries) {
if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
queue = "root." + queue;
}
public static HttpYarnApp getActiveApp(String yarnUrl, String schedulerType, String user, String queue, String name, int retries) {
queue = YarnUtil.getQueueName(schedulerType, queue);
Map<String, Object> params = new HashMap<>();
params.put("user", user);
params.put("queue", queue);
Expand Down Expand Up @@ -84,10 +92,8 @@ public static HttpYarnApp getActiveApp(String yarnUrl, String user, String queue
* @param retries 重试次数
* @return
*/
public static HttpYarnApp getLastNoActiveApp(String yarnUrl, String user, String queue, String name, int retries) {
if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
queue = "root." + queue;
}
public static HttpYarnApp getLastNoActiveApp(String yarnUrl, String schedulerType, String user, String queue, String name, int retries) {
queue = YarnUtil.getQueueName(schedulerType, queue);
Map<String, Object> params = new HashMap<>();
params.put("user", user);
params.put("states", "finished,killed,failed");
Expand Down Expand Up @@ -295,10 +301,26 @@ public static boolean killApp(String yarnUrl, String appId) {
return false;
}

/**
* <a href="https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Scheduler_API">cluster scheduler API</a>
*/
public static SchedulerInfo getYarnSchedulerInfo(String yarnUrl) {
OkHttpUtils.Result result = OkHttpUtils.doGet(getSchedulerUrl(yarnUrl), Collections.emptyMap(), HEADERS);
if (result.isSuccessful && StringUtils.isNotEmpty(result.content)) {
JSONObject jsonObject = JSON.parseObject(JSON.parseObject(result.content).getString("scheduler"));
return JSON.parseObject(jsonObject.getString("schedulerInfo"), SchedulerInfo.class);
}
return null;
}

private static String getAppsUrl(String yarnUrl) {
return appendUrl(yarnUrl) + "ws/v1/cluster/apps";
}

private static String getSchedulerUrl(String yarnUrl) {
return appendUrl(yarnUrl) + "ws/v1/cluster/scheduler";
}

private static String appendUrl(String url) {
if (!url.endsWith("/")) {
url += "/";
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/com/meiyou/bigwhale/util/YarnUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.meiyou.bigwhale.util;

import com.meiyou.bigwhale.common.Constant.YarnResourcemanagerScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author yangjie
*/
public class YarnUtil {

private YarnUtil() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(YarnUtil.class);

public static String getQueueName(String yarnSchedulerType, String queue) {
if (YarnResourcemanagerScheduler.FAIR_SCHEDULER.equals(yarnSchedulerType) &&
queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
return "root." + queue;
}
return queue;
}
}