From 0be48941411b6075173729aa3a8a0f7f5f025d34 Mon Sep 17 00:00:00 2001 From: frankfreedom <772598220@qq.com> Date: Wed, 19 Aug 2020 00:42:07 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E9=9D=99=E6=80=81?= =?UTF-8?q?=E5=8F=98=E9=87=8F--properties=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../project/DirectoryYamlFlowLoader.java | 5 +- .../common/jobExecutor/utils/Date.java | 72 +++ .../SystemBuiltInParamJodeTimeUtils.java | 412 +++++------------- .../utils/SystemBuiltInParamUtils.java | 20 +- .../main/java/azkaban/execapp/FlowRunner.java | 23 +- 5 files changed, 212 insertions(+), 320 deletions(-) create mode 100644 azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/Date.java diff --git a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java index 2ed44fe..91b475c 100755 --- a/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java +++ b/azkaban-common/src/main/java/azkaban/project/DirectoryYamlFlowLoader.java @@ -137,7 +137,8 @@ private void convertYamlFiles(final File projectDir) { } } } catch (final Exception e) { - this.errors.add("Error loading flow yaml file " + file.getName() + ":" + logger.error("Error loading flow yaml file {}, cause by:", file.getName(), e); + this.errors.add("Error loading flow yaml file " + file.getName() + ", cause by:" + e.getMessage()); } } @@ -224,7 +225,7 @@ private void addEdges(final AzkabanNode node, final AzkabanFlow azkabanFlow, if (recStack.contains(parent)) { // Cycles found, including self cycle. edge.setError("Cycles found."); - this.errors.add("Cycles found at " + edge.getId()); + this.errors.add("Cycles found at " + edge.getId() + ", please check the dependency information."); } else { // Valid edge. Continue to process the parent node recursively. addEdges(azkabanFlow.getNode(parent), azkabanFlow, flowName, recStack, visited); diff --git a/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/Date.java b/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/Date.java new file mode 100644 index 0000000..9dfa0c0 --- /dev/null +++ b/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/Date.java @@ -0,0 +1,72 @@ +package com.webank.wedatasphere.schedulis.common.jobExecutor.utils; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum Date { + + RUN_TODAY("run_today", "day", "yyyyMMdd"), + RUN_TODAY_STD("run_today_std", "day", "yyyy-MM-dd"), + RUN_DATE("run_date", "day", "yyyyMMdd"), + RUN_DATE_STD("run_date_std", "day", "yyyy-MM-dd"), + RUN_MONTH_BEGIN("run_month_begin", "month", "yyyyMMdd"), + RUN_MONTH_BEGIN_STD("run_month_begin_std", "month", "yyyy-MM-dd"), + RUN_MONTH_END("run_month_end", "month", "yyyyMMdd"), + RUN_MONTH_END_STD("run_month_end_std", "month", "yyyy-MM-dd"), + + RUN_QUARTER_BEGIN("run_quarter_begin", "quarter", "yyyyMMdd"), + RUN_QUARTER_END("run_quarter_end", "quarter", "yyyyMMdd"), + RUN_HALF_YEAR_BEGIN("run_half_year_begin", "halfYear", "yyyyMMdd"), + RUN_HALF_YEAR_END("run_half_year_end", "halfYear", "yyyyMMdd"), + RUN_YEAR_BEGIN("run_year_begin", "year", "yyyyMMdd"), + RUN_YEAR_END("run_year_end", "year", "yyyyMMdd"), + RUN_LAST_MONTH_END("run_last_month_end", "month", "yyyyMMdd"), + RUN_LAST_QUARTER_END("run_last_quarter_end", "quarter", "yyyyMMdd"), + RUN_LAST_YEAR_END("run_last_year_end", "year", "yyyyMMdd"), + + RUN_QUARTER_BEGIN_STD("run_quarter_begin_std", "quarter", "yyyy-MM-dd"), + RUN_QUARTER_END_STD("run_quarter_end_std", "quarter", "yyyy-MM-dd"), + RUN_HALF_YEAR_BEGIN_STD("run_half_year_begin_std", "halfYear", "yyyy-MM-dd"), + RUN_HALF_YEAR_END_STD("run_half_year_end_std", "halfYear", "yyyy-MM-dd"), + RUN_YEAR_BEGIN_STD("run_year_begin_std", "year", "yyyy-MM-dd"), + RUN_YEAR_END_STD("run_year_end_std", "year", "yyyy-MM-dd"), + RUN_LAST_MONTH_END_STD("run_last_month_end_std", "month", "yyyy-MM-dd"), + RUN_LAST_QUARTER_END_STD("run_last_quarter_end_std", "quarter", "yyyy-MM-dd"), + RUN_LAST_YEAR_END_STD("run_last_year_end_std", "year", "yyyy-MM-dd"); + + private String value; + private String calRule; + private String format; + + private static Map DATE_MAP = Arrays.stream(Date.values()).collect(Collectors.toMap(x -> x.getValue(), x -> x)); + + + Date(String value, String calRule, String format) { + this.value = value; + this.calRule = calRule; + this.format = format; + } + + + public String getValue() { + return value; + } + + public String getCalRule() { + return calRule; + } + + public String getFormat() { + return format; + } + + public static Map getDateMap() { + return DATE_MAP; + } + + @Override + public String toString() { + return this.value; + } +} diff --git a/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/SystemBuiltInParamJodeTimeUtils.java b/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/SystemBuiltInParamJodeTimeUtils.java index 4d4d489..6b28fa6 100755 --- a/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/SystemBuiltInParamJodeTimeUtils.java +++ b/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/SystemBuiltInParamJodeTimeUtils.java @@ -16,9 +16,10 @@ package com.webank.wedatasphere.schedulis.common.jobExecutor.utils; +import com.webank.wedatasphere.schedulis.common.utils.DateUtils; import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.Logger; import org.joda.time.LocalDate; import org.joda.time.format.DateTimeFormat; @@ -42,90 +43,110 @@ public class SystemBuiltInParamJodeTimeUtils { - private static final Logger utilLogger = LoggerFactory.getLogger(SystemBuiltInParamJodeTimeUtils.class); - public static final String RUN_TODAY = "run_today"; - public static final String RUN_TODAY_STD = "run_today_std"; - public static final String RUN_DATE = "run_date"; - public static final String RUN_DATE_STD = "run_date_std"; - public static final String RUN_MONTH_BEGIN = "run_month_begin"; - public static final String RUN_MONTH_BEGIN_STD = "run_month_begin_std"; - public static final String RUN_MONTH_END = "run_month_end"; - public static final String RUN_MONTH_END_STD = "run_month_end_std"; - public static final String MINUS = "MINUS"; - public static final String PLUS = "PLUS"; + private static final Logger logger = LoggerFactory.getLogger(SystemBuiltInParamJodeTimeUtils.class); + public static final String TIME_TEMPLATE = "(\\d{4}\\.\\d{2}\\.\\d{2}|\\d{4}/\\d{2}/\\d{2}|\\d{4}-\\d{2}-\\d{2}|\\d{4}\\d{2}\\d{2})"; private Map propMap = new HashMap<>(); - private Map defaultDate = new HashMap<>();; + private Map defaultDate = new HashMap<>(); + + private static String re = "[a-z_]+((\\+|-)[1-9][0-9]*){0,1}"; private void initDate(ExecutableFlow executableFlow) throws RuntimeException{ LocalDate runDate = null; - DateTimeFormatter dateTimeFormatter0 = DateTimeFormat.forPattern("yyyyMMdd"); - DateTimeFormatter dateTimeFormatter1 = DateTimeFormat.forPattern("yyyy-MM-dd"); + DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd"); //历史重跑 if(2 == executableFlow.getFlowType()){ runDate = new LocalDate(Long.valueOf(executableFlow.getRepeatOption().get("startTimeLong"))).minusDays(1); } else { - if (null != executableFlow.getExecutionOptions().getFlowParameters().get(RUN_DATE)) { - runDate = LocalDate.parse(executableFlow.getExecutionOptions().getFlowParameters().get(RUN_DATE), dateTimeFormatter0); - } else if(this.propMap.get(RUN_DATE) != null) { - String tmp = this.propMap.get(RUN_DATE).replaceAll("[\"'./-]",""); - runDate = LocalDate.parse(tmp, dateTimeFormatter0); + if (null != executableFlow.getExecutionOptions().getFlowParameters().get(Date.RUN_DATE.getValue())) { + runDate = LocalDate.parse(executableFlow.getExecutionOptions().getFlowParameters().get(Date.RUN_DATE.getValue()), dateTimeFormatter); + } else if(this.propMap.get(Date.RUN_DATE.getValue()) != null) { + String tmp = this.propMap.get(Date.RUN_DATE.getValue()).replaceAll("[\"'./-]",""); + runDate = LocalDate.parse(tmp, dateTimeFormatter); } else { runDate = new LocalDate(executableFlow.getSubmitTime()).minusDays(1); } } //用于前端显示 executableFlow.setRunDate(runDate.toString("yyyyMMdd")); - defaultDate.put(RUN_DATE, runDate); - defaultDate.put(RUN_DATE_STD, runDate); - defaultDate.put(RUN_TODAY, runDate.plusDays(1)); - defaultDate.put(RUN_TODAY_STD, runDate.plusDays(1)); - defaultDate.put(RUN_MONTH_BEGIN, runDate.dayOfMonth().withMinimumValue()); - defaultDate.put(RUN_MONTH_BEGIN_STD, runDate.dayOfMonth().withMinimumValue()); - defaultDate.put(RUN_MONTH_END, runDate.dayOfMonth().withMaximumValue()); - defaultDate.put(RUN_MONTH_END_STD, runDate.dayOfMonth().withMaximumValue()); - - if(2 != executableFlow.getFlowType()){ - LocalDate newRunDateStd = hasNewDate(executableFlow, RUN_DATE_STD, dateTimeFormatter1); - if(newRunDateStd != null){ - defaultDate.put(RUN_DATE_STD, newRunDateStd); - } - LocalDate newRunToday = hasNewDate(executableFlow, RUN_TODAY, dateTimeFormatter0); - if(newRunToday != null){ - defaultDate.put(RUN_TODAY, newRunToday); - } - LocalDate newRunTodayStd = hasNewDate(executableFlow, RUN_TODAY_STD, dateTimeFormatter1); - if(newRunTodayStd != null){ - defaultDate.put(RUN_TODAY_STD, newRunTodayStd); - } - LocalDate newRunMonthBegin = hasNewDate(executableFlow, RUN_MONTH_BEGIN, dateTimeFormatter0); - if(newRunMonthBegin != null){ - defaultDate.put(RUN_MONTH_BEGIN, newRunMonthBegin); - } - LocalDate newRunMonthBeginStd = hasNewDate(executableFlow, RUN_MONTH_BEGIN_STD, dateTimeFormatter1); - if(newRunMonthBeginStd != null){ - defaultDate.put(RUN_MONTH_BEGIN_STD, newRunMonthBeginStd); - } - LocalDate newRunMonthEnd = hasNewDate(executableFlow, RUN_MONTH_END, dateTimeFormatter0); - if(newRunMonthEnd != null){ - defaultDate.put(RUN_MONTH_END, newRunMonthEnd); - } - LocalDate newRunMonthEndStd = hasNewDate(executableFlow, RUN_MONTH_END_STD, dateTimeFormatter1); - if(newRunMonthEndStd != null){ - defaultDate.put(RUN_MONTH_END_STD, newRunMonthEndStd); + + defaultDate.put(Date.RUN_DATE.getValue(), runDate); + defaultDate.put(Date.RUN_DATE_STD.getValue(), runDate); + defaultDate.put(Date.RUN_TODAY.getValue(), runDate.plusDays(1)); + defaultDate.put(Date.RUN_TODAY_STD.getValue(), runDate.plusDays(1)); + defaultDate.put(Date.RUN_MONTH_BEGIN.getValue(), runDate.dayOfMonth().withMinimumValue()); + defaultDate.put(Date.RUN_MONTH_BEGIN_STD.getValue(), runDate.dayOfMonth().withMinimumValue()); + defaultDate.put(Date.RUN_MONTH_END.getValue(), runDate.dayOfMonth().withMaximumValue()); + defaultDate.put(Date.RUN_MONTH_END_STD.getValue(), runDate.dayOfMonth().withMaximumValue()); + + defaultDate.put(Date.RUN_QUARTER_BEGIN.getValue(), DateUtils.getQuarterBegin(runDate)); + defaultDate.put(Date.RUN_QUARTER_END.getValue(), DateUtils.getQuarterEnd(runDate)); + defaultDate.put(Date.RUN_HALF_YEAR_BEGIN.getValue(), DateUtils.getHalfYearBegin(runDate)); + defaultDate.put(Date.RUN_HALF_YEAR_END.getValue(), DateUtils.getHalfYearEnd(runDate)); + defaultDate.put(Date.RUN_YEAR_BEGIN.getValue(), DateUtils.getYearBegin(runDate)); + defaultDate.put(Date.RUN_YEAR_END.getValue(), DateUtils.getYearEnd(runDate)); + defaultDate.put(Date.RUN_LAST_MONTH_END.getValue(), DateUtils.getLastMonthEnd(runDate)); + defaultDate.put(Date.RUN_LAST_QUARTER_END.getValue(), DateUtils.getLastQuarterEnd(runDate)); + defaultDate.put(Date.RUN_LAST_YEAR_END.getValue(), DateUtils.getLastYearEnd(runDate)); + + defaultDate.put(Date.RUN_QUARTER_BEGIN_STD.getValue(), DateUtils.getQuarterBegin(runDate)); + defaultDate.put(Date.RUN_QUARTER_END_STD.getValue(), DateUtils.getQuarterEnd(runDate)); + defaultDate.put(Date.RUN_HALF_YEAR_BEGIN_STD.getValue(), DateUtils.getHalfYearBegin(runDate)); + defaultDate.put(Date.RUN_HALF_YEAR_END_STD.getValue(), DateUtils.getHalfYearEnd(runDate)); + defaultDate.put(Date.RUN_YEAR_BEGIN_STD.getValue(), DateUtils.getYearBegin(runDate)); + defaultDate.put(Date.RUN_YEAR_END_STD.getValue(), DateUtils.getYearEnd(runDate)); + defaultDate.put(Date.RUN_LAST_MONTH_END_STD.getValue(), DateUtils.getLastMonthEnd(runDate)); + defaultDate.put(Date.RUN_LAST_QUARTER_END_STD.getValue(), DateUtils.getLastQuarterEnd(runDate)); + defaultDate.put(Date.RUN_LAST_YEAR_END_STD.getValue(), DateUtils.getLastYearEnd(runDate)); + + if (2 != executableFlow.getFlowType()) { + Date date[] = { + Date.RUN_DATE_STD, + Date.RUN_TODAY, + Date.RUN_TODAY_STD, + Date.RUN_MONTH_BEGIN, + Date.RUN_MONTH_BEGIN_STD, + Date.RUN_MONTH_END, + Date.RUN_MONTH_END_STD, + Date.RUN_QUARTER_BEGIN, + Date.RUN_QUARTER_BEGIN_STD, + Date.RUN_QUARTER_END, + Date.RUN_QUARTER_END_STD, + Date.RUN_LAST_QUARTER_END, + Date.RUN_LAST_QUARTER_END_STD, + Date.RUN_HALF_YEAR_BEGIN, + Date.RUN_HALF_YEAR_BEGIN_STD, + Date.RUN_HALF_YEAR_END, + Date.RUN_HALF_YEAR_END_STD, + Date.RUN_YEAR_BEGIN, + Date.RUN_YEAR_BEGIN_STD, + Date.RUN_YEAR_END, + Date.RUN_YEAR_END_STD, + Date.RUN_LAST_MONTH_END, + Date.RUN_LAST_MONTH_END_STD, + Date.RUN_LAST_YEAR_END, + Date.RUN_LAST_YEAR_END_STD + }; + for (Date item : date) { + LocalDate newDate; + newDate = hasNewDate(executableFlow, item); + if (newDate != null) { + defaultDate.put(item.getValue(), newDate); + } } } } - private LocalDate hasNewDate(ExecutableFlow executableFlow, String dateType, DateTimeFormatter dateTimeFormatter){ + private LocalDate hasNewDate(ExecutableFlow executableFlow, Date dateType){ LocalDate newDate = null; - if (null != executableFlow.getExecutionOptions().getFlowParameters().get(dateType)) { - newDate = LocalDate.parse(executableFlow.getExecutionOptions().getFlowParameters().get(dateType), dateTimeFormatter); - } else if(this.propMap.get(dateType) != null) { - newDate = LocalDate.parse(this.propMap.get(dateType), dateTimeFormatter); + if (null != executableFlow.getExecutionOptions().getFlowParameters().get(dateType.getValue())) { + newDate = LocalDate.parse(executableFlow.getExecutionOptions().getFlowParameters().get(dateType.getValue()), + DateTimeFormat.forPattern(dateType.getFormat())); + } else if(this.propMap.get(dateType.getValue()) != null) { + newDate = LocalDate.parse(this.propMap.get(dateType.getValue()), DateTimeFormat.forPattern(dateType.getFormat())); } return newDate; } @@ -138,7 +159,7 @@ private void FileWrite(String filePath, String fileStr){ //写入到文件 fw.write(fileStr); } catch (Exception e) { - utilLogger.error("写入脚本文件异常!", e); + logger.error("写入脚本文件异常!", e); e.printStackTrace(); }finally { if(fw != null){ @@ -193,7 +214,7 @@ private void findScriptFilePath(String dirPath, List filePathList){ File f = new File(dirPath); if (!f.exists()) { //System.out.println(dirPath + " not exists"); - utilLogger.error("文件地址: " + dirPath + "不存在!"); + logger.error("文件地址: " + dirPath + "不存在!"); } File fa[] = f.listFiles(); for (int i = 0; i < fa.length; i++) { @@ -203,7 +224,8 @@ private void findScriptFilePath(String dirPath, List filePathList){ } else { if(fs.getName().endsWith(".py") || fs.getName().endsWith(".sh") || fs.getName().endsWith(".sql") || fs.getName().endsWith(".hql") - || fs.getName().endsWith(".job") || fs.getName().endsWith(".flow")){ + || fs.getName().endsWith(".job") || fs.getName().endsWith(".flow") + || fs.getName().endsWith(".properties")){ filePathList.add(fs.getPath().toString()); } } @@ -228,7 +250,7 @@ private Map readProperties(String propPath){ } } } catch (Exception ex) { - utilLogger.error("读取properties配置文件异常!", ex); + logger.error("读取properties配置文件异常!", ex); ex.printStackTrace(); } finally { if (input != null) { @@ -264,7 +286,7 @@ private String readFile(String filePath) return fileStr; } catch (Exception e) { - utilLogger.error("读取脚本文件异常!", e); + logger.error("读取脚本文件异常!", e); e.printStackTrace(); } finally { if(br != null){ @@ -290,7 +312,7 @@ private Map paramDecompose(String fileStr, ExecutableFlow ef){ while(matcher.find()){ String fullStr = matcher.group(); // fullStr = ${abcd} String valueStr = matcher.group(1); // valueStr = abcd - String timeParam = scriptTimeHandle(valueStr); + String timeParam = calculationDate(valueStr); if(!"".equals(timeParam)) { paramReplaceMap.put(fullStr, timeParam); } @@ -300,143 +322,33 @@ private Map paramDecompose(String fileStr, ExecutableFlow ef){ return paramReplaceMap; } - private String scriptTimeHandle(String param) { - //时间字符串 + private String calculationDate(String fullVal) { + fullVal = fullVal.replaceAll("\\s*", ""); String timeStr = ""; - param = param.replaceAll("\\s*", ""); - - if (RUN_DATE.equals(param)) { - timeStr = defaultDate.get(RUN_DATE).toString("yyyyMMdd"); - - } else if (RUN_DATE_STD.equals(param)) { - timeStr = defaultDate.get(RUN_DATE_STD).toString("yyyy-MM-dd"); - - } else if (param.contains(RUN_DATE) && !param.contains(RUN_DATE_STD)) { - String mathStr = StringUtils.substringAfter(param, RUN_DATE); - String[] sAry = {}; - if (MINUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("-"); - timeStr = defaultDate.get(RUN_DATE).minusDays(Integer.valueOf(sAry[1])).toString("yyyyMMdd"); - } else if (PLUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("\\+"); - timeStr = defaultDate.get(RUN_DATE).plusDays(Integer.valueOf(sAry[1])).toString("yyyyMMdd"); - } - - } else if (param.contains(RUN_DATE) && param.contains(RUN_DATE_STD)) { - String mathStr = StringUtils.substringAfter(param, RUN_DATE_STD); - String[] sAry = {}; - if (MINUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("-"); - timeStr = defaultDate.get(RUN_DATE_STD).minusDays(Integer.valueOf(sAry[1])).toString(); - } else if (PLUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("\\+"); - timeStr = defaultDate.get(RUN_DATE_STD).plusDays(Integer.valueOf(sAry[1])).toString(); - } - } - - if (RUN_MONTH_BEGIN.equals(param)) { - timeStr = defaultDate.get(RUN_MONTH_BEGIN).toString("yyyyMMdd"); - - } else if (RUN_MONTH_BEGIN_STD.equals(param)) { - timeStr = defaultDate.get(RUN_MONTH_BEGIN_STD).toString("yyyy-MM-dd"); - - } else if (param.contains(RUN_MONTH_BEGIN) && !param.contains(RUN_MONTH_BEGIN_STD)) { - String mathStr = StringUtils.substringAfter(param, RUN_MONTH_BEGIN); - String[] sAry = {}; - if (MINUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("-"); - timeStr = defaultDate.get(RUN_MONTH_BEGIN).minusMonths(Integer.valueOf(sAry[1])).dayOfMonth() - .withMinimumValue().toString("yyyyMMdd"); - - } else if (PLUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("\\+"); - timeStr = defaultDate.get(RUN_MONTH_BEGIN).plusMonths(Integer.valueOf(sAry[1])).dayOfMonth() - .withMinimumValue().toString("yyyyMMdd"); - } - - } else if (param.contains(RUN_MONTH_BEGIN) && param.contains(RUN_MONTH_BEGIN_STD)) { - String mathStr = StringUtils.substringAfter(param, RUN_MONTH_BEGIN_STD); - String[] sAry = {}; - if (MINUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("-"); - timeStr = defaultDate.get(RUN_MONTH_BEGIN_STD).minusMonths(Integer.valueOf(sAry[1])).dayOfMonth() - .withMinimumValue().toString("yyyy-MM-dd"); - - } else if (PLUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("\\+"); - timeStr = defaultDate.get(RUN_MONTH_BEGIN_STD).plusMonths(Integer.valueOf(sAry[1])).dayOfMonth() - .withMinimumValue().toString("yyyy-MM-dd"); - } - } - - if(RUN_MONTH_END.equals(param)){ - timeStr = defaultDate.get(RUN_MONTH_END).toString("yyyyMMdd"); - - }else if(RUN_MONTH_END_STD.equals(param)){ - timeStr = defaultDate.get(RUN_MONTH_END_STD).toString("yyyy-MM-dd"); - - }else if(param.contains(RUN_MONTH_END) && !param.contains(RUN_MONTH_END_STD)){ - String mathStr = StringUtils.substringAfter(param, RUN_MONTH_END); - String[] sAry = {}; - - if(MINUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("-"); - timeStr = defaultDate.get(RUN_MONTH_END).minusMonths(Integer.valueOf(sAry[1])).dayOfMonth() - .withMaximumValue().toString("yyyyMMdd"); - - } else if(PLUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("\\+"); - timeStr = defaultDate.get(RUN_MONTH_END).plusMonths(Integer.valueOf(sAry[1])).dayOfMonth() - .withMaximumValue().toString("yyyyMMdd"); - - } - - }else if(param.contains(RUN_MONTH_END) && param.contains(RUN_MONTH_END_STD)){ - String mathStr = StringUtils.substringAfter(param, RUN_MONTH_END_STD); - String[] sAry = {}; - - if(MINUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("-"); - timeStr = defaultDate.get(RUN_MONTH_END_STD).minusMonths(Integer.valueOf(sAry[1])).dayOfMonth() - .withMaximumValue().toString("yyyy-MM-dd"); - - } else if(PLUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("\\+"); - timeStr = defaultDate.get(RUN_MONTH_END_STD).plusMonths(Integer.valueOf(sAry[1])).dayOfMonth() - .withMaximumValue().toString("yyyy-MM-dd"); - } - } - - if (RUN_TODAY.equals(param)) { - timeStr = defaultDate.get(RUN_TODAY).toString("yyyyMMdd"); - } else if (RUN_TODAY_STD.equals(param)) { - timeStr = defaultDate.get(RUN_TODAY_STD).toString("yyyy-MM-dd"); - } else if (param.contains(RUN_TODAY) && !param.contains(RUN_TODAY_STD)) { - String mathStr = StringUtils.substringAfter(param, RUN_TODAY); - String[] sAry = {}; - if (MINUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("-"); - timeStr = defaultDate.get(RUN_TODAY).minusDays(Integer.valueOf(sAry[1])).toString("yyyyMMdd"); - } else if (PLUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("\\+"); - timeStr = defaultDate.get(RUN_TODAY).plusDays(Integer.valueOf(sAry[1])).toString("yyyyMMdd"); - } - } else if (param.contains(RUN_TODAY) && param.contains(RUN_TODAY_STD)) { - String mathStr = StringUtils.substringAfter(param, RUN_TODAY_STD); - String[] sAry = {}; - if (MINUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("-"); - timeStr = defaultDate.get(RUN_TODAY_STD).minusDays(Integer.valueOf(sAry[1])).toString("yyyy-MM-dd"); - } else if (PLUS.equals(paramVerify(mathStr, param))) { - sAry = mathStr.split("\\+"); - timeStr = defaultDate.get(RUN_TODAY_STD).plusDays(Integer.valueOf(sAry[1])).toString("yyyy-MM-dd"); + if (fullVal.matches(re)) { + String str[] = fullVal.split("\\+|-"); + if (str.length == 1) { + String key = str[0]; + int num = 0; + if (defaultDate.containsKey(key) && Date.getDateMap().containsKey(key)) { + Date date = Date.getDateMap().get(key); + timeStr = DateUtils.calDate(date, num, defaultDate.get(key)); + } + } else { + String key = str[0]; + int num = Integer.parseInt(StringUtils.substringAfter(fullVal, key)); + if (defaultDate.containsKey(key) && Date.getDateMap().containsKey(key)) { + Date date = Date.getDateMap().get(key); + timeStr = DateUtils.calDate(date, num, defaultDate.get(key)); + } } } - return timeStr; } + + //过滤用户设置的参数 排除用户设置过的参数 private void filterUserParam(Map systemParam, ExecutableFlow ef){ Map handleMap = new HashMap<>(); @@ -483,7 +395,7 @@ public void run(String workingDir, ExecutableFlow ef){ } } } catch(RuntimeException e){ - utilLogger.error("set rundate failed {}", e); + logger.error("set rundate failed {}", e); } //所有脚本的文件地址 @@ -494,7 +406,7 @@ public void run(String workingDir, ExecutableFlow ef){ initDate(ef); } catch (RuntimeException re){ initDateIsSuccess = false; - utilLogger.error("parse run_date failed.", re); + logger.error("parse run_date failed.", re); } //循环脚本文件地址 for(String filePath : scriptPathList){ @@ -519,104 +431,6 @@ public Map getPropMap() { return propMap; } - public void setPropMap(Map propMap) { - this.propMap = propMap; - } - - public void addPropMap(String key, String value) { - this.propMap.put(key, value); - } - - - private String paramVerify(String param, String fullParam){ - - String symbol = ""; - - String reg = "[0-9]"; - - String[] mathStr = {}; - - int minusSite = param.indexOf("-"); - int plusSite = param.indexOf("+"); - - String[] sAry = null; - - if((plusSite > minusSite && minusSite != -1) || plusSite == -1){ - sAry = param.split("-"); - symbol = MINUS; - } else if ((minusSite > plusSite && plusSite != -1) || minusSite == -1){ - sAry = param.split("\\+"); - symbol = PLUS; - } - - if(fullParam.contains(RUN_DATE) && !fullParam.contains(RUN_DATE_STD)){ - mathStr = StringUtils.split(fullParam, RUN_DATE); - } else if (fullParam.contains(RUN_DATE) && fullParam.contains(RUN_DATE_STD)) { - mathStr = StringUtils.split(param, RUN_DATE_STD); - } else if (fullParam.contains(RUN_MONTH_BEGIN) && !fullParam.contains(RUN_MONTH_BEGIN_STD)){ - mathStr = StringUtils.split(param, RUN_MONTH_BEGIN); - } else if (fullParam.contains(RUN_MONTH_BEGIN) && fullParam.contains(RUN_MONTH_BEGIN_STD)){ - mathStr = StringUtils.split(param, RUN_MONTH_BEGIN_STD); - } else if(fullParam.contains(RUN_MONTH_END) && !fullParam.contains(RUN_MONTH_END_STD)){ - mathStr = StringUtils.split(param, RUN_MONTH_END); - } else if(fullParam.contains(RUN_MONTH_END) && fullParam.contains(RUN_MONTH_END_STD)){ - mathStr = StringUtils.split(fullParam, RUN_MONTH_END_STD); - }else if(fullParam.contains(RUN_TODAY) && !fullParam.contains(RUN_TODAY_STD)){ - mathStr = StringUtils.split(fullParam, RUN_TODAY); - } else if (fullParam.contains(RUN_TODAY) && fullParam.contains(RUN_TODAY_STD)) { - mathStr = StringUtils.split(param, RUN_TODAY_STD); - } - - if(mathStr.length > 0){ - Pattern pattern = Pattern.compile(reg); - if(!pattern.matcher(mathStr[0]).find()){ - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); - utilLogger.error("The script parameter ${" + fullParam + "} exception!Please check the script!"); - symbol = "FALSE"; - } - } - - if(sAry.length > 1 && sAry.length == 2){ - String start = sAry[0]; - if(StringUtils.isNotEmpty(start)){ - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); - utilLogger.error("The script parameter ${" + fullParam + "} exception!Please check the script!"); - symbol = "FALSE"; - } - String str = sAry[1]; - Pattern pattern = Pattern.compile("[0-9]*"); - if(!pattern.matcher(str).matches()){ - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); - utilLogger.error("The script parameter ${" + fullParam + "} exception!Please check the script!"); - symbol = "FALSE"; - } - }else if(sAry.length > 2){//多个运算符号就报异常 - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); - utilLogger.error("The script parameter ${" + fullParam + "} exception!Please check the script!"); - symbol = "FALSE"; - }else if(sAry.length <= 1){//多个运算符号就报异常 - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); - utilLogger.error("The script parameter ${" + fullParam + "} exception!Please check the script!"); - symbol = "FALSE"; - } - - return symbol; - } - - - private long flowParamTimesHandle(ExecutableFlow ef){ - long flowParamSetTime = 0; - org.joda.time.format.DateTimeFormatter formatter; - if(null != ef.getExecutionOptions().getFlowParameters().get("run_date")){ - formatter = DateTimeFormat.forPattern("yyyyMMdd"); - LocalDate localDate = LocalDate.parse(ef.getExecutionOptions().getFlowParameters().get("run_date") - , formatter); - flowParamSetTime = localDate.toDate().getTime(); - } else { - - } - return flowParamSetTime; - } public static boolean dateFormatCheck(String date){ Pattern p = Pattern.compile(TIME_TEMPLATE); @@ -624,7 +438,7 @@ public static boolean dateFormatCheck(String date){ if(m.matches()) { return true; } else { - utilLogger.error(date + ",不是合法的日期格式!"); + logger.error(date + ",不是合法的日期格式!"); return false; } } diff --git a/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/SystemBuiltInParamUtils.java b/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/SystemBuiltInParamUtils.java index 7dcc127..5a8582b 100755 --- a/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/SystemBuiltInParamUtils.java +++ b/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/jobExecutor/utils/SystemBuiltInParamUtils.java @@ -38,13 +38,13 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.Logger; @Deprecated public class SystemBuiltInParamUtils { - private static final Logger utilLogger = LoggerFactory.getLogger(SystemBuiltInParamUtils.class); + private static final Logger logger = LoggerFactory.getLogger(SystemBuiltInParamUtils.class); public static final String RUN_DATE = "run_date"; public static final String RUN_DATE_STD = "run_date_std"; @@ -65,7 +65,7 @@ private void FileWrite(String filePath, String fileStr){ //写入到文件 fw.write(fileStr); } catch (Exception e) { - utilLogger.error("写入脚本文件异常!", e); + logger.error("写入脚本文件异常!", e); e.printStackTrace(); }finally { if(fw != null){ @@ -120,7 +120,7 @@ private void findScriptFilePath(String dirPath, List filePathList){ File f = new File(dirPath); if (!f.exists()) { //System.out.println(dirPath + " not exists"); - utilLogger.error("文件地址: " + dirPath + "不存在!"); + logger.error("文件地址: " + dirPath + "不存在!"); } File fa[] = f.listFiles(); for (int i = 0; i < fa.length; i++) { @@ -154,7 +154,7 @@ private Map readProperties(String propPath){ } } } catch (Exception ex) { - utilLogger.error("读取properties配置文件异常!", ex); + logger.error("读取properties配置文件异常!", ex); ex.printStackTrace(); } finally { if (input != null) { @@ -187,7 +187,7 @@ private String readFile(String filePath) //System.out.println(fileStr); return fileStr; } catch (Exception e) { - utilLogger.error("读取脚本文件异常!", e); + logger.error("读取脚本文件异常!", e); e.printStackTrace(); } finally { if(br != null){ @@ -568,20 +568,20 @@ private String paramVerify(String param, String fullParam){ if(sAry.length > 1 && sAry.length == 2){ String start = sAry[0]; if(StringUtils.isNotEmpty(start)){ - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); + logger.error("脚本替换参数适配异常!请检查脚本!"); throw new RuntimeException("The script parameter ${" + fullParam + "} exception!Please check the script!"); } String str = sAry[1]; Pattern pattern = Pattern.compile("[0-9]*"); if(!pattern.matcher(str).matches()){ - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); + logger.error("脚本替换参数适配异常!请检查脚本!"); throw new RuntimeException("The script parameter ${" + fullParam + "} exception!Please check the script!"); } }else if(sAry.length > 2){//多个运算符号就报异常 - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); + logger.error("脚本替换参数适配异常!请检查脚本!"); throw new RuntimeException("The script parameter ${" + fullParam + "} exception!Please check the script!"); }else if(sAry.length <= 1){//多个运算符号就报异常 - utilLogger.error("脚本替换参数适配异常!请检查脚本!"); + logger.error("脚本替换参数适配异常!请检查脚本!"); throw new RuntimeException("The script parameter ${" + fullParam + "} exception!Please check the script!"); } diff --git a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java index b86766d..7416ea6 100644 --- a/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java +++ b/azkaban-exec-server/src/main/java/azkaban/execapp/FlowRunner.java @@ -170,7 +170,6 @@ public class FlowRunner extends EventHandler implements Runnable { private String loggerName; private String logFileName; - /** * Constructor. This will create its own ExecutorService for thread pools */ @@ -311,6 +310,19 @@ private void initNSWTSSValue(){ logger.info("nsWtss: " + this.flow.getNsWtss() + ", flowParamNsWtss: " + flowParamNsWtss + ", flowPropNswtss:" + flowPropNswtss); } +/** + * rundate替换 + */ + private void runDateReplace(){ + //获取执行Flow节点 + ExecutableFlow ef = this.flow; + // FIXME New feature, replace the run_date variable in the file before the job stream starts running. + SystemBuiltInParamJodeTimeUtils sbipu = new SystemBuiltInParamJodeTimeUtils(); + if(null == ef.getParentFlow()){ + sbipu.run(this.execDir.getPath(), ef); + } + } + private void alertOnIMSRegistStart(){ try { // 注册并上报作业流开始 @@ -343,6 +355,7 @@ public void run() { try { // FIXME Create a thread pool and add a thread pool (executorServiceForCheckers) for running checker tasks. createThreadPool(); + runDateReplace(); this.logger.info("Fetching job and shared properties."); if (!FlowLoaderUtils.isAzkabanFlowVersion20(this.flow.getAzkabanFlowVersion())) { loadAllProperties(); @@ -359,14 +372,6 @@ public void run() { this.logger.info("Updating initial flow directory."); updateFlow(); - //获取执行Flow节点 - ExecutableFlow ef = this.flow; - // FIXME New feature, replace the run_date variable in the file before the job stream starts running. - SystemBuiltInParamJodeTimeUtils sbipu = new SystemBuiltInParamJodeTimeUtils(); - if(null == ef.getParentFlow()){ - sbipu.run(this.execDir.getPath(), ef); - } - this.fireEventListeners( Event.create(this, EventType.FLOW_STARTED, new EventData(this.getExecutableFlow()))); From 22d0d6e7b7393ac0401a6cf8773453991c9dee50 Mon Sep 17 00:00:00 2001 From: frankfreedom <772598220@qq.com> Date: Wed, 19 Aug 2020 00:50:31 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E9=9D=99=E6=80=81?= =?UTF-8?q?=E5=8F=98=E9=87=8F--properties=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../schedulis/common/utils/DateUtils.java | 200 ++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/utils/DateUtils.java diff --git a/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/utils/DateUtils.java b/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/utils/DateUtils.java new file mode 100644 index 0000000..6b24733 --- /dev/null +++ b/azkaban-common/src/main/java/com/webank/wedatasphere/schedulis/common/utils/DateUtils.java @@ -0,0 +1,200 @@ +package com.webank.wedatasphere.schedulis.common.utils; + +import com.webank.wedatasphere.schedulis.common.jobExecutor.utils.Date; +import org.joda.time.LocalDate; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +public class DateUtils { + + + public static final DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd"); + + /** + * 获取当前季度 + * @return + */ + public static int getQuarter(LocalDate date) { + return ((date.getMonthOfYear() - 1) / 3) + 1; + } + + /** + * 判断当前时间是上半年还是下半年 + * @return + */ + public static boolean isFirstYear(LocalDate date) { + return date.getMonthOfYear() < 7 ? true : false; + } + + /** + * 季度初 + * @return + */ + public static LocalDate getQuarterBegin(LocalDate localDate){ + int year = localDate.getYear(); + LocalDate date = null; + switch (getQuarter(localDate)){ + case 1: + date = LocalDate.parse(year + "0101", dateTimeFormatter); + break; + case 2: + date = LocalDate.parse(year + "0401", dateTimeFormatter); + break; + case 3: + date = LocalDate.parse(year + "0701", dateTimeFormatter); + break; + case 4: + date = LocalDate.parse(year + "1001", dateTimeFormatter); + break; + } + return date; + } + + /** + * 季度末 + * @return + */ + public static LocalDate getQuarterEnd(LocalDate localDate){ + int year = localDate.getYear(); + LocalDate date = null; + switch (getQuarter(localDate)){ + case 1: + date = LocalDate.parse(year + "0331", dateTimeFormatter); + break; + case 2: + date = LocalDate.parse(year + "0630", dateTimeFormatter); + break; + case 3: + date = LocalDate.parse(year + "0930", dateTimeFormatter); + break; + case 4: + date = LocalDate.parse(year + "1231", dateTimeFormatter); + break; + } + return date; + } + + /** + * 半年初 + * @return + */ + public static LocalDate getHalfYearBegin(LocalDate localDate){ + int year = localDate.getYear(); + LocalDate date; + if (isFirstYear(localDate)){ + date = LocalDate.parse(year + "0101", dateTimeFormatter); + } else { + date = LocalDate.parse(year + "0701", dateTimeFormatter); + } + return date; + } + + /** + * 半年末 + * @return + */ + public static LocalDate getHalfYearEnd(LocalDate localDate){ + int year = localDate.getYear(); + LocalDate date; + if (isFirstYear(localDate)){ + date = LocalDate.parse(year + "0630", dateTimeFormatter); + } else { + date = LocalDate.parse(year + "1231", dateTimeFormatter); + } + return date; + } + + /** + * 年初 + * @return + */ + public static LocalDate getYearBegin(LocalDate date){ + return LocalDate.parse(date.getYear() + "0101", dateTimeFormatter); + } + + /** + * 年末 + * @return + */ + public static LocalDate getYearEnd(LocalDate date){ + return LocalDate.parse(date.getYear() + "1231", dateTimeFormatter); + } + + + /** + * 上月末 + * @return + */ + public static LocalDate getLastMonthEnd(LocalDate date){ + return date.minusMonths(1).dayOfMonth().withMaximumValue(); + } + + + /** + * 上季度末 + * @return + */ + public static LocalDate getLastQuarterEnd(LocalDate date){ + return getQuarterEnd(date.minusMonths(3)); + } + + + /** + * 去年末 + * @return + */ + public static LocalDate getLastYearEnd(LocalDate date){ + return getYearEnd(date.minusYears(1)); + } + + public static String calDate(Date date, int num, LocalDate localDate){ + LocalDate tmpDate; + if(num == 0){ + tmpDate = localDate; + } else { + switch (date.getCalRule()) { + case "day": + tmpDate = localDate.plusDays(num); + break; + case "month": + if (date.getValue().contains("begin")) { + tmpDate = localDate.plusMonths(num).dayOfMonth().withMinimumValue(); + } else if(date.getValue().contains("end")){ + tmpDate = localDate.plusMonths(num).dayOfMonth().withMaximumValue(); + } else { + tmpDate = localDate.plusMonths(num); + } + break; + case "quarter": + if (date.getValue().contains("begin")) { + tmpDate = localDate.plusMonths(num * 3).dayOfMonth().withMinimumValue(); + } else if(date.getValue().contains("end")){ + tmpDate = localDate.plusMonths(num * 3).dayOfMonth().withMaximumValue(); + } else { + tmpDate = localDate.plusMonths(num * 3); + } + break; + case "halfYear": + if (date.getValue().contains("begin")) { + tmpDate = localDate.plusMonths(num * 6).dayOfMonth().withMinimumValue(); + } else if(date.getValue().contains("end")){ + tmpDate = localDate.plusMonths(num * 6).dayOfMonth().withMaximumValue(); + } else { + tmpDate = localDate.plusMonths(num * 6); + } + break; + default: + if (date.getValue().contains("begin")) { + tmpDate = localDate.plusYears(num).dayOfMonth().withMinimumValue(); + } else if(date.getValue().contains("end")){ + tmpDate = localDate.plusYears(num).dayOfMonth().withMaximumValue(); + } else { + tmpDate = localDate.plusYears(num); + } + break; + } + } + return tmpDate.toString(date.getFormat()); + } + +}