taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>());
-
- /**
- * put task instance to priority queue
- *
- * @param taskInstance taskInstance
- */
- @Override
- public void put(TaskInstance taskInstance) {
- Preconditions.checkNotNull(taskInstance);
- queue.add(taskInstance);
- taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance));
- }
-
- /**
- * take task info
- *
- * @return task instance
- * @throws TaskPriorityQueueException
- */
- @Override
- public TaskInstance take() throws TaskPriorityQueueException {
- TaskInstance taskInstance = queue.poll();
- if (taskInstance != null) {
- taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance));
- }
- return taskInstance;
- }
-
- /**
- * poll task info with timeout
- *
- * WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit)
- * because this method of override interface used without considering accuracy of timeout
- *
- * @param timeout
- * @param unit
- * @return
- * @throws TaskPriorityQueueException
- * @throws InterruptedException
- */
- @Override
- public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException {
- throw new TaskPriorityQueueException(
- "This operation is not currently supported and suggest to use PriorityBlockingQueue if you want!");
- }
-
- /**
- * peek taskInfo
- *
- * @return task instance
- */
- public TaskInstance peek() {
- return queue.peek();
- }
-
- /**
- * queue size
- *
- * @return size
- */
- @Override
- public int size() {
- return queue.size();
- }
-
- /**
- * clear task
- *
- */
- public void clear() {
- queue.clear();
- taskInstanceIdentifySet.clear();
- }
-
- /**
- * whether contains the task instance
- *
- * @param taskInstance task instance
- * @return true is contains
- */
- public boolean contains(TaskInstance taskInstance) {
- Preconditions.checkNotNull(taskInstance);
- return taskInstanceIdentifySet.contains(getTaskInstanceIdentify(taskInstance));
- }
-
- /**
- * remove task
- *
- * @param taskInstance task instance
- * @return true if remove success
- */
- public boolean remove(TaskInstance taskInstance) {
- Preconditions.checkNotNull(taskInstance);
- taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance));
- return queue.remove(taskInstance);
- }
-
- /**
- * get iterator
- *
- * @return Iterator
- */
- public Iterator iterator() {
- return queue.iterator();
- }
-
- // since the task instance will not contain taskInstanceId until insert into database
- // So we use processInstanceId + taskCode + version to identify a taskInstance.
- private String getTaskInstanceIdentify(TaskInstance taskInstance) {
- return String.join(
- String.valueOf(taskInstance.getWorkflowInstanceId()),
- String.valueOf(taskInstance.getTaskCode()),
- String.valueOf(taskInstance.getTaskDefinitionVersion()), "-");
- }
-
- /**
- * This comparator is used to sort task instances in the standby queue.
- * If the TaskInstance is in the same taskGroup, then we will sort the TaskInstance by {@link TaskInstance#getTaskGroupPriority()} in the taskGroup.
- * Otherwise, we will sort the TaskInstance by {@link TaskInstance#getTaskInstancePriority()} in the workflow.
- */
- private static class TaskInstancePriorityComparator implements Comparator {
-
- @Override
- public int compare(TaskInstance o1, TaskInstance o2) {
- int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
- int taskInstancePriorityInWorkflow =
- Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode());
-
- if (o1.getTaskGroupId() == o2.getTaskGroupId()) {
- // If at the same taskGroup
- if (taskPriorityInTaskGroup != 0) {
- return taskPriorityInTaskGroup;
- }
- }
- return taskInstancePriorityInWorkflow;
- }
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
deleted file mode 100644
index 2989e098528b..000000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.queue;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Map;
-import java.util.Objects;
-
-import lombok.Data;
-
-@Data
-public class TaskPriority implements Comparable {
-
- private int workflowInstancePriority;
-
- private int workflowInstanceId;
-
- private int taskInstancePriority;
-
- private int taskId;
-
- private TaskExecutionContext taskExecutionContext;
-
- private String groupName;
-
- private Map context;
- private long checkpoint;
-
- private int taskGroupPriority;
-
- public TaskPriority() {
- this.checkpoint = System.currentTimeMillis();
- }
-
- public TaskPriority(int workflowInstancePriority,
- int workflowInstanceId,
- int taskInstancePriority,
- int taskId,
- int taskGroupPriority, String groupName) {
- this.workflowInstancePriority = workflowInstancePriority;
- this.workflowInstanceId = workflowInstanceId;
- this.taskInstancePriority = taskInstancePriority;
- this.taskId = taskId;
- this.taskGroupPriority = taskGroupPriority;
- this.groupName = groupName;
- this.checkpoint = System.currentTimeMillis();
- }
-
- @Override
- public int compareTo(TaskPriority other) {
- if (this.getWorkflowInstancePriority() > other.getWorkflowInstancePriority()) {
- return 1;
- }
- if (this.getWorkflowInstancePriority() < other.getWorkflowInstancePriority()) {
- return -1;
- }
-
- if (this.getWorkflowInstanceId() > other.getWorkflowInstanceId()) {
- return 1;
- }
- if (this.getWorkflowInstanceId() < other.getWorkflowInstanceId()) {
- return -1;
- }
-
- if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) {
- return 1;
- }
- if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) {
- return -1;
- }
- if (this.getTaskGroupPriority() != other.getTaskGroupPriority()) {
- // larger number, higher priority
- return Constants.OPPOSITE_VALUE
- * Integer.compare(this.getTaskGroupPriority(), other.getTaskGroupPriority());
- }
- if (this.getTaskId() > other.getTaskId()) {
- return 1;
- }
- if (this.getTaskId() < other.getTaskId()) {
- return -1;
- }
- String thisGroupName =
- StringUtils.isNotBlank(this.getGroupName()) ? this.getGroupName() : Constants.EMPTY_STRING;
- String otherGroupName =
- StringUtils.isNotBlank(other.getGroupName()) ? other.getGroupName() : Constants.EMPTY_STRING;
- if (!thisGroupName.equals(otherGroupName)) {
- return thisGroupName.compareTo(otherGroupName);
- }
- return Long.compare(this.getCheckpoint(), other.getCheckpoint());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TaskPriority that = (TaskPriority) o;
- return workflowInstancePriority == that.workflowInstancePriority
- && workflowInstanceId == that.workflowInstanceId
- && taskInstancePriority == that.taskInstancePriority
- && taskId == that.taskId
- && taskGroupPriority == that.taskGroupPriority
- && Objects.equals(groupName, that.groupName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(workflowInstancePriority,
- workflowInstanceId,
- taskInstancePriority,
- taskId,
- taskGroupPriority,
- groupName);
- }
-
- @Override
- public String toString() {
- return "TaskPriority{"
- + "workflowInstancePriority="
- + workflowInstancePriority
- + ", workflowInstanceId="
- + workflowInstanceId
- + ", taskInstancePriority="
- + taskInstancePriority
- + ", taskId="
- + taskId
- + ", taskExecutionContext="
- + taskExecutionContext
- + ", groupName='"
- + groupName
- + '\''
- + ", context="
- + context
- + ", checkpoint="
- + checkpoint
- + ", taskGroupPriority="
- + taskGroupPriority
- + '}';
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
deleted file mode 100644
index 736e117fe508..000000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.queue;
-
-import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * task priority queue
- * @param
- */
-public interface TaskPriorityQueue {
-
- /**
- * put task info
- *
- * @param taskInfo taskInfo
- * @throws TaskPriorityQueueException
- */
- void put(T taskInfo);
-
- /**
- * take taskInfo
- *
- * @return taskInfo
- * @throws TaskPriorityQueueException
- */
- T take() throws TaskPriorityQueueException, InterruptedException;
-
- /**
- * poll taskInfo with timeout
- * @param timeout
- * @param unit
- * @return
- * @throws TaskPriorityQueueException
- * @throws InterruptedException
- */
- T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException;
-
- /**
- * size
- *
- * @return size
- * @throws TaskPriorityQueueException
- */
- int size() throws TaskPriorityQueueException;
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
deleted file mode 100644
index d9578b0757d7..000000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.queue;
-
-import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
-
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.springframework.stereotype.Service;
-
-/**
- * A singleton of a task queue implemented using PriorityBlockingQueue
- */
-@Service
-public class TaskPriorityQueueImpl implements TaskPriorityQueue {
-
- /**
- * Task queue, this queue is unbounded, this means it will cause OutOfMemoryError.
- * The master will stop to generate the task if memory is too high.
- */
- private final PriorityBlockingQueue queue = new PriorityBlockingQueue<>(3000);
-
- /**
- * put task takePriorityInfo
- *
- * @param taskPriorityInfo takePriorityInfo
- */
- @Override
- public void put(TaskPriority taskPriorityInfo) {
- queue.put(taskPriorityInfo);
- }
-
- /**
- * take taskInfo
- *
- * @return taskInfo
- * @throws TaskPriorityQueueException
- */
- @Override
- public TaskPriority take() throws TaskPriorityQueueException, InterruptedException {
- return queue.take();
- }
-
- /**
- * poll taskInfo with timeout
- *
- * @param timeout
- * @param unit
- * @return
- * @throws TaskPriorityQueueException
- * @throws InterruptedException
- */
- @Override
- public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException {
- return queue.poll(timeout, unit);
- }
-
- /**
- * queue size
- *
- * @return size
- * @throws TaskPriorityQueueException
- */
- @Override
- public int size() throws TaskPriorityQueueException {
- return queue.size();
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java
deleted file mode 100644
index 9ed22d6d6bef..000000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.utils;
-
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-
-import lombok.experimental.UtilityClass;
-
-@UtilityClass
-public final class Constants {
-
- public static final int[] NOT_TERMINATED_STATES = new int[]{
- TaskExecutionStatus.DISPATCH.getCode(),
- WorkflowExecutionStatus.RUNNING_EXECUTION.getCode(),
- WorkflowExecutionStatus.READY_PAUSE.getCode(),
- WorkflowExecutionStatus.READY_STOP.getCode(),
- TaskExecutionStatus.NEED_FAULT_TOLERANCE.getCode(),
- };
-
- public static final int[] RUNNING_PROCESS_STATE = new int[]{
- TaskExecutionStatus.RUNNING_EXECUTION.getCode(),
- TaskExecutionStatus.SUBMITTED_SUCCESS.getCode(),
- TaskExecutionStatus.DISPATCH.getCode(),
- WorkflowExecutionStatus.SERIAL_WAIT.getCode()
- };
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
index cc4e9b7100e9..414e9970ec88 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
@@ -207,33 +207,6 @@ private static List getFlowNodeListPre(TaskNode startNode,
return resultList;
}
- /**
- * generate dag by start nodes and recovery nodes
- *
- * @param totalTaskNodeList totalTaskNodeList
- * @param startNodeNameList startNodeNameList
- * @param recoveryNodeCodeList recoveryNodeCodeList
- * @param depNodeType depNodeType
- * @return workflow dag
- * @throws Exception if error throws Exception
- */
- public static WorkflowDag generateFlowDag(List totalTaskNodeList,
- List startNodeNameList,
- List recoveryNodeCodeList,
- TaskDependType depNodeType) throws Exception {
-
- List destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList,
- recoveryNodeCodeList, depNodeType);
- if (destTaskNodeList.isEmpty()) {
- return null;
- }
- List taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);
- WorkflowDag workflowDag = new WorkflowDag();
- workflowDag.setEdges(taskNodeRelations);
- workflowDag.setNodes(destTaskNodeList);
- return workflowDag;
- }
-
/**
* find node by node code
*
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java
deleted file mode 100644
index 54349af38bd7..000000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.utils;
-
-import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_FATHER_PARAMS;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import com.google.common.base.Strings;
-
-/**
- * Param Utility class
- */
-public class ParamUtils {
-
- /**
- * convert globalParams string to global parameter map
- * @param globalParams globalParams
- * @return parameter map
- */
- public static Map getGlobalParamMap(String globalParams) {
- List propList;
- Map globalParamMap = new HashMap<>();
- if (!Strings.isNullOrEmpty(globalParams)) {
- propList = JSONUtils.toList(globalParams, Property.class);
- globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
- }
- return globalParamMap;
- }
-
- /**
- * Get sub workflow parameters
- * @param instanceMap workflow instance map
- * @param parentWorkflowInstance parent workflow instance
- * @param fatherParams fatherParams
- * @return sub workflow parameters
- */
- public static String getSubWorkFlowParam(WorkflowInstanceRelation instanceMap,
- WorkflowInstance parentWorkflowInstance,
- Map fatherParams) {
- // set sub work workflow command
- String workflowMapStr = JSONUtils.toJsonString(instanceMap);
- Map cmdParam = JSONUtils.toMap(workflowMapStr);
- if (parentWorkflowInstance.isComplementData()) {
- Map parentParam = JSONUtils.toMap(parentWorkflowInstance.getCommandParam());
- String endTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
- String startTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
- String scheduleTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
- if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
- cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endTime);
- cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startTime);
- }
- if (StringUtils.isNotEmpty(scheduleTime)) {
- cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime);
- }
- workflowMapStr = JSONUtils.toJsonString(cmdParam);
- }
- if (MapUtils.isNotEmpty(fatherParams)) {
- cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
- workflowMapStr = JSONUtils.toJsonString(cmdParam);
- }
- return workflowMapStr;
- }
-
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java
deleted file mode 100644
index ae12269acf23..000000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.utils;
-
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.service.model.TaskNode;
-
-import java.util.List;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-public class ProcessData {
-
- @EqualsAndHashCode.Include
- private List tasks;
-
- @EqualsAndHashCode.Include
- private List globalParams;
-
- private int timeout;
-
- private int tenantId;
-
- public ProcessData(List tasks, List globalParams) {
- this.tasks = tasks;
- this.globalParams = globalParams;
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
deleted file mode 100644
index f0c8c59ed3ad..000000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.utils;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * mainly used to get the start command line of a process.
- */
-@Slf4j
-public class ProcessUtils {
-
- /**
- * find logs and kill yarn tasks.
- *
- * @param taskExecutionContext taskExecutionContext
- * @return yarn application ids
- */
- public static @Nullable List killApplication(@NonNull List appIds,
- @NonNull TaskExecutionContext taskExecutionContext) {
- try {
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- if (CollectionUtils.isNotEmpty(appIds)) {
- taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
- if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
- taskExecutionContext
- .setExecutePath(FileUtils
- .getTaskInstanceWorkingDirectory(taskExecutionContext.getTaskInstanceId()));
- }
- FileUtils.createDirectoryWith755(Paths.get(taskExecutionContext.getExecutePath()));
- org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(taskExecutionContext);
- return appIds;
- } else {
- log.info("The current appId is empty, don't need to kill the yarn job, taskInstanceId: {}",
- taskExecutionContext.getTaskInstanceId());
- }
- } catch (Exception e) {
- log.error("Kill yarn job failure, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId(), e);
- }
- return Collections.emptyList();
- }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
index dc4547dcb5eb..3f246abbf805 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
@@ -17,28 +17,16 @@
package org.apache.dolphinscheduler.service.command;
-import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_WORKFLOW_ID_STRING;
import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -50,8 +38,6 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
-import com.fasterxml.jackson.databind.JsonNode;
-
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class MessageServiceImplTest {
@@ -68,73 +54,6 @@ class MessageServiceImplTest {
@Mock
private ScheduleMapper scheduleMapper;
- @Test
- public void testCreateSubCommand() {
- WorkflowInstance parentInstance = new WorkflowInstance();
- parentInstance.setWarningType(WarningType.SUCCESS);
- parentInstance.setWarningGroupId(0);
-
- TaskInstance task = new TaskInstance();
- task.setTaskParams("{\"processDefinitionCode\":10}}");
- task.setId(10);
- task.setTaskCode(1L);
- task.setTaskDefinitionVersion(1);
-
- WorkflowInstance childInstance = null;
- WorkflowInstanceRelation instanceMap = new WorkflowInstanceRelation();
- instanceMap.setParentWorkflowInstanceId(1);
- instanceMap.setParentTaskInstanceId(10);
- Command command;
-
- // father history: start; child null == command type: start
- parentInstance.setHistoryCmd("START_PROCESS");
- parentInstance.setCommandType(CommandType.START_PROCESS);
- WorkflowDefinition workflowDefinition = new WorkflowDefinition();
- workflowDefinition.setCode(10L);
- Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(workflowDefinition);
- Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(workflowDefinition);
- command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
-
- // father history: start,start failure; child null == command type: start
- parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
- command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
-
- // father history: scheduler,start failure; child null == command type: scheduler
- parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
- command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType());
-
- // father history: complement,start failure; child null == command type: complement
-
- String startString = "2020-01-01 00:00:00";
- String endString = "2020-01-10 00:00:00";
- parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
- Map complementMap = new HashMap<>();
- complementMap.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startString);
- complementMap.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endString);
- parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
- command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
-
- JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
- Date start = DateUtils.stringToDate(complementDate.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE).asText());
- Date end = DateUtils.stringToDate(complementDate.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE).asText());
- Assertions.assertEquals(startString, DateUtils.dateToString(start));
- Assertions.assertEquals(endString, DateUtils.dateToString(end));
-
- // father history: start,failure,start failure; child not null == command type: start failure
- childInstance = new WorkflowInstance();
- parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
- parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
- command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
- Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
- }
-
@Test
public void testVerifyIsNeedCreateCommand() {
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
deleted file mode 100644
index 6c22944b50b1..000000000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.queue;
-
-import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
-
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class StandByTaskInstancePriorityQueueTest {
-
- @Test
- public void put() throws TaskPriorityQueueException {
- StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
- TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
- TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
- queue.put(taskInstanceHigPriority);
- queue.put(taskInstanceMediumPriority);
- Assertions.assertEquals(2, queue.size());
- Assertions.assertTrue(queue.contains(taskInstanceHigPriority));
- Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
- }
-
- @Test
- public void take() throws Exception {
- StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
- int peekBeforeLength = queue.size();
- queue.take();
- Assertions.assertTrue(queue.size() < peekBeforeLength);
- }
-
- @Test
- public void poll() throws Exception {
- StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
- Assertions.assertThrows(TaskPriorityQueueException.class, () -> {
- queue.poll(1000, TimeUnit.MILLISECONDS);
- });
- }
-
- @Test
- public void peek() throws Exception {
- StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
- int peekBeforeLength = queue.size();
- Assertions.assertEquals(peekBeforeLength, queue.size());
- }
-
- @Test
- public void peekTaskGroupPriority() throws Exception {
- StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
-
- TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2);
- TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1);
- queue.put(taskInstanceMediumPriority);
- queue.put(taskInstanceHigPriority);
- TaskInstance taskInstance = queue.peek();
- queue.clear();
- Assertions.assertEquals(taskInstance.getName(), "high");
-
- taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
- taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 2);
- queue.put(taskInstanceMediumPriority);
- queue.put(taskInstanceHigPriority);
- taskInstance = queue.peek();
- queue.clear();
- Assertions.assertEquals("medium", taskInstance.getName());
-
- taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
- taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2);
- queue.put(taskInstanceMediumPriority);
- queue.put(taskInstanceHigPriority);
- taskInstance = queue.peek();
- queue.clear();
- Assertions.assertEquals("medium", taskInstance.getName());
-
- taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
- taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
- queue.put(taskInstanceMediumPriority);
- queue.put(taskInstanceHigPriority);
- taskInstance = queue.peek();
- queue.clear();
- Assertions.assertEquals("high", taskInstance.getName());
-
- }
-
- @Test
- public void size() throws Exception {
- Assertions.assertEquals(2, getPeerTaskInstancePriorityQueue().size());
- }
-
- @Test
- public void contains() throws Exception {
- StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
- TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
- queue.put(taskInstanceMediumPriority);
- Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
- TaskInstance taskInstance2 = createTaskInstance("medium2", Priority.MEDIUM, 1);
- taskInstance2.setWorkflowInstanceId(2);
- Assertions.assertFalse(queue.contains(taskInstance2));
- }
-
- @Test
- public void remove() {
- StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
- TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
- queue.put(taskInstanceMediumPriority);
- int peekBeforeLength = queue.size();
- queue.remove(taskInstanceMediumPriority);
- Assertions.assertNotEquals(peekBeforeLength, queue.size());
- Assertions.assertFalse(queue.contains(taskInstanceMediumPriority));
- }
-
- /**
- * get queue
- *
- * @return queue
- * @throws Exception
- */
- private StandByTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
- StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
- TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
- TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
- taskInstanceHigPriority.setTaskGroupPriority(3);
- taskInstanceMediumPriority.setTaskGroupPriority(2);
- queue.put(taskInstanceMediumPriority);
- queue.put(taskInstanceHigPriority);
- return queue;
- }
-
- /**
- * create task instance
- *
- * @param name name
- * @param priority priority
- * @return
- */
- private TaskInstance createTaskInstance(String name, Priority priority, int taskGroupPriority) {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setName(name);
- taskInstance.setTaskInstancePriority(priority);
- taskInstance.setTaskGroupPriority(taskGroupPriority);
- return taskInstance;
- }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java
deleted file mode 100644
index aeaf079c9e3c..000000000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.queue;
-
-import org.apache.dolphinscheduler.common.enums.Priority;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TaskPriorityQueueImplTest {
-
- @Test
- public void testSort() {
- TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, 1, "default");
- TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, 1, "default");
- TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, 1, "default");
- List taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
- Collections.sort(taskPriorities);
- Assertions.assertEquals(
- Arrays.asList(priorityOne, priorityTwo, priorityThree),
- taskPriorities);
-
- priorityOne = new TaskPriority(0, 1, 0, 0, 1, "default");
- priorityTwo = new TaskPriority(0, 2, 0, 0, 1, "default");
- priorityThree = new TaskPriority(0, 3, 0, 0, 1, "default");
- taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
- Collections.sort(taskPriorities);
- Assertions.assertEquals(
- Arrays.asList(priorityOne, priorityTwo, priorityThree),
- taskPriorities);
-
- priorityOne = new TaskPriority(0, 0, 1, 0, 1, "default");
- priorityTwo = new TaskPriority(0, 0, 2, 0, 1, "default");
- priorityThree = new TaskPriority(0, 0, 3, 0, 1, "default");
- taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
- Collections.sort(taskPriorities);
- Assertions.assertEquals(
- Arrays.asList(priorityOne, priorityTwo, priorityThree),
- taskPriorities);
-
- priorityOne = new TaskPriority(0, 0, 0, 1, 1, "default");
- priorityTwo = new TaskPriority(0, 0, 0, 2, 1, "default");
- priorityThree = new TaskPriority(0, 0, 0, 3, 1, "default");
- taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
- Collections.sort(taskPriorities);
- Assertions.assertEquals(
- Arrays.asList(priorityOne, priorityTwo, priorityThree),
- taskPriorities);
-
- priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
- priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2");
- priorityThree = new TaskPriority(0, 0, 0, 0, 1, "default_3");
- taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
- Collections.sort(taskPriorities);
- Assertions.assertEquals(
- Arrays.asList(priorityOne, priorityTwo, priorityThree),
- taskPriorities);
-
- priorityOne = new TaskPriority(0, 0, 0, 0, 2, "default_1");
- priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2");
- priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3");
- taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
- Collections.sort(taskPriorities);
- Assertions.assertEquals(
- Arrays.asList(priorityThree, priorityOne, priorityTwo),
- taskPriorities);
-
- priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
- priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2");
- priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3");
- taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
- Collections.sort(taskPriorities);
- Assertions.assertEquals(
- Arrays.asList(priorityThree, priorityOne, priorityTwo),
- taskPriorities);
-
- priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_1");
- priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
- priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_1");
- taskPriorities = Arrays.asList(priorityTwo, priorityOne, priorityThree);
- Collections.sort(taskPriorities);
- Assertions.assertEquals(
- Arrays.asList(priorityThree, priorityTwo, priorityOne),
- taskPriorities);
- }
-
- @Test
- public void put() throws Exception {
- TaskPriorityQueue queue = getPriorityQueue();
- Assertions.assertEquals(2, queue.size());
- }
-
- @Test
- public void take() throws Exception {
- TaskPriorityQueue queue = getPriorityQueue();
- int peekBeforeLength = queue.size();
- queue.take();
- Assertions.assertTrue(queue.size() < peekBeforeLength);
- }
-
- @Test
- public void poll() throws Exception {
- TaskPriorityQueue queue = getPriorityQueue();
- int peekBeforeLength = queue.size();
- queue.poll(1000, TimeUnit.MILLISECONDS);
- queue.poll(1000, TimeUnit.MILLISECONDS);
- Assertions.assertEquals(0, queue.size());
- queue.poll(1000, TimeUnit.MILLISECONDS);
- }
-
- @Test
- public void size() throws Exception {
- Assertions.assertEquals(2, getPriorityQueue().size());
- }
-
- /**
- * get queue
- *
- * @return queue
- * @throws Exception
- */
- private TaskPriorityQueue getPriorityQueue() throws Exception {
- TaskPriorityQueue queue = new TaskPriorityQueueImpl();
- TaskPriority taskInstanceHigPriority = createTaskPriority(Priority.HIGH.getCode(), 1);
- TaskPriority taskInstanceMediumPriority = createTaskPriority(Priority.MEDIUM.getCode(), 2);
- queue.put(taskInstanceHigPriority);
- queue.put(taskInstanceMediumPriority);
- return queue;
- }
-
- /**
- * create task priority
- *
- * @param priority
- * @param processInstanceId
- * @return
- */
- private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) {
- TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, 1, "default");
- return priorityOne;
- }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
index f5d8b3674b5a..5489b9de8a2d 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
@@ -24,6 +24,7 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@@ -40,6 +41,10 @@
import java.util.Map;
import java.util.Set;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -679,4 +684,24 @@ public void testBuildDagGraph() {
Assertions.assertNotNull(dag);
}
+ @Data
+ @NoArgsConstructor
+ private static class ProcessData {
+
+ @EqualsAndHashCode.Include
+ private List tasks;
+
+ @EqualsAndHashCode.Include
+ private List globalParams;
+
+ private int timeout;
+
+ private int tenantId;
+
+ public ProcessData(List tasks, List globalParams) {
+ this.tasks = tasks;
+ this.globalParams = globalParams;
+ }
+ }
+
}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java
deleted file mode 100644
index cdd0d761a4c3..000000000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.utils;
-
-import java.util.Map;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class ParamUtilsTest {
-
- @Test
- public void testGetGlobalParamMap() {
- String globalParam = "[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]";
- Map globalParamMap = ParamUtils.getGlobalParamMap(globalParam);
- Assertions.assertEquals(globalParamMap.size(), 1);
- Assertions.assertEquals(globalParamMap.get("startParam1"), "");
-
- Map emptyParamMap = ParamUtils.getGlobalParamMap(null);
- Assertions.assertEquals(emptyParamMap.size(), 0);
- }
-}
From 8bec50317d3ffdddb25acf57d8b30d0ba9b27c91 Mon Sep 17 00:00:00 2001
From: xiangzihao <460888207@qq.com>
Date: Wed, 27 Nov 2024 19:08:09 +0800
Subject: [PATCH 4/4] [Chore] Add flame diagram in CI (#16847)
---
.github/workflows/docs.yml | 8 +++++++-
.github/workflows/unit-test.yml | 4 ++++
docs/docs/en/guide/parameter/context.md | 2 +-
docs/docs/zh/guide/parameter/context.md | 2 +-
4 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index 81eb7f20738a..6b9915297310 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -55,12 +55,18 @@ jobs:
timeout-minutes: 30
steps:
- uses: actions/checkout@v4
+ with:
+ submodules: true
+ - name: Collect Workflow Telemetry
+ uses: ./.github/actions/workflow-telemetry-action
+ with:
+ comment_on_pr: false
- run: sudo npm install -g markdown-link-check@3.11.2
- run: sudo apt install plocate -y
# NOTE: Change command from `find . -name "*.md"` to `find . -not -path "*/node_modules/*" -not -path "*/.tox/*" -name "*.md"`
# if you want to run check locally
- run: |
- for file in $(locate "$PWD*/*.md" | grep -v ./deploy/terraform/aws/README.md); do
+ for file in $(locate "$PWD*/*.md" | grep -v ./deploy/terraform/aws/README.md | grep -v ./.github); do
markdown-link-check -c .dlc.json -q "$file" &
done
wait
diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml
index 273b934f5c47..7782e14d668c 100644
--- a/.github/workflows/unit-test.yml
+++ b/.github/workflows/unit-test.yml
@@ -62,6 +62,10 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
+ - name: Collect Workflow Telemetry
+ uses: ./.github/actions/workflow-telemetry-action
+ with:
+ comment_on_pr: false
- name: Sanity Check
uses: ./.github/actions/sanity-check
with:
diff --git a/docs/docs/en/guide/parameter/context.md b/docs/docs/en/guide/parameter/context.md
index d69fce86d8a8..37a5f6599da5 100644
--- a/docs/docs/en/guide/parameter/context.md
+++ b/docs/docs/en/guide/parameter/context.md
@@ -14,7 +14,7 @@ DolphinScheduler allows parameter transfer between tasks. Currently, transfer di
* [SQL](../task/sql.md)
* [Procedure](../task/stored-procedure.md)
* [Python](../task/python.md)
-* [SubWorkflow](../task/sub-workflow)
+* [SubWorkflow](../task/sub-workflow.md)
* [Kubernetes](../task/kubernetes.md)
When defining an upstream node, if there is a need to transmit the result of that node to a dependency related downstream node. You need to set an `OUT` direction parameter to [Custom Parameters] of the [Current Node Settings]. If it is a sub-workflow node, there is no need to set a parameter in [Current Node Settings], but an `OUT` direction parameter needs to be set in the workflow definition of the sub-workflow.
diff --git a/docs/docs/zh/guide/parameter/context.md b/docs/docs/zh/guide/parameter/context.md
index 6b1246e9efa6..20f1578cd814 100644
--- a/docs/docs/zh/guide/parameter/context.md
+++ b/docs/docs/zh/guide/parameter/context.md
@@ -14,7 +14,7 @@ DolphinScheduler 允许在任务间进行参数传递,目前传递方向仅支
* [SQL](../task/sql.md)
* [Procedure](../task/stored-procedure.md)
* [Python](../task/python.md)
-* [SubWorkflow](../task/sub-workflow)
+* [SubWorkflow](../task/sub-workflow.md)
* [Kubernetes](../task/kubernetes.md)
当定义上游节点时,如果有需要将该节点的结果传递给有依赖关系的下游节点,需要在【当前节点设置】的【自定义参数】设置一个方向是 OUT 的变量。如果是 SubWorkflow 节点无需在【当前节点设置】中设置变量,需要在子流程的工作流定义中设置一个方向是 OUT 的变量。