Skip to content

Commit

Permalink
[Optimization-4047][web] Add push task into DolphinScheduler (#4048)
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo authored Dec 15, 2024
1 parent 52fee76 commit 04f9072
Show file tree
Hide file tree
Showing 6 changed files with 589 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ public class SchedulerController {
example = "1")
public Result<TaskDefinition> getTaskDefinition(@ApiParam(value = "dinky任务id") @RequestParam Long dinkyTaskId) {
TaskDefinition taskDefinitionInfo = schedulerService.getTaskDefinitionInfo(dinkyTaskId);
if (taskDefinitionInfo == null) {
return Result.failed(Status.DS_TASK_NOT_EXIST);
}
return Result.succeed(taskDefinitionInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.dinky.service.catalogue.CatalogueService;
import org.dinky.utils.JsonUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -97,7 +98,7 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
// Get process from dolphin scheduler
ProcessDefinition process = processClient.getProcessDefinitionInfo(projectCode, processName);

String taskName = catalogue.getName() + ":" + catalogue.getId();
String taskName = catalogue.getName();
dinkyTaskRequest.setName(taskName);

TaskRequest taskRequest = new TaskRequest();
Expand Down Expand Up @@ -180,7 +181,8 @@ private void updateProcessDefinition(ProcessDefinition process, Long taskCode, T
}
List<ProcessTaskRelation> processTaskRelationList = dagData.getProcessTaskRelationList();
List<TaskDefinition> taskDefinitionList = dagData.getTaskDefinitionList();
List<DagNodeLocation> locations = JsonUtils.toList(process.getLocations(), DagNodeLocation.class);
List<DagNodeLocation> locations =
new ArrayList<>(JsonUtils.toList(process.getLocations(), DagNodeLocation.class));

if (CollUtil.isNotEmpty(locations)) {
boolean matched = locations.stream().anyMatch(location -> location.getTaskCode() == taskCode);
Expand Down Expand Up @@ -307,8 +309,7 @@ public List<TaskMainInfo> getTaskMainInfos(long dinkyTaskId) {
long projectCode = SystemInit.getProject().getCode();
List<TaskMainInfo> taskMainInfos = taskClient.getTaskMainInfos(projectCode, "", "", "");
// 去掉本身
taskMainInfos.removeIf(taskMainInfo ->
(catalogue.getName() + ":" + catalogue.getId()).equalsIgnoreCase(taskMainInfo.getTaskName()));
taskMainInfos.removeIf(taskMainInfo -> (catalogue.getName()).equalsIgnoreCase(taskMainInfo.getTaskName()));
return taskMainInfos;
}

Expand All @@ -331,18 +332,16 @@ public TaskDefinition getTaskDefinitionInfo(long dinkyTaskId) {
long projectCode = dinkyProject.getCode();

String processName = getDinkyNames(catalogue, 0);
String taskName = catalogue.getName() + ":" + catalogue.getId();
String taskName = catalogue.getName();
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, TASK_TYPE);
TaskDefinition taskDefinition;
if (taskMainInfo == null) {
log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage(), processName, taskName);
throw new BusException(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST, processName, taskName);
return null;
}

taskDefinition = taskClient.getTaskDefinition(projectCode, taskMainInfo.getTaskCode());
if (taskDefinition == null) {
log.error(Status.DS_WORK_FLOW_NOT_SAVE.getMessage());
throw new BusException(Status.DS_WORK_FLOW_NOT_SAVE);
return null;
}

taskDefinition.setProcessDefinitionCode(taskMainInfo.getProcessDefinitionCode());
Expand Down Expand Up @@ -380,12 +379,12 @@ private String getDinkyNames(Catalogue catalogue, int i) {
throw new SchedulerException("Get Node List Error");
}

String name = i == 0 ? catalogue.getName() + ":" + catalogue.getId() : catalogue.getName();
String name = i == 0 ? catalogue.getName() : catalogue.getName();
String next = getDinkyNames(catalogue, ++i);

if (Strings.isNullOrEmpty(next)) {
return name;
}
return name + "_" + next;
return name + "/" + next;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { l } from '@/utils/intl';
import { Badge, Space } from 'antd';
import { CheckboxOptionType } from 'antd/es/checkbox/Group';
import { DefaultOptionType } from 'antd/es/select';

/**
* priority list for select | 优先级列表
*/
export const PriorityList: DefaultOptionType[] = [
{
label: (
<Space>
<Badge color={'red'} />
Highest
</Space>
),
value: 'HIGHEST',
key: 'HIGHEST'
},
{
label: (
<Space>
<Badge color={'orange'} />
High
</Space>
),
value: 'HIGH',
key: 'HIGH'
},
{
label: (
<Space>
<Badge color={'blue'} />
Medium
</Space>
),
value: 'MEDIUM',
key: 'MEDIUM'
},
{
label: (
<Space>
<Badge color={'cyan'} />
Low
</Space>
),
value: 'LOW',
key: 'LOW'
},
{
label: (
<Space>
<Badge color={'purple'} />
Lowest
</Space>
),
value: 'LOWEST',
key: 'LOWEST'
}
];

export const TimeoutNotifyStrategy: CheckboxOptionType[] = [
{
label: l('datastudio.header.pushdolphin.timeoutFlag.warn'),
value: 'WARN'
},
{
label: l('datastudio.header.pushdolphin.timeoutFlag.failed'),
value: 'FAILED'
}
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { DolphinTaskDefinition, PushDolphinParams } from '@/types/Studio/data.d';

export const transformPushDolphinParams = (
dolphinTaskDefinition: DolphinTaskDefinition,
pushDolphinParams: PushDolphinParams,
toFormValues: boolean
) => {
if (toFormValues && dolphinTaskDefinition) {
const transformValue: PushDolphinParams = {
...pushDolphinParams,
description: dolphinTaskDefinition.description,
timeoutFlag: dolphinTaskDefinition.timeoutFlag === 'OPEN',
flag: dolphinTaskDefinition.flag === 'YES',
isCache: dolphinTaskDefinition.isCache === 'YES',
upstreamCodes: dolphinTaskDefinition.upstreamTaskMap
? Object.keys(dolphinTaskDefinition.upstreamTaskMap)
: [],
timeoutNotifyStrategy:
dolphinTaskDefinition.timeoutNotifyStrategy === 'WARNFAILED'
? ['WARN', 'FAILED']
: [dolphinTaskDefinition.timeoutNotifyStrategy]
};
return transformValue;
} else {
const falseTransformValue: DolphinTaskDefinition = {
...dolphinTaskDefinition,
...pushDolphinParams,
description: pushDolphinParams.description,
timeoutFlag: pushDolphinParams.timeoutFlag ? 'OPEN' : 'CLOSE',
flag: pushDolphinParams.flag ? 'YES' : 'NO',
isCache: pushDolphinParams.isCache ? 'YES' : 'NO',
timeoutNotifyStrategy: (pushDolphinParams.timeoutNotifyStrategy as string[]).join('')
};
return falseTransformValue;
}
};
Loading

0 comments on commit 04f9072

Please sign in to comment.