From a91c0115a45182d310e8c43a136f344d0c0bc1eb Mon Sep 17 00:00:00 2001
From: yangsanity <471419897@qq.com>
Date: Fri, 2 Jul 2021 19:42:30 +0800
Subject: [PATCH] fix ScriptHistoryYarnStateRefreshJob and
 ScriptHistoryTimeoutJob: resolve the correct queue name depending on
 whether YARN uses CapacityScheduler or FairScheduler

---
 .../com/meiyou/bigwhale/common/Constant.java  |   8 ++
 .../bigwhale/common/pojo/SchedulerInfo.java   | 124 ++++++++++++++++++
 .../bigwhale/job/ScriptHistoryTimeoutJob.java |  10 +-
 .../job/ScriptHistoryYarnStateRefreshJob.java |  19 ++-
 .../meiyou/bigwhale/util/YarnApiUtils.java    |  42 ++++--
 .../com/meiyou/bigwhale/util/YarnUtil.java    |  24 ++++
 6 files changed, 209 insertions(+), 18 deletions(-)
 create mode 100644 src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java
 create mode 100644 src/main/java/com/meiyou/bigwhale/util/YarnUtil.java

diff --git a/src/main/java/com/meiyou/bigwhale/common/Constant.java b/src/main/java/com/meiyou/bigwhale/common/Constant.java
index c7046ea..180d683 100644
--- a/src/main/java/com/meiyou/bigwhale/common/Constant.java
+++ b/src/main/java/com/meiyou/bigwhale/common/Constant.java
@@ -95,4 +95,12 @@ interface ErrorType {
      */
     String DINGDING_ROBOT_URL = "https://oapi.dingtalk.com/robot/send?access_token=";
 
+    /**
+     * YARN resource manager scheduler type names, matched against the {@code type} field of SchedulerInfo
+     */
+    interface YarnResourcemanagerScheduler {
+
+        String CAPACITY_SCHEDULER = "capacityScheduler";
+        String FAIR_SCHEDULER = "fairScheduler";
+    }
 }
diff --git a/src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java b/src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java
new file mode 100644
index 0000000..dcd0676
--- /dev/null
+++ b/src/main/java/com/meiyou/bigwhale/common/pojo/SchedulerInfo.java
@@ -0,0 +1,124 @@
+package com.meiyou.bigwhale.common.pojo;
+
+import com.meiyou.bigwhale.common.Constant.YarnResourcemanagerScheduler;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Mirror of the {@code schedulerInfo} object of the YARN resource manager
+ * Cluster Scheduler REST API.
+ * <a href="https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Scheduler_API">schedulerInfo</a>
+ *
+ * All nested element types are {@link Serializable} too; otherwise serializing an
+ * instance with populated queues or health data fails with NotSerializableException.
+ * @author yangjie
+ */
+@Data
+public class SchedulerInfo implements Serializable {
+
+    /**
+     * @see YarnResourcemanagerScheduler
+     */
+    private String type;
+    private Float capacity;
+    private Float usedCapacity;
+    private Float maxCapacity;
+    private String queueName;
+    private List<Queue> queues;
+    private Health health;
+
+    /**
+     * One entry of the {@code queues} object.
+     *
+     * Holds both the parent-queue elements and the additional leaf-queue elements.
+     */
+    @Data
+    public static class Queue implements Serializable {
+        /**
+         * Elements of the queues object for a Parent queue
+         */
+        private Float capacity;
+        private Float usedCapacity;
+        private Float maxCapacity;
+        private Float absoluteCapacity;
+        private Float absoluteMaxCapacity;
+        private Float absoluteUsedCapacity;
+        private Integer numApplications;
+        private String usedResources;
+        private String queueName;
+        private String state;
+        private List<Queue> queues;
+        private Resource resourcesUsed;
+        /**
+         * Elements of the queues object for a Leaf queue:
+         * everything a parent queue has except {@code queues}, plus the following
+         */
+        private String type;
+        private Integer numActiveApplications;
+        private Integer numPendingApplications;
+        private Integer numContainers;
+        private Integer allocatedContainers;
+        private Integer reservedContainers;
+        private Integer pendingContainers;
+        private Integer maxApplications;
+        private Integer maxApplicationsPerUser;
+        private Integer maxActiveApplications;
+        private Integer maxActiveApplicationsPerUser;
+        private Integer userLimit;
+        private Float userLimitFactor;
+        private List<User> users;
+    }
+
+    /**
+     * Elements of the user object for users
+     */
+    @Data
+    public static class User implements Serializable {
+        private String username;
+        private Resource resourcesUsed;
+        private Integer numActiveApplications;
+        private Integer numPendingApplications;
+    }
+
+    /**
+     * Elements of the resource object for resourcesUsed in user and queues
+     */
+    @Data
+    public static class Resource implements Serializable {
+        private Integer memory;
+        private Integer vCores;
+    }
+
+    /**
+     * Elements of the health object in schedulerInfo
+     */
+    @Data
+    public static class Health implements Serializable {
+        private Long lastrun;
+        private List<Operation> operationsInfo;
+        private List<LastRunDetail> lastRunDetails;
+    }
+
+    /**
+     * Elements of the operation object in health
+     */
+    @Data
+    public static class Operation implements Serializable {
+        private String operation;
+        private String nodeId;
+        private String containerId;
+        private String queue;
+    }
+
+    /**
+     * Elements of the lastRunDetail object in health
+     */
+    @Data
+    public static class LastRunDetail implements Serializable {
+        private String operation;
+        private Long count;
+        private Resource resources;
+    }
+}
diff --git a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java
index fae9ad0..974e0ad 100644
--- a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java
+++ b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java
@@ -2,6 +2,7 @@
 
 import com.meiyou.bigwhale.common.Constant;
 import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
+import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
 import com.meiyou.bigwhale.entity.Cluster;
 import com.meiyou.bigwhale.entity.ScriptHistory;
 import com.meiyou.bigwhale.service.ClusterService;
@@ -49,8 +50,15 @@ public void execute(JobExecutionContext jobExecutionContext) {
                 // Yarn资源不够时,客户端会长时间处于提交请求状态,平台无法中断此请求,故在此处再判断一次状态
                 if (scriptHistory.getClusterId() != null && scriptHistory.getState().equals(Constant.JobState.SUBMITTING)) {
                     Cluster cluster = clusterService.findById(scriptHistory.getClusterId());
+                    // request Cluster Scheduler API for schedulerType
+                    SchedulerInfo scheduler = YarnApiUtils.getYarnSchedulerInfo(cluster.getYarnUrl());
+                    if (scheduler == null) {
+                        // handle this next time.
+                        continue;
+                    }
+                    String schedulerType = scheduler.getType();
                     String [] jobParams = scriptHistory.getJobParams().split(";");
-                    HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), jobParams[0], jobParams[1], jobParams[2], 3);
+                    HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), schedulerType, jobParams[0], jobParams[1], jobParams[2], 3);
                     if (httpYarnApp != null) {
                         retry = false;
                         scriptHistory.updateState(Constant.JobState.SUBMITTED);
diff --git a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java
index 4cadfa0..50f4eb5 100644
--- a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java
+++ b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java
@@ -2,11 +2,13 @@
 
 import com.meiyou.bigwhale.common.Constant;
 import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
+import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
 import com.meiyou.bigwhale.entity.Cluster;
 import com.meiyou.bigwhale.entity.ScriptHistory;
 import com.meiyou.bigwhale.service.ClusterService;
 import com.meiyou.bigwhale.service.ScriptHistoryService;
 import com.meiyou.bigwhale.util.YarnApiUtils;
+import com.meiyou.bigwhale.util.YarnUtil;
 import org.quartz.DisallowConcurrentExecution;
 import org.quartz.InterruptableJob;
 import org.quartz.JobExecutionContext;
@@ -43,21 +45,24 @@ public void execute(JobExecutionContext jobExecutionContext) {
             if (scriptHistories.isEmpty()) {
                 continue;
             }
+            // request Cluster Scheduler API for schedulerType
+            SchedulerInfo scheduler = YarnApiUtils.getYarnSchedulerInfo(cluster.getYarnUrl());
+            if (scheduler == null) {
+                continue;
+            }
             // 请求yarn web url, 获取所有应用
             List<HttpYarnApp> httpYarnApps = YarnApiUtils.getActiveApps(cluster.getYarnUrl());
             // 请求出错,不清理数据
             if (httpYarnApps == null) {
                 continue;
             }
+            String schedulerType = scheduler.getType();
             httpYarnApps.removeIf(httpYarnApp -> !httpYarnApp.getName().contains(".bw_instance_") && !httpYarnApp.getName().contains(".bw_test_instance_"));
             Map<String, ScriptHistory> yarnParamsToScriptHistoryMap = new HashMap<>();
             scriptHistories.forEach(scriptHistory -> {
                 String [] jobParams = scriptHistory.getJobParams().split(";");
                 String user = jobParams[0];
-                String queue = jobParams[1];
-                if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
-                    queue = "root." + queue;
-                }
+                String queue = YarnUtil.getQueueName(schedulerType, jobParams[1]);
                 String app = jobParams[2];
                 String key = user + ";" + queue + ";" + app;
                 yarnParamsToScriptHistoryMap.put(key, scriptHistory);
@@ -82,7 +87,7 @@ public void execute(JobExecutionContext jobExecutionContext) {
             }
             scriptHistories.forEach(scriptHistory -> {
                 if (!matchIds.contains(scriptHistory.getId())) {
-                    updateNoMatchScriptHistory(cluster.getYarnUrl(), scriptHistory);
+                    updateNoMatchScriptHistory(cluster.getYarnUrl(), scriptHistory, schedulerType);
                 }
             });
         }
@@ -104,9 +109,9 @@ private void updateMatchScriptHistory(HttpYarnApp httpYarnApp, ScriptHistory scr
         scriptHistoryService.save(scriptHistory);
     }
 
-    private void updateNoMatchScriptHistory(String yarnUrl, ScriptHistory scriptHistory) {
+    private void updateNoMatchScriptHistory(String yarnUrl, ScriptHistory scriptHistory, String schedulerType) {
         String [] jobParams = scriptHistory.getJobParams().split(";");
-        HttpYarnApp httpYarnApp = YarnApiUtils.getLastNoActiveApp(yarnUrl, jobParams[0], jobParams[1], jobParams[2], 3);
+        HttpYarnApp httpYarnApp = YarnApiUtils.getLastNoActiveApp(yarnUrl, schedulerType, jobParams[0], jobParams[1], jobParams[2], 3);
         if (httpYarnApp != null) {
             if ("FINISHED".equals(httpYarnApp.getState())) {
                 scriptHistory.updateState(httpYarnApp.getFinalStatus());
diff --git a/src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java b/src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java
index 7a3a867..d8d3887 100644
--- a/src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java
+++ b/src/main/java/com/meiyou/bigwhale/util/YarnApiUtils.java
@@ -1,13 +1,23 @@
 package com.meiyou.bigwhale.util;
 
-import com.alibaba.fastjson.*;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONException;
+import com.alibaba.fastjson.JSONObject;
 import com.meiyou.bigwhale.common.pojo.BackpressureInfo;
 import com.meiyou.bigwhale.common.pojo.HttpYarnApp;
+import com.meiyou.bigwhale.common.pojo.SchedulerInfo;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class YarnApiUtils {
 
@@ -45,10 +55,8 @@ public static List<HttpYarnApp> getActiveApps(String yarnUrl) {
      * @param retries
      * @return
      */
-    public static HttpYarnApp getActiveApp(String yarnUrl, String user, String queue, String name, int retries) {
-        if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
-            queue = "root." + queue;
-        }
+    public static HttpYarnApp getActiveApp(String yarnUrl, String schedulerType, String user, String queue, String name, int retries) {
+        queue = YarnUtil.getQueueName(schedulerType, queue);
         Map<String, Object> params = new HashMap<>();
         params.put("user", user);
         params.put("queue", queue);
@@ -84,10 +92,8 @@ public static HttpYarnApp getActiveApp(String yarnUrl, String user, String queue
      * @param retries 重试次数
      * @return
      */
-    public static HttpYarnApp getLastNoActiveApp(String yarnUrl, String user, String queue, String name, int retries) {
-        if (queue != null && !"root".equals(queue) && !queue.startsWith("root.")) {
-            queue = "root." + queue;
-        }
+    public static HttpYarnApp getLastNoActiveApp(String yarnUrl, String schedulerType, String user, String queue, String name, int retries) {
+        queue = YarnUtil.getQueueName(schedulerType, queue);
         Map<String, Object> params = new HashMap<>();
         params.put("user", user);
         params.put("states", "finished,killed,failed");
@@ -295,10 +301,26 @@ public static boolean killApp(String yarnUrl, String appId) {
         return false;
     }
 
+    /**
+     * Reads schedulerInfo from the <a href="https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Scheduler_API">cluster scheduler API</a>; returns null when the request fails or the payload carries no schedulerInfo.
+     */
+    public static SchedulerInfo getYarnSchedulerInfo(String yarnUrl) {
+        OkHttpUtils.Result result = OkHttpUtils.doGet(getSchedulerUrl(yarnUrl), Collections.emptyMap(), HEADERS);
+        JSONObject root = result.isSuccessful && StringUtils.isNotEmpty(result.content) ? JSON.parseObject(result.content) : null;
+        // A payload without a "scheduler" object previously caused a NullPointerException; report it as null instead.
+        JSONObject scheduler = root == null ? null : root.getJSONObject("scheduler");
+        String schedulerInfo = scheduler == null ? null : scheduler.getString("schedulerInfo");
+        return schedulerInfo == null ? null : JSON.parseObject(schedulerInfo, SchedulerInfo.class);
+    }
+
     private static String getAppsUrl(String yarnUrl) {
         return appendUrl(yarnUrl) + "ws/v1/cluster/apps";
     }
 
+    private static String getSchedulerUrl(String yarnUrl) {
+        return appendUrl(yarnUrl) + "ws/v1/cluster/scheduler"; // REST path of the YARN cluster scheduler API
+    }
+
     private static String appendUrl(String url) {
         if (!url.endsWith("/")) {
             url += "/";
diff --git a/src/main/java/com/meiyou/bigwhale/util/YarnUtil.java b/src/main/java/com/meiyou/bigwhale/util/YarnUtil.java
new file mode 100644
index 0000000..c7f56f0
--- /dev/null
+++ b/src/main/java/com/meiyou/bigwhale/util/YarnUtil.java
@@ -0,0 +1,24 @@
+package com.meiyou.bigwhale.util;
+
+import com.meiyou.bigwhale.common.Constant.YarnResourcemanagerScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers for turning a configured queue into the queue name used to match YARN applications.
+ *
+ * @author yangjie
+ */
+public class YarnUtil {
+    private static final Logger LOGGER = LoggerFactory.getLogger(YarnUtil.class);
+
+    private YarnUtil() {
+    }
+
+    /** Returns {@code queue} prefixed with "root." for the fair scheduler when not already rooted; otherwise {@code queue} as given (may be null). */
+    public static String getQueueName(String yarnSchedulerType, String queue) {
+        boolean fairScheduler = YarnResourcemanagerScheduler.FAIR_SCHEDULER.equals(yarnSchedulerType);
+        boolean alreadyRooted = queue == null || "root".equals(queue) || queue.startsWith("root.");
+        return fairScheduler && !alreadyRooted ? "root." + queue : queue;
+    }
+}