Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-16887][Dependent Task] Dependet task improvement #16910

Open
wants to merge 18 commits into
base: dev
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ContextType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
Expand All @@ -58,6 +59,9 @@
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceDependentResult;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceDependentResultContext;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
Expand All @@ -70,6 +74,7 @@
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
Expand All @@ -96,6 +101,7 @@

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -174,6 +180,9 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements Work
@Autowired
private CuringParamsService curingGlobalParamsService;

@Autowired
private TaskInstanceContextDao taskInstanceContextDao;

/**
* return top n SUCCESS workflow instance order by running time which started between startTime and endTime
*/
Expand Down Expand Up @@ -245,7 +254,7 @@ public Map<String, Object> queryWorkflowInstanceById(User loginUser, long projec
workflowInstance.getWorkflowDefinitionVersion());

if (workflowDefinition == null || projectCode != workflowDefinition.getProjectCode()) {
log.error("workflow definition does not exist, projectCode:{}.", projectCode);
log.error("workflow definition does not exist, projectCode: {}.", projectCode);
putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST, workflowInstanceId);
} else {
workflowInstance.setLocations(workflowDefinition.getLocations());
Expand Down Expand Up @@ -460,15 +469,44 @@ public Map<String, Object> queryTaskListByWorkflowInstanceId(User loginUser, lon
List<TaskInstance> taskInstanceList =
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstanceId,
workflowInstance.getTestFlag());
List<TaskInstanceDependentResult> taskInstanceDependentResultList =
setTaskInstanceDependentResult(taskInstanceList);

Map<String, Object> resultMap = new HashMap<>();
resultMap.put(WORKFLOW_INSTANCE_STATE, workflowInstance.getState().toString());
resultMap.put(TASK_LIST, taskInstanceList);
resultMap.put(TASK_LIST, taskInstanceDependentResultList);
result.put(DATA_LIST, resultMap);

putMsg(result, Status.SUCCESS);
return result;
}

private List<TaskInstanceDependentResult> setTaskInstanceDependentResult(List<TaskInstance> taskInstanceList) {
List<TaskInstanceDependentResult> taskInstanceDependentResultList = taskInstanceList.stream()
.map(taskInstance -> {
TaskInstanceDependentResult taskInstanceDependentResult = new TaskInstanceDependentResult();
BeanUtils.copyProperties(taskInstance, taskInstanceDependentResult);
return taskInstanceDependentResult;
}).collect(Collectors.toList());
List<Integer> taskInstanceIdList = taskInstanceList.stream()
.map(TaskInstance::getId).collect(Collectors.toList());
List<TaskInstanceContext> taskInstanceContextList =
taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList,
ContextType.DEPENDENT_RESULT);
for (TaskInstanceContext taskInstanceContext : taskInstanceContextList) {
for (TaskInstanceDependentResultContext taskInstanceDependentResultContext : taskInstanceContext
.getTaskDependentResultContext()) {
for (TaskInstanceDependentResult taskInstanceDependentResult : taskInstanceDependentResultList) {
if (taskInstanceDependentResult.getId().equals(taskInstanceContext.getTaskInstanceId())) {
taskInstanceDependentResult
.setTaskInstanceDependentResult(taskInstanceDependentResultContext);
}
}
}
}
return taskInstanceDependentResultList;
}

@Override
public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
TaskInstance taskInstance = taskInstanceDao.queryById(taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ContextType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
Expand All @@ -46,6 +47,8 @@
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceDependentResultContext;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
Expand All @@ -60,10 +63,12 @@
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.model.TaskNode;
Expand All @@ -90,6 +95,7 @@
import org.mockito.quality.Strictness;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand Down Expand Up @@ -154,6 +160,9 @@ public class WorkflowInstanceServiceTest {
@Mock
private WorkflowInstanceMapDao workflowInstanceMapDao;

@Mock
private TaskInstanceContextDao taskInstanceContextDao;

private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
Expand Down Expand Up @@ -465,6 +474,18 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException {
taskInstance.setTaskType("SHELL");
List<TaskInstance> taskInstanceList = new ArrayList<>();
taskInstanceList.add(taskInstance);
List<TaskInstanceDependentResultContext> taskInstanceDependentResultContextList = new ArrayList<>();
TaskInstanceContext taskInstanceContext = new TaskInstanceContext();
taskInstanceContext.setTaskInstanceId(0);
taskInstanceContext.setContextType(ContextType.DEPENDENT_RESULT);
TaskInstanceDependentResultContext taskInstanceDependentResultContext =
new TaskInstanceDependentResultContext();
taskInstanceDependentResultContext.setProjectCode(projectCode);
taskInstanceDependentResultContext.setDependentResult(DependResult.SUCCESS);
taskInstanceContext.setTaskDependentResultContext(
Lists.asList(taskInstanceDependentResultContext, new TaskInstanceDependentResultContext[0]));
List<Integer> taskInstanceIdList = new ArrayList<>();
taskInstanceIdList.add(0);
Result res = new Result();
res.setCode(Status.SUCCESS.ordinal());
res.setData("xxx");
Expand All @@ -476,6 +497,9 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException {
workflowInstance.getTestFlag()))
.thenReturn(taskInstanceList);
when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res);
when(taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList,
ContextType.DEPENDENT_RESULT))
.thenReturn(Lists.asList(taskInstanceContext, new TaskInstanceContext[0]));
Map<String, Object> successRes =
workflowInstanceService.queryTaskListByWorkflowInstanceId(loginUser, projectCode, 1);
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,13 @@ public final class Constants {
public static final String SUBWORKFLOW_INSTANCE_ID = "subWorkflowInstanceId";
public static final String WORKFLOW_INSTANCE_STATE = "workflowInstanceState";
public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
public static final String DEPENDENCE = "dependence";
public static final String TASK_LIST = "taskList";
public static final String QUEUE = "queue";
public static final String QUEUE_NAME = "queueName";
public static final String DEPENDENT_SPLIT = ":||";

/**
* dependent task
*/
public static final long DEPENDENT_ALL_TASK_CODE = -1;
public static final long DEPENDENT_WORKFLOW_CODE = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.enums;

import lombok.Getter;

import com.baomidou.mybatisplus.annotation.EnumValue;

@Getter
public enum ContextType {

DEPENDENT_RESULT(1, "dependent task result");

@EnumValue
ruanwenjun marked this conversation as resolved.
Show resolved Hide resolved
private final int code;
private final String desc;

ContextType(int code, String desc) {
this.code = code;
this.desc = desc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
Expand Down Expand Up @@ -259,10 +258,6 @@ public Map<String, String> getTaskParamMap() {
return taskParamMap;
}

public String getDependence() {
return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
}

public Integer getCpuQuota() {
return cpuQuota == null ? -1 : cpuQuota;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import java.util.Date;

import lombok.Data;

import com.baomidou.mybatisplus.annotation.TableName;

/**
* task definition log
*/
@Data
@TableName("t_ds_task_definition_log")
public class TaskDefinitionLog extends TaskDefinition {

Expand Down Expand Up @@ -77,22 +80,6 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) {
this.setTaskExecuteType(taskDefinition.getTaskExecuteType());
}

public int getOperator() {
return operator;
}

public void setOperator(int operator) {
this.operator = operator;
}

public Date getOperateTime() {
return operateTime;
}

public void setOperateTime(Date operateTime) {
this.operateTime = operateTime;
}

@Override
public boolean equals(Object o) {
return super.equals(o);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.common.enums.ContextType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;

import java.util.Date;
import java.util.List;

import lombok.Data;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

@Data
@TableName("t_ds_task_instance_context")
public class TaskInstanceContext {

@TableId(value = "id", type = IdType.AUTO)
private Integer id;

private Integer taskInstanceId;

private String context;

private ContextType contextType;
ruanwenjun marked this conversation as resolved.
Show resolved Hide resolved

private Date createTime;

private Date updateTime;

public List<TaskInstanceDependentResultContext> getTaskDependentResultContext() {
if (contextType == ContextType.DEPENDENT_RESULT && context != null) {
return JSONUtils.toList(context, TaskInstanceDependentResultContext.class);
}
return null;
}

public void setTaskDependentResultContext(List<TaskInstanceDependentResultContext> taskInstanceDependentResultContexts) {
this.context = JSONUtils.toJsonString(taskInstanceDependentResultContexts);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.entity;

import lombok.Data;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode(callSuper = true)
Fixed Show fixed Hide fixed
@Data
public class TaskInstanceDependentResult extends TaskInstance {
ruanwenjun marked this conversation as resolved.
Show resolved Hide resolved

private TaskInstanceDependentResultContext taskInstanceDependentResult;

}
Loading
Loading