Skip to content

Commit

Permalink
Add varibles params in APIController and Web (#2544)
Browse files Browse the repository at this point in the history
* Optimize the process

* add task varibles post

* fix world
  • Loading branch information
gaoyan1998 authored Nov 18, 2023
1 parent 3ec1e1f commit 6e9eafc
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

import org.dinky.data.annotations.Log;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.model.ID;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
Expand Down Expand Up @@ -70,7 +70,8 @@ public class APIController {
@ApiOperation("Submit Task")
public Result<JobResult> submitTask(@RequestParam Integer id) throws Exception {
taskService.initTenantByTaskId(id);
JobResult jobResult = taskService.submitTask(id, null);
JobResult jobResult =
taskService.submitTask(TaskSubmitDto.builder().id(id).build());
if (jobResult.isSuccess()) {
return Result.succeed(jobResult, Status.EXECUTE_SUCCESS);
} else {
Expand All @@ -81,9 +82,9 @@ public Result<JobResult> submitTask(@RequestParam Integer id) throws Exception {
@PostMapping("/submitTask")
@ApiOperation("Submit Task")
// @Log(title = "Submit Task", businessType = BusinessType.SUBMIT)
public Result<JobResult> submitTask(@RequestBody ID id) throws Exception {
taskService.initTenantByTaskId(id.getId());
JobResult jobResult = taskService.submitTask(id.getId(), null);
public Result<JobResult> submitTask(@RequestBody TaskSubmitDto submitDto) throws Exception {
taskService.initTenantByTaskId(submitDto.getId());
JobResult jobResult = taskService.submitTask(submitDto);
if (jobResult.isSuccess()) {
return Result.succeed(jobResult, Status.EXECUTE_SUCCESS);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSaveDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.ProcessType;
Expand Down Expand Up @@ -76,7 +77,8 @@ public class TaskController {
@Log(title = "Submit Task", businessType = BusinessType.SUBMIT)
@ExecuteProcess(type = ProcessType.FLINK_SUBMIT)
public Result<JobResult> submitTask(@ProcessId @RequestParam Integer id) throws Exception {
JobResult jobResult = taskService.submitTask(id, null);
JobResult jobResult =
taskService.submitTask(TaskSubmitDto.builder().id(id).build());
if (jobResult.isSuccess()) {
return Result.succeed(jobResult, Status.EXECUTE_SUCCESS);
} else {
Expand Down
3 changes: 0 additions & 3 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ public class TaskDTO extends AbstractStatementDTO {
notes = "The execution mode for the SQL query")
private String type;

@ApiModelProperty(value = "Check Point", dataType = "Integer", example = "1", notes = "Check point for the task")
private Integer checkPoint;

@ApiModelProperty(
value = "Save Point Strategy",
dataType = "Integer",
Expand Down
48 changes: 48 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskSubmitDto.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.dto;

import java.util.Map;

import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class TaskSubmitDto {

@ApiModelProperty(value = "ID", dataType = "Integer", example = "6", notes = "The identifier of the execution")
private Integer id;

@ApiModelProperty(
value = "Save Point Path",
dataType = "String",
example = "/savepoints",
notes = "The path for save points")
private String savePointPath;

@ApiModelProperty(
value = "Variables",
dataType = "Map<String, String>",
example = "{\"key\": \"value\"}",
notes = "Variables")
private Map<String, String> variables;
}
3 changes: 2 additions & 1 deletion dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.exception.ExcuteException;
import org.dinky.data.exception.NotSupportExplainExcepition;
Expand Down Expand Up @@ -70,7 +71,7 @@ public interface TaskService extends ISuperService<Task> {
* @return A {@link JobResult} object representing the result of the submitted task.
* @throws ExcuteException If there is an error executing the task.
*/
JobResult submitTask(Integer id, String savePointPath) throws Exception;
JobResult submitTask(TaskSubmitDto submitDto) throws Exception;

/**
* Debug the given task and return the job result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.enums.ProcessStepType;
Expand Down Expand Up @@ -98,6 +99,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -168,7 +170,9 @@ private String[] buildParams(int id) {
}

@ProcessStep(type = ProcessStepType.SUBMIT_PRECHECK)
public void preCheckTask(TaskDTO task) throws TaskNotDoneException {
public TaskDTO prepareTask(TaskSubmitDto submitDto) throws TaskNotDoneException {
TaskDTO task = this.getTaskInfoById(submitDto.getId());

log.info("Start check and config task, task:{}", task.getName());

Assert.notNull(task, Status.TASK_NOT_EXIST.getMessage());
Expand All @@ -181,6 +185,13 @@ public void preCheckTask(TaskDTO task) throws TaskNotDoneException {
throw new BusException(Status.TASK_STATUS_IS_NOT_DONE.getMessage());
}
}

if (StringUtils.isNotBlank(submitDto.getSavePointPath())) {
task.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
task.setSavePointPath(submitDto.getSavePointPath());
}
task.setVariables(Optional.ofNullable(submitDto.getVariables()).orElse(new HashMap<>()));
return task;
}

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
Expand Down Expand Up @@ -232,7 +243,11 @@ public String buildEnvSql(AbstractStatementDTO task) {
if (Asserts.isNotNullString(flinkWithSql)) {
sql += flinkWithSql + CommonConstant.LineSep;
}
task.setVariables(fragmentVariableService.listEnabledVariables());
// The order cannot be wrong here,
// and the variables from the parameter have the highest priority
Map<String, String> variables = fragmentVariableService.listEnabledVariables();
variables.putAll(task.getVariables());
task.setVariables(variables);
}
int envId = Optional.ofNullable(task.getEnvId()).orElse(-1);
if (envId > 0) {
Expand All @@ -248,22 +263,16 @@ public String buildEnvSql(AbstractStatementDTO task) {
}

@Override
public JobResult submitTask(Integer id, String savePointPath) throws Exception {
TaskDTO taskDTO = this.getTaskInfoById(id);

if (StringUtils.isNotBlank(savePointPath)) {
taskDTO.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
taskDTO.setSavePointPath(savePointPath);
}
public JobResult submitTask(TaskSubmitDto submitDto) throws Exception {
// 注解自调用会失效,这里通过获取对象方法绕过此限制
TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class);
taskServiceBean.preCheckTask(taskDTO);
TaskDTO taskDTO = taskServiceBean.prepareTask(submitDto);
// The job instance does not exist by default,
// so that it does not affect other operations, such as checking the jobmanager address
taskDTO.setJobInstanceId(null);
JobResult jobResult = taskServiceBean.executeJob(taskDTO);
log.info("Job Submit success");
Task task = new Task(id, jobResult.getJobInstanceId());
Task task = new Task(submitDto.getId(), jobResult.getJobInstanceId());
if (!this.updateById(task)) {
throw new BusException(Status.TASK_UPDATE_FAILED.getMessage());
}
Expand Down Expand Up @@ -306,7 +315,8 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception
cancelTaskJob(task);
}
}
return submitTask(id, savePointPath);
return submitTask(
TaskSubmitDto.builder().id(id).savePointPath(savePointPath).build());
}

@Override
Expand Down
76 changes: 76 additions & 0 deletions dinky-web/src/pages/DataStudio/LeftContainer/GlobaleVar/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { queryList } from '@/services/api';
import { API_CONSTANTS } from '@/services/endpoints';
import { GlobalVar } from '@/types/RegCenter/data';
import { l } from '@/utils/intl';
import { EyeTwoTone } from '@ant-design/icons';
import ProTable, { ProColumns } from '@ant-design/pro-table';
import { Modal, Space, Typography } from 'antd';

const { Text, Link } = Typography;
const { confirm } = Modal;

const GlobalVariable = (props: any) => {
const columns: ProColumns<GlobalVar>[] = [
{
title: l('rc.gv.name'),
dataIndex: 'name',
sorter: true,
width: '80%',
ellipsis: true,
render: (_, record) => {
return (
<Space direction='vertical' size={0}>
<Text copyable>{`\$\{${record.name}\}`}</Text>
<Text type='secondary'>{record.note}</Text>
</Space>
);
}
},
{
title: l('rc.gv.value'),
dataIndex: 'id',
width: '20%',
render: (_, record) => (
<EyeTwoTone onClick={() => confirm({ content: record.fragmentValue })} />
)
}
];

return (
<ProTable<GlobalVar>
showHeader={false}
tableStyle={{ margin: 0, paddingInline: 0 }}
search={false}
defaultSize={'small'}
options={false}
type={'list'}
toolBarRender={() => [<Link href={'/#/registration/fragment'}>{l('rc.gv.Management')}</Link>]}
columns={columns}
request={(params) =>
queryList(API_CONSTANTS.GLOBAL_VARIABLE, { ...params, filter: { enabled: [1] } })
}
pagination={{ hideOnSinglePage: true, pageSize: 10, size: 'small', showTotal: () => '' }}
/>
);
};

export default GlobalVariable;
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
ProFormText
} from '@ant-design/pro-components';
import { Badge, Space, Typography } from 'antd';
import { useForm } from 'antd/es/form/Form';
import { debounce } from 'lodash';
import { useEffect } from 'react';
import { connect } from 'umi';
Expand Down
9 changes: 9 additions & 0 deletions dinky-web/src/pages/DataStudio/route.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import JsonToSql from '@/pages/DataStudio/BottomContainer/Tools/JsonToSql';
import { isSql } from '@/pages/DataStudio/HeaderContainer/service';
import Catalog from '@/pages/DataStudio/LeftContainer/Catalog';
import DataSource from '@/pages/DataStudio/LeftContainer/DataSource';
import GlobalVariable from '@/pages/DataStudio/LeftContainer/GlobaleVar';
import Project from '@/pages/DataStudio/LeftContainer/Project';
import { TabsPageSubType, TabsPageType } from '@/pages/DataStudio/model';
import HistoryVersion from '@/pages/DataStudio/RightContainer/HistoryVersion';
Expand All @@ -42,6 +43,7 @@ import {
DatabaseOutlined,
EnvironmentOutlined,
FolderOutlined,
FunctionOutlined,
HistoryOutlined,
InfoCircleOutlined,
InsertRowRightOutlined,
Expand Down Expand Up @@ -80,6 +82,13 @@ export const LeftSide = [
icon: <DatabaseOutlined />,
label: l('menu.datastudio.datasource'),
children: <DataSource />
},
{
auth: '/datastudio/left/globalVariable',
key: 'menu.registration.fragment',
icon: <FunctionOutlined />,
label: l('menu.registration.fragment'),
children: <GlobalVariable />
}
];

Expand Down

0 comments on commit 6e9eafc

Please sign in to comment.