Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-16887][Dependent Task] Dependet task improvement #16910

Open
wants to merge 18 commits into
base: dev
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskDependentResult;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceDependentResult;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
Expand All @@ -70,6 +72,7 @@
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDependentResultDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
Expand All @@ -96,6 +99,7 @@

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -174,6 +178,9 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements Work
@Autowired
private CuringParamsService curingGlobalParamsService;

@Autowired
private TaskDependentResultDao taskDependentResultDao;

/**
* return top n SUCCESS workflow instance order by running time which started between startTime and endTime
*/
Expand Down Expand Up @@ -245,7 +252,7 @@ public Map<String, Object> queryWorkflowInstanceById(User loginUser, long projec
workflowInstance.getWorkflowDefinitionVersion());

if (workflowDefinition == null || projectCode != workflowDefinition.getProjectCode()) {
log.error("workflow definition does not exist, projectCode:{}.", projectCode);
log.error("workflow definition does not exist, projectCode: {}.", projectCode);
putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST, workflowInstanceId);
} else {
workflowInstance.setLocations(workflowDefinition.getLocations());
Expand Down Expand Up @@ -460,15 +467,39 @@ public Map<String, Object> queryTaskListByWorkflowInstanceId(User loginUser, lon
List<TaskInstance> taskInstanceList =
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstanceId,
workflowInstance.getTestFlag());
List<TaskInstanceDependentResult> taskInstanceDependentResultList =
setTaskInstanceDependentResult(taskInstanceList);

Map<String, Object> resultMap = new HashMap<>();
resultMap.put(WORKFLOW_INSTANCE_STATE, workflowInstance.getState().toString());
resultMap.put(TASK_LIST, taskInstanceList);
resultMap.put(TASK_LIST, taskInstanceDependentResultList);
result.put(DATA_LIST, resultMap);

putMsg(result, Status.SUCCESS);
return result;
}

private List<TaskInstanceDependentResult> setTaskInstanceDependentResult(List<TaskInstance> taskInstanceList) {
List<TaskInstanceDependentResult> taskInstanceDependentResultList = taskInstanceList.stream()
.map(taskInstance -> {
TaskInstanceDependentResult taskInstanceDependentResult = new TaskInstanceDependentResult();
BeanUtils.copyProperties(taskInstance, taskInstanceDependentResult);
return taskInstanceDependentResult;
}).collect(Collectors.toList());
List<Integer> taskInstanceIdList = taskInstanceList.stream()
.map(TaskInstance::getId).collect(Collectors.toList());
List<TaskDependentResult> taskDependentResultList =
taskDependentResultDao.batchQueryTaskDependentResultByTaskInstanceIds(taskInstanceIdList);
for (TaskInstanceDependentResult taskInstanceDependentResult : taskInstanceDependentResultList) {
for (TaskDependentResult taskDependentResult : taskDependentResultList) {
if (taskInstanceDependentResult.getId().equals(taskDependentResult.getTaskInstanceId())) {
taskInstanceDependentResult.setTaskDependentResult(taskDependentResult);
}
}
}
return taskInstanceDependentResultList;
}

@Override
public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
TaskInstance taskInstance = taskInstanceDao.queryById(taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,13 @@ public final class Constants {
public static final String SUBWORKFLOW_INSTANCE_ID = "subWorkflowInstanceId";
public static final String WORKFLOW_INSTANCE_STATE = "workflowInstanceState";
public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
public static final String DEPENDENCE = "dependence";
public static final String TASK_LIST = "taskList";
public static final String QUEUE = "queue";
public static final String QUEUE_NAME = "queueName";
public static final String DEPENDENT_SPLIT = ":||";

/**
* dependent task
*/
public static final long DEPENDENT_ALL_TASK_CODE = -1;
public static final long DEPENDENT_WORKFLOW_CODE = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
Expand Down Expand Up @@ -259,10 +258,6 @@ public Map<String, String> getTaskParamMap() {
return taskParamMap;
}

public String getDependence() {
return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
}

public Integer getCpuQuota() {
return cpuQuota == null ? -1 : cpuQuota;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import java.util.Date;

import lombok.Data;

import com.baomidou.mybatisplus.annotation.TableName;

/**
* task definition log
*/
@Data
@TableName("t_ds_task_definition_log")
public class TaskDefinitionLog extends TaskDefinition {

Expand Down Expand Up @@ -77,22 +80,6 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) {
this.setTaskExecuteType(taskDefinition.getTaskExecuteType());
}

public int getOperator() {
return operator;
}

public void setOperator(int operator) {
this.operator = operator;
}

public Date getOperateTime() {
return operateTime;
}

public void setOperateTime(Date operateTime) {
this.operateTime = operateTime;
}

@Override
public boolean equals(Object o) {
return super.equals(o);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;

import java.util.Date;

import lombok.Data;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

@Data
@TableName("t_ds_task_dependent_result")
public class TaskDependentResult {

@TableId(value = "id", type = IdType.AUTO)
private Integer id;

private Integer taskInstanceId;

private Long projectCode;

private Long workflowDefinitionCode;

private Long taskDefinitionCode;

private DependResult dependentResult;

private Date createTime;

private Date updateTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.entity;

import lombok.Data;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode(callSuper = true)
Fixed Show fixed Hide fixed
@Data
public class TaskInstanceDependentResult extends TaskInstance {
ruanwenjun marked this conversation as resolved.
Show resolved Hide resolved

private TaskDependentResult taskDependentResult;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.mapper;

import org.apache.dolphinscheduler.dao.entity.TaskDependentResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;

import org.apache.ibatis.annotations.Param;

import java.util.List;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

public interface TaskDependentResultMapper extends BaseMapper<TaskDependentResult> {

List<TaskDependentResult> queryTaskDependentResultListByTaskInstanceId(@Param("taskInstanceId") long taskInstanceId);

int deleteTaskDependentResultByTaskInstanceId(@Param("taskInstanceId") int taskInstanceId);

TaskDependentResult queryTaskDependentResultByTaskDependentResult(@Param("taskDependentResult") TaskDependentResult taskDependentResult);

int updateDependentResultByTaskInstanceId(@Param("dependentResult") DependResult dependentResult,
@Param("taskInstanceId") long taskInstanceId);

List<TaskDependentResult> batchQueryTaskDependentResultByTaskInstanceIds(@Param("taskInstanceIds") List<Integer> taskInstanceIds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.repository;

import org.apache.dolphinscheduler.dao.entity.TaskDependentResult;

import java.util.List;

public interface TaskDependentResultDao extends IDao<TaskDependentResult> {

List<TaskDependentResult> queryTaskDependentResultByTaskInstanceId(Integer taskInstanceId);

int deleteTaskDependentResultByTaskInstanceId(Integer taskInstanceId);

int upsertTaskDependentResult(TaskDependentResult taskDependentResult);

List<TaskDependentResult> batchQueryTaskDependentResultByTaskInstanceIds(List<Integer> taskInstanceIds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.repository.impl;

import org.apache.dolphinscheduler.dao.entity.TaskDependentResult;
import org.apache.dolphinscheduler.dao.mapper.TaskDependentResultMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.TaskDependentResultDao;

import org.apache.commons.collections4.CollectionUtils;

import java.util.Collections;
import java.util.List;

import org.springframework.stereotype.Repository;

@Repository
public class TaskDependentResultDaoImpl extends BaseDao<TaskDependentResult, TaskDependentResultMapper>
implements
TaskDependentResultDao {

public TaskDependentResultDaoImpl(TaskDependentResultMapper taskDependentResultMapper) {
super(taskDependentResultMapper);
}

@Override
public List<TaskDependentResult> queryTaskDependentResultByTaskInstanceId(Integer taskInstanceId) {
if (taskInstanceId == null) {
return Collections.emptyList();
}
return mybatisMapper.queryTaskDependentResultListByTaskInstanceId(taskInstanceId);
}

@Override
public int deleteTaskDependentResultByTaskInstanceId(Integer taskInstanceId) {
if (taskInstanceId == null) {
return 0;
}
return mybatisMapper.deleteTaskDependentResultByTaskInstanceId(taskInstanceId);
}

@Override
public int upsertTaskDependentResult(TaskDependentResult taskDependentResult) {
if (taskDependentResult == null) {
return 0;
}
TaskDependentResult dbTaskDependentResult =
mybatisMapper.queryTaskDependentResultByTaskDependentResult(taskDependentResult);
if (dbTaskDependentResult == null) {
return mybatisMapper.insert(taskDependentResult);
} else {
return mybatisMapper.updateDependentResultByTaskInstanceId(taskDependentResult.getDependentResult(),
taskDependentResult.getTaskInstanceId());
}
}

@Override
public List<TaskDependentResult> batchQueryTaskDependentResultByTaskInstanceIds(List<Integer> taskInstanceIds) {
if (CollectionUtils.isEmpty(taskInstanceIds)) {
return Collections.emptyList();
}
return mybatisMapper.batchQueryTaskDependentResultByTaskInstanceIds(taskInstanceIds);
}
}
Loading
Loading