diff --git a/dinky-admin/src/main/java/org/dinky/aop/LogAspect.java b/dinky-admin/src/main/java/org/dinky/aop/LogAspect.java index fa219adb4f..a394f88fac 100644 --- a/dinky-admin/src/main/java/org/dinky/aop/LogAspect.java +++ b/dinky-admin/src/main/java/org/dinky/aop/LogAspect.java @@ -117,7 +117,6 @@ protected void handleCommonLogic(final JoinPoint joinPoint, final Exception e, O if (e != null) { operLog.setStatus(BusinessStatus.FAIL.ordinal()); - log.error("pre doAfterThrowing Exception:{}", e.getMessage()); operLog.setErrorMsg(StringUtils.substring(e.getMessage(), 0, 2000)); } operLog.setStatus(BusinessStatus.SUCCESS.ordinal()); @@ -137,8 +136,7 @@ protected void handleCommonLogic(final JoinPoint joinPoint, final Exception e, O } catch (Exception exp) { // 记录本地异常日志 - log.error("pre doAfterThrowing Exception:{}", exp.getMessage()); - exp.printStackTrace(); + log.error("pre doAfterThrowing Exception:", exp); } } diff --git a/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java b/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java new file mode 100644 index 0000000000..3e8a5c3946 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.aop; + +import org.dinky.context.ConsoleContextHolder; +import org.dinky.process.annotations.ExecuteProcess; +import org.dinky.process.annotations.ProcessId; +import org.dinky.process.annotations.ProcessStep; +import org.dinky.process.enums.ProcessStatus; +import org.dinky.process.enums.ProcessStepType; +import org.dinky.process.enums.ProcessType; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; + +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.slf4j.MDC; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Aspect +@Slf4j +@Component +public class ProcessAspect { + + public static String PROCESS_NAME = "name"; + public static String PROCESS_STEP = "step"; + public ConsoleContextHolder contextHolder = ConsoleContextHolder.getInstances(); + + /** + * Block all {@link ExecuteProcess} annotations, + * As the beginning of the process, set all initialization information + */ + @Around(value = "@annotation(executeProcess)") + public Object processAround(ProceedingJoinPoint joinPoint, ExecuteProcess executeProcess) throws Throwable { + + Object result; + Object processId = getProcessId(joinPoint); + String name = executeProcess.type() + String.valueOf(processId); + ProcessType type = executeProcess.type(); + contextHolder.registerProcess(type, name); + MDC.put(PROCESS_NAME, name); + + try { + result = joinPoint.proceed(); + contextHolder.finishedProcess(name, ProcessStatus.FINISHED, null); + } catch (Throwable e) { + contextHolder.finishedProcess(name, ProcessStatus.FAILED, e); + throw e; + } finally { + // Note that this must be cleaned up,Otherwise, the situation of OOM may occur + MDC.clear(); + } + return result; + } + + /** + * Block all {@link ProcessStep} annotations, + * As a specific task step + */ + @Around(value = "@annotation(processStep)") + public Object processStepAround(ProceedingJoinPoint joinPoint, ProcessStep processStep) throws Throwable { + + Object result; + // Record the current step and restore it after the execution is completed + String parentStep = MDC.get(PROCESS_STEP); + ProcessStepType processStepType = processStep.type(); + MDC.put(PROCESS_STEP, processStepType.getValue()); + contextHolder.registerProcessStep(processStepType, MDC.get(PROCESS_NAME)); + + try { + result = joinPoint.proceed(); + contextHolder.finishedStep(MDC.get(PROCESS_NAME), processStepType, ProcessStatus.FINISHED, null); + } catch (Exception e) { + contextHolder.finishedStep(MDC.get(PROCESS_NAME), processStepType, ProcessStatus.FAILED, e); + throw e; + } finally { + // If a parent step exists, it is restored after the execution is complete + if (parentStep != null) { + MDC.put(PROCESS_STEP, parentStep); + } + } + return result; + } + + private Object getProcessId(ProceedingJoinPoint joinPoint) { + Object[] params = joinPoint.getArgs(); + if (params.length == 0) { + throw new IllegalArgumentException("Must have ProcessId params"); + } + + // Get the method, here you can convert the signature strong to MethodSignature + MethodSignature signature = (MethodSignature) joinPoint.getSignature(); + Method method = signature.getMethod(); + + Annotation[][] annotations = method.getParameterAnnotations(); + for (int i = 0; i < annotations.length; i++) { + Object param = params[i]; + Annotation[] paramAnn = annotations[i]; + for (Annotation annotation : paramAnn) { + if (annotation instanceof ProcessId) { + return param; + } + } + } + throw new IllegalArgumentException("Must have ProcessId annoation params"); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/configure/schedule/metrics/GatherSysIndicator.java b/dinky-admin/src/main/java/org/dinky/configure/schedule/metrics/GatherSysIndicator.java index 3239aa0b32..9392ea8e12 100644 --- a/dinky-admin/src/main/java/org/dinky/configure/schedule/metrics/GatherSysIndicator.java +++ b/dinky-admin/src/main/java/org/dinky/configure/schedule/metrics/GatherSysIndicator.java @@ -93,7 +93,7 @@ public void updateState() { metrics.setContent(metricsTotal); metrics.setHeartTime(now); metrics.setModel(MetricsType.LOCAL.getType()); - MetricsContextHolder.sendAsync(metrics); + MetricsContextHolder.getInstances().sendAsync(metrics.getModel(), metrics); log.debug("Collecting jvm information ends."); } diff --git a/dinky-admin/src/main/java/org/dinky/context/BaseSseContext.java b/dinky-admin/src/main/java/org/dinky/context/BaseSseContext.java new file mode 100644 index 0000000000..0737fdcae2 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/context/BaseSseContext.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.context; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalNotification; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class BaseSseContext { + + /** + * Cache that stores SseEmitter objects for sending metric data, + * prevents OOM with LoadingCache, and is automatically removed when objects + * in the cache are not accessed or used for more than 60 seconds. + */ + protected LoadingCache> sseList = CacheBuilder.newBuilder() + .expireAfterAccess(60, TimeUnit.SECONDS) + .removalListener(this::onRemove) + .build(CacheLoader.from(key -> new ArrayList<>())); + + /** + * Called during an asynchronous send procedure + */ + public abstract void append(K key, V o); + + /** + * send data asynchronously. + */ + public void sendAsync(K key, V o) { + CompletableFuture.runAsync(() -> { + append(key, o); + send(key, o); + }); + } + + /** + * send data. + */ + protected void send(K key, V o) { + List sseEmitters = sseList.getIfPresent(key); + if (sseEmitters != null) { + sseEmitters.forEach(sseEmitter -> { + try { + sseEmitter.send(o); + } catch (Exception e) { + log.warn("send metrics error:{}", e.getMessage()); + closeSse(sseEmitter); + sseEmitters.remove(sseEmitter); + } + }); + } + } + + /** + * remove the SseEmitter object from the cache + * When the connection times out or actively exits. + * + * @param removalNotification RemovalNotification object + */ + protected void onRemove(RemovalNotification> removalNotification) { + assert removalNotification.getValue() != null; + removalNotification.getValue().forEach(this::closeSse); + } + + /** + * close the SseEmitter object. + * + * @param sseEmitter SseEmitter object + */ + protected void closeSse(SseEmitter sseEmitter) { + try { + sseEmitter.complete(); + } catch (Exception e) { + log.warn("complete sseEmitter failed:{}", e.getMessage()); + } + } +} diff --git a/dinky-admin/src/main/java/org/dinky/context/ConsoleContextHolder.java b/dinky-admin/src/main/java/org/dinky/context/ConsoleContextHolder.java new file mode 100644 index 0000000000..20cf655925 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/context/ConsoleContextHolder.java @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.context; + +import org.dinky.data.exception.BusException; +import org.dinky.process.enums.ProcessStatus; +import org.dinky.process.enums.ProcessStepType; +import org.dinky.process.enums.ProcessType; +import org.dinky.process.model.ProcessEntity; +import org.dinky.process.model.ProcessStep; +import org.dinky.utils.LogUtil; + +import java.nio.charset.Charset; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import com.alibaba.fastjson2.JSONObject; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.IORuntimeException; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.text.StrFormatter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ConsoleContextHolder extends BaseSseContext { + protected static final ConsoleContextHolder instance = new ConsoleContextHolder(); + + /** + * Get an instance of ConsoleContextHolder. + * + * @return ConsoleContextHolder instance + */ + public static ConsoleContextHolder getInstances() { + return instance; + } + + private final Map logPross = new ConcurrentHashMap<>(); + + /** + * Get a list of all processes + * */ + public List list() { + return new ArrayList<>(logPross.values()); + } + + /** + * Add the SseEmitter object to the context. + * + * @param processName process name, which is used as an indication for the keyword list + * @param sseEmitter SseEmitter object + */ + public void addSse(String processName, SseEmitter sseEmitter) { + List emitters = sseList.getIfPresent(processName); + if (emitters == null) { + emitters = new ArrayList<>(); + sseList.put(processName, emitters); + } + emitters.add(sseEmitter); + + if (logPross.containsKey(processName)) { + sendAsync(processName, logPross.get(processName)); + } else { + String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), processName); + try { + String string = FileUtil.readString(filePath, Charset.defaultCharset()); + ProcessEntity entity = JSONObject.parseObject(string, ProcessEntity.class); + sendAsync(processName, entity); + } catch (IORuntimeException e) { + log.warn("{} have no cache files", processName); + } + } + } + + @Override + public void append(String key, ProcessEntity o) {} + + /** + * Add log messages to specific processes and process steps. + * + * @param processName process name + * @param processStep process step type + * @param log messages + * @throws BusException Throws an exception if the process does not exist + */ + public void appendLog(String processName, ProcessStepType processStep, String log) { + if (!logPross.containsKey(processName)) { + throw new BusException(StrFormatter.format("process {} does not exist", processName)); + } + logPross.get(processName).appendLog(log); + Map parentStep = getParentNode(processStep, getStepsMap(processName)); + parentStep.get(processStep.getValue()).appendLog(log); + sendAsync(processName, logPross.get(processName)); + } + + /** + * Register a new process. + * + * @param type process type + * @param processName process name + * @throws RuntimeException Throws an exception if the process already exists + */ + public void registerProcess(ProcessType type, String processName) throws RuntimeException { + if (logPross.containsKey(processName)) { + throw new BusException("Another user is running an action to suppress this request"); + } + ProcessEntity entity = ProcessEntity.builder() + .pid(processName) + .log(new StringBuilder()) + .errLog(new StringBuilder()) + .status(ProcessStatus.INITIALIZING) + .type(type) + .name(processName) + .startTime(LocalDateTime.now()) + .stepsMap(new LinkedHashMap<>()) + .build(); + logPross.put(processName, entity); + } + + /** + * Register a new process step. + * + * @param type process step type + * @param processName process name + * @throws RuntimeException Throws an exception if the process does not exist + */ + public void registerProcessStep(ProcessStepType type, String processName) throws RuntimeException { + if (!logPross.containsKey(processName)) { + throw new BusException(StrFormatter.format("Process {} does not exist", processName)); + } + ProcessEntity process = logPross.get(processName); + process.setStatus(ProcessStatus.RUNNING); + ProcessStep processStep = ProcessStep.builder() + .stepStatus(ProcessStatus.RUNNING) + .startTime(LocalDateTime.now()) + .type(type) + .name(type.getDesc().getMessage()) + .log(new StringBuilder()) + .errLog(new StringBuilder()) + .childStepsMap(new LinkedHashMap<>()) + .build(); + getParentNode(type, process.getStepsMap()).put(type.getValue(), processStep); + } + + /** + * Mark the process as completed. + * + * @param processName process name + * @param status Process status + * @param e exception object, optional + */ + public void finishedProcess(String processName, ProcessStatus status, Throwable e) { + if (!logPross.containsKey(processName)) { + return; + } + ProcessEntity process = logPross.get(processName); + process.setStatus(status); + process.setEndTime(LocalDateTime.now()); + process.setTime(process.getEndTime().compareTo(process.getStartTime())); + if (e != null) { + process.appendErrLog(LogUtil.getError(e)); + } + String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), process.getName()); + if (FileUtil.exist(filePath)) { + Assert.isTrue(FileUtil.del(filePath)); + } + FileUtil.writeUtf8String(JSONObject.toJSONString(process), filePath); + logPross.remove(processName); + } + + /** + * Mark process step as completed. + * + * @param processName process name + * @param type process step type + * @param status Process step status + * @param e exception object, optional + */ + public void finishedStep(String processName, ProcessStepType type, ProcessStatus status, Throwable e) { + if (!logPross.containsKey(processName)) { + return; + } + Map processStepNode = getParentNode(type, getStepsMap(processName)); + ProcessStep processStep = processStepNode.get(type.getValue()); + processStep.setStepStatus(status); + processStep.setEndTime(LocalDateTime.now()); + processStep.setTime(processStep.getEndTime().compareTo(processStep.getStartTime())); + if (e != null) { + logPross.get(processName).appendErrLog(LogUtil.getError(e)); + } + } + + /** + * The * method first checks if a given type has a parent step (i.e. type.getParentStep()) is null). + * If there is a parent step, recursively call the getParentNode method, passing the parent step type and stepsMap + * to get the child step mapping of the parent node. + * If a given type does not have a parent step, it returns steps directly to indicate that the step of that type is a top-level step. + * Finally, the getParentNode method returns a mapping of child steps containing the parent node for use in other methods. + */ + private Map getParentNode(ProcessStepType type, Map stepsMap) { + if (type.getParentStep() != null) { + Map map = getParentNode(type.getParentStep(), stepsMap); + return map.get(type.getParentStep().getValue()).getChildStepsMap(); + } + return stepsMap; + } + + private Map getStepsMap(String processName) { + return logPross.get(processName).getStepsMap(); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java b/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java index ddc20c61f8..5b60be817a 100644 --- a/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java +++ b/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java @@ -27,15 +27,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalNotification; - import lombok.extern.slf4j.Slf4j; /** @@ -43,36 +37,32 @@ * including operations such as storing and sending metric data. */ @Slf4j -public class MetricsContextHolder { +public class MetricsContextHolder extends BaseSseContext { + + protected static final MetricsContextHolder instance = new MetricsContextHolder(); + + public static MetricsContextHolder getInstances() { + return instance; + } /** * Temporary cache monitoring information, mainly to prevent excessive buffering of write IO, * when metricsVOS data reaches 1000 or the time exceeds 5 seconds */ - private static final List metricsVOS = Collections.synchronizedList(new ArrayList<>()); + private final List metricsVOS = Collections.synchronizedList(new ArrayList<>()); - private static final Long lastDumpTime = System.currentTimeMillis(); - - /** - * Cache that stores SseEmitter objects for sending metric data, - * prevents OOM with LoadingCache, and is automatically removed when objects - * in the cache are not accessed or used for more than 60 seconds. - */ - private static final LoadingCache> sseList = CacheBuilder.newBuilder() - .expireAfterAccess(60, TimeUnit.SECONDS) - .removalListener(MetricsContextHolder::onRemove) - .build(CacheLoader.from(key -> new ArrayList<>())); + private final Long lastDumpTime = System.currentTimeMillis(); - /** - * The sendAsync method is used to send metric data asynchronously. - * - * @param metrics metric data object - */ - public static void sendAsync(MetricsVO metrics) { - CompletableFuture.runAsync(() -> { - dumpMetrics(metrics); - send(metrics); - }); + @Override + public void append(String key, MetricsVO o) { + metricsVOS.add(o); + long duration = System.currentTimeMillis() - lastDumpTime; + synchronized (metricsVOS) { + if (metricsVOS.size() > 1000 || duration > 1000 * 5) { + PaimonUtil.writeMetrics(metricsVOS); + metricsVOS.clear(); + } + } } /** @@ -82,7 +72,7 @@ public static void sendAsync(MetricsVO metrics) { * @param sseEmitter SseEmitter object * @param lastTime initialization data intercepts the largest timestamp */ - public static void addSse(List keys, SseEmitter sseEmitter, LocalDateTime lastTime) { + public void addSse(List keys, SseEmitter sseEmitter, LocalDateTime lastTime) { keys.forEach(key -> { List sseEmitters = sseList.getIfPresent(key); if (sseEmitters == null) { @@ -101,7 +91,7 @@ public static void addSse(List keys, SseEmitter sseEmitter, LocalDateTim * @param sseEmitter SseEmitter object * @param lastTime's last timestamp */ - private static void sendInitData(List keys, SseEmitter sseEmitter, LocalDateTime lastTime) { + private void sendInitData(List keys, SseEmitter sseEmitter, LocalDateTime lastTime) { CompletableFuture.runAsync(() -> { synchronized (metricsVOS) { metricsVOS.forEach(metricsVO -> { @@ -118,63 +108,4 @@ private static void sendInitData(List keys, SseEmitter sseEmitter, Local } }); } - - /** - * The dumpMetrics method is used to dump metric data to paimon. - * - * @param metrics metric data object - */ - private static void dumpMetrics(MetricsVO metrics) { - metricsVOS.add(metrics); - long duration = System.currentTimeMillis() - lastDumpTime; - synchronized (metricsVOS) { - if (metricsVOS.size() > 1000 || duration > 1000 * 5) { - PaimonUtil.writeMetrics(metricsVOS); - metricsVOS.clear(); - } - } - } - - /** - * The send method is used to send metric data. - * - * @param metrics metric data object - */ - private static void send(MetricsVO metrics) { - List sseEmitters = sseList.getIfPresent(metrics.getModel()); - if (sseEmitters != null) { - sseEmitters.forEach(sseEmitter -> { - try { - sseEmitter.send(metrics); - } catch (Exception e) { - log.warn("send metrics error:{}", e.getMessage()); - closeSse(sseEmitter); - sseEmitters.remove(sseEmitter); - } - }); - } - } - - /** - * The onRemove method is used to remove the SseEmitter object from the cache. - * - * @param removalNotification RemovalNotification object - */ - private static void onRemove(RemovalNotification> removalNotification) { - assert removalNotification.getValue() != null; - removalNotification.getValue().forEach(MetricsContextHolder::closeSse); - } - - /** - * The closeSse method is used to close the SseEmitter object. - * - * @param sseEmitter SseEmitter object - */ - private static void closeSse(SseEmitter sseEmitter) { - try { - sseEmitter.complete(); - } catch (Exception e) { - log.warn("complete sseEmitter failed:{}", e.getMessage()); - } - } } diff --git a/dinky-admin/src/main/java/org/dinky/controller/APIController.java b/dinky-admin/src/main/java/org/dinky/controller/APIController.java index cdc8551a1e..fa89946097 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/APIController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/APIController.java @@ -30,7 +30,6 @@ import org.dinky.gateway.enums.SavePointType; import org.dinky.gateway.result.SavePointResult; import org.dinky.job.JobResult; -import org.dinky.process.exception.ExcuteException; import org.dinky.service.JobInstanceService; import org.dinky.service.TaskService; @@ -68,7 +67,7 @@ public class APIController { @PostMapping("/submitTask") @ApiOperation("Submit Task") // @Log(title = "Submit Task", businessType = BusinessType.SUBMIT) - public Result submitTask(@RequestBody TaskDTO taskDTO) throws ExcuteException { + public Result submitTask(@RequestBody TaskDTO taskDTO) throws Exception { JobResult jobResult = taskService.submitTask(taskDTO.getId(), null); if (jobResult.isSuccess()) { return Result.succeed(jobResult, Status.EXECUTE_SUCCESS); @@ -90,7 +89,7 @@ public Result cancel(@RequestParam Integer id) { @GetMapping(value = "/restartTask") @ApiOperation("Restart Task") // @Log(title = "Restart Task", businessType = BusinessType.REMOTE_OPERATION) - public Result restartTask(@RequestParam Integer id, String savePointPath) throws ExcuteException { + public Result restartTask(@RequestParam Integer id, String savePointPath) throws Exception { return Result.succeed(taskService.restartTask(id, savePointPath)); } diff --git a/dinky-admin/src/main/java/org/dinky/controller/ProcessController.java b/dinky-admin/src/main/java/org/dinky/controller/ProcessController.java index 99a7b447a6..f3a652697b 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/ProcessController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/ProcessController.java @@ -19,24 +19,23 @@ package org.dinky.controller; -import org.dinky.data.annotation.Log; -import org.dinky.data.enums.BusinessType; -import org.dinky.data.enums.Status; +import org.dinky.context.ConsoleContextHolder; import org.dinky.data.result.ProTableResult; -import org.dinky.data.result.Result; import org.dinky.process.model.ProcessEntity; -import org.dinky.service.ProcessService; +import org.dinky.sse.SseEmitterUTF8; -import java.util.List; +import java.util.concurrent.TimeUnit; +import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import cn.dev33.satoken.stp.StpUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import lombok.RequiredArgsConstructor; @@ -51,7 +50,17 @@ @RequiredArgsConstructor public class ProcessController { - private final ProcessService processService; + @GetMapping(value = "/getLastUpdateData", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @ApiOperation("Get Last Update Data") + @ApiImplicitParams({ + @ApiImplicitParam(name = "lastTime", value = "Last Time", required = false, dataType = "Long"), + @ApiImplicitParam(name = "keys", value = "jobids", required = true, dataType = "String") + }) + public SseEmitter getLastUpdateData(String keys) { + SseEmitter emitter = new SseEmitterUTF8(TimeUnit.MINUTES.toMillis(30)); + ConsoleContextHolder.getInstances().addSse(keys, emitter); + return emitter; + } /** * List all process @@ -66,34 +75,9 @@ public class ProcessController { value = "true: list active process, false: list inactive process", dataType = "Boolean") public ProTableResult listAllProcess(@RequestParam boolean active) { - List processEntities = processService.listAllProcess(active); return ProTableResult.builder() .success(true) - .data(processEntities) + .data(ConsoleContextHolder.getInstances().list()) .build(); } - - /** - * get process by user id - * - * @return {@link ProTableResult} <{@link String} > - */ - @GetMapping("/getConsoleByUserId") - @ApiOperation("Get Log from Process by user id") - public Result getConsoleByUserId() { - return Result.data(processService.getConsoleByUserId(StpUtil.getLoginIdAsInt())); - } - - /** - * clear console by user id - * - * @return {@link Result} <{@link String}> - */ - @GetMapping("/clearConsole") - @ApiOperation("Clear console by user id") - @Log(title = "Clear console by user id", businessType = BusinessType.DELETE) - public Result clearConsole() { - processService.clearConsoleByUserId(StpUtil.getLoginIdAsInt()); - return Result.succeed(Status.CLEAR_SUCCESS); - } } diff --git a/dinky-admin/src/main/java/org/dinky/controller/TaskController.java b/dinky-admin/src/main/java/org/dinky/controller/TaskController.java index 13780f87a5..be571e0564 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/TaskController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/TaskController.java @@ -33,7 +33,9 @@ import org.dinky.gateway.enums.SavePointType; import org.dinky.gateway.result.SavePointResult; import org.dinky.job.JobResult; -import org.dinky.process.exception.ExcuteException; +import org.dinky.process.annotations.ExecuteProcess; +import org.dinky.process.annotations.ProcessId; +import org.dinky.process.enums.ProcessType; import org.dinky.service.TaskService; import org.dinky.utils.JsonUtils; @@ -71,7 +73,8 @@ public class TaskController { @GetMapping("/submitTask") @ApiOperation("Submit Task") @Log(title = "Submit Task", businessType = BusinessType.SUBMIT) - public Result submitTask(@RequestParam Integer id) throws ExcuteException { + @ExecuteProcess(type = ProcessType.FLINK_SUBMIT) + public Result submitTask(@ProcessId @RequestParam Integer id) throws Exception { JobResult jobResult = taskService.submitTask(id, null); if (jobResult.isSuccess()) { return Result.succeed(jobResult, Status.EXECUTE_SUCCESS); @@ -87,11 +90,13 @@ public Result cancel(@RequestParam Integer id) { return Result.succeed(taskService.cancelTaskJob(taskService.getTaskInfoById(id)), Status.EXECUTE_SUCCESS); } - /** 重启任务 */ + /** + * 重启任务 + */ @GetMapping(value = "/restartTask") @ApiOperation("Restart Task") @Log(title = "Restart Task", businessType = BusinessType.REMOTE_OPERATION) - public Result restartTask(@RequestParam Integer id, String savePointPath) throws ExcuteException { + public Result restartTask(@RequestParam Integer id, String savePointPath) throws Exception { return Result.succeed(taskService.restartTask(id, savePointPath)); } diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java index 6de3361297..6a6fd2e698 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java @@ -66,7 +66,7 @@ public static void writeFlinkMetrics(JobInfoDetail jobInfoDetail) { AsyncUtil.waitAll(array); MetricsVO metricsVO = new MetricsVO(customMetricsList, jobId, LocalDateTime.now()); - MetricsContextHolder.sendAsync(metricsVO); + MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO); } /** diff --git a/dinky-admin/src/main/java/org/dinky/service/TaskService.java b/dinky-admin/src/main/java/org/dinky/service/TaskService.java index 0f5470492a..f83f635f5f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/TaskService.java +++ b/dinky-admin/src/main/java/org/dinky/service/TaskService.java @@ -67,7 +67,7 @@ public interface TaskService extends ISuperService { * @return A {@link JobResult} object representing the result of the submitted task. * @throws ExcuteException If there is an error executing the task. */ - JobResult submitTask(Integer id, String savePointPath) throws ExcuteException; + JobResult submitTask(Integer id, String savePointPath) throws Exception; /** * Restart the given task and return the job result. @@ -77,7 +77,7 @@ public interface TaskService extends ISuperService { * @return A {@link JobResult} object representing the result of the restarted task. * @throws ExcuteException If there is an error restarting the task. */ - JobResult restartTask(Integer id, String savePointPath) throws ExcuteException; + JobResult restartTask(Integer id, String savePointPath) throws Exception; /** * Savepoint the given task job and return the savepoint result. diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/DataBaseServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/DataBaseServiceImpl.java index 4ffb0dec13..f8bd6185c0 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/DataBaseServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/DataBaseServiceImpl.java @@ -36,6 +36,8 @@ import org.dinky.metadata.driver.Driver; import org.dinky.metadata.result.JdbcSelectResult; import org.dinky.mybatis.service.impl.SuperServiceImpl; +import org.dinky.process.annotations.ProcessStep; +import org.dinky.process.enums.ProcessStepType; import org.dinky.service.DataBaseService; import org.apache.commons.lang3.StringUtils; @@ -265,6 +267,7 @@ public List explainCommonSql(TaskDTO task) { } @Override + @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE_COMMON_SQL) public JobResult executeCommonSql(SqlDTO sqlDTO) { JobResult result = new JobResult(); result.setStatement(sqlDTO.getStatement()); diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/MonitorServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/MonitorServiceImpl.java index 4c4fdad237..aa68ff2ce1 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/MonitorServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/MonitorServiceImpl.java @@ -93,7 +93,7 @@ public List getData(Date startTime, Date endTime, List jobIds @Override public SseEmitter sendLatestData(SseEmitter sseEmitter, LocalDateTime lastDate, List keys) { - MetricsContextHolder.addSse(keys, sseEmitter, lastDate); + MetricsContextHolder.getInstances().addSse(keys, sseEmitter, lastDate); return sseEmitter; } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/ProcessServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/ProcessServiceImpl.java deleted file mode 100644 index c678965df1..0000000000 --- a/dinky-admin/src/main/java/org/dinky/service/impl/ProcessServiceImpl.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.service.impl; - -import org.dinky.process.model.ProcessEntity; -import org.dinky.process.pool.ConsolePool; -import org.dinky.process.pool.ProcessPool; -import org.dinky.service.ProcessService; - -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; - -import org.springframework.stereotype.Service; - -/** - * ProcessServiceImpl - * - * @since 2022/10/16 22:45 - */ -@Service -public class ProcessServiceImpl implements ProcessService { - - @Override - public List listAllProcess(boolean active) { - return ProcessPool.INSTANCE.values().stream() - .filter(t -> active ? t.isActiveProcess() : true) - .sorted(Comparator.comparing(ProcessEntity::getStartTime).reversed()) - .collect(Collectors.toList()); - } - - @Override - public String getConsoleByUserId(Integer userId) { - String user = userId.toString(); - return ConsolePool.INSTANCE.getOrDefault(user, new StringBuilder("")).toString(); - } - - @Override - public void clearConsoleByUserId(Integer userId) { - ConsolePool.clear(userId); - } -} diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java index 2a869f80b7..28d2d1de76 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java @@ -42,9 +42,6 @@ import org.dinky.job.JobManager; import org.dinky.metadata.driver.Driver; import org.dinky.metadata.result.JdbcSelectResult; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.enums.ProcessType; -import org.dinky.process.model.ProcessEntity; import org.dinky.service.ClusterInstanceService; import org.dinky.service.DataBaseService; import org.dinky.service.StudioService; @@ -61,7 +58,6 @@ import com.fasterxml.jackson.databind.JsonNode; -import cn.dev33.satoken.stp.StpUtil; import cn.hutool.cache.Cache; import cn.hutool.cache.CacheUtil; import lombok.RequiredArgsConstructor; @@ -109,17 +105,16 @@ public SelectResult getJobData(String jobId) { @Override public LineageResult getLineage(StudioCADTO studioCADTO) { - ProcessEntity process = ProcessContextHolder.registerProcess( - ProcessEntity.init(ProcessType.LINEAGE, StpUtil.getLoginIdAsInt())); + // TODO 添加ProcessStep if (Asserts.isNotNullString(studioCADTO.getDialect()) && !Dialect.FLINK_SQL.equalsVal(studioCADTO.getDialect())) { if (Asserts.isNull(studioCADTO.getDatabaseId())) { - process.error("Job's data source not selected!"); + log.error("Job's data source not selected!"); return null; } DataBase dataBase = dataBaseService.getById(studioCADTO.getDatabaseId()); if (Asserts.isNull(dataBase)) { - process.error("Job's data source does not exist!"); + log.error("Job's data source does not exist!"); return null; } if (Dialect.DORIS.equalsVal(studioCADTO.getDialect())) { diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index e25fd64219..5a33caf8e1 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -70,10 +70,8 @@ import org.dinky.metadata.result.JdbcSelectResult; import org.dinky.mybatis.service.impl.SuperServiceImpl; import org.dinky.parser.SqlType; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.enums.ProcessType; -import org.dinky.process.exception.ExcuteException; -import org.dinky.process.model.ProcessEntity; +import org.dinky.process.annotations.ProcessStep; +import org.dinky.process.enums.ProcessStepType; import org.dinky.service.AlertGroupService; import org.dinky.service.CatalogueService; import org.dinky.service.ClusterConfigurationService; @@ -112,6 +110,7 @@ import javax.annotation.Resource; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -123,7 +122,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import cn.dev33.satoken.stp.StpUtil; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.tree.Tree; @@ -153,6 +151,7 @@ public class TaskServiceImpl extends SuperServiceImpl implemen private final UDFTemplateService udfTemplateService; private final DataSourceProperties dsProperties; private final UserService userService; + private final ApplicationContext applicationContext; @Resource @Lazy @@ -170,19 +169,23 @@ private String[] buildParams(int id) { .split(" "); } - private void preCheckTask(TaskDTO task) throws TaskNotDoneException, SqlExplainExcepition { + @ProcessStep(type = ProcessStepType.SUBMIT_PRECHECK) + public void preCheckTask(TaskDTO task) throws TaskNotDoneException, SqlExplainExcepition { + log.info("Start check and config task, task:{}", task.getName()); Assert.notNull(task, Status.TASK_NOT_EXIST.getMessage()); if (!Dialect.isCommonSql(task.getDialect()) && Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() > 0) { - String status = jobInstanceService.getById(task.getJobInstanceId()).getStatus(); - if (!JobStatus.isDone(status)) { + JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId()); + if (jobInstance != null && !JobStatus.isDone(jobInstance.getStatus())) { throw new TaskNotDoneException(Status.TASK_STATUS_IS_NOT_DONE.getMessage()); } } + log.info("Start explain Sql,task: {},Dialect:{}", task.getName(), task.getDialect()); + List sqlExplainResults = explainTask(task); for (SqlExplainResult sqlExplainResult : sqlExplainResults) { if (!sqlExplainResult.isParseTrue() || !sqlExplainResult.isExplainTrue()) { @@ -193,38 +196,46 @@ private void preCheckTask(TaskDTO task) throws TaskNotDoneException, SqlExplainE sqlExplainResult.getError())); } } + + log.info("Explain Sql finish"); } - public JobResult executeJob(TaskDTO task) { - ProcessEntity process = ProcessContextHolder.getProcess(); + @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE) + public JobResult executeJob(TaskDTO task) throws Exception { + JobResult jobResult; if (Dialect.isCommonSql(task.getDialect())) { - process.info("Preparing to execute common sql..."); - JobResult jobResult = - dataBaseService.executeCommonSql(SqlDTO.build(task.getStatement(), task.getDatabaseId(), null)); + log.info("Preparing to execute common sql..."); + SqlDTO sqlDTO = SqlDTO.build(task.getStatement(), task.getDatabaseId(), null); + jobResult = dataBaseService.executeCommonSql(sqlDTO); ResultPool.putCommonSqlCache(task.getId(), (JdbcSelectResult) jobResult.getResult()); - return jobResult; } else { - process.info("Initializing Flink job config..."); - JobManager jobManager = JobManager.build(buildJobConfig(task)); - return jobManager.executeSql(task.getStatement()); + log.info("Initializing Flink job config..."); + JobManager jobManager = JobManager.build( + applicationContext.getBean(TaskServiceImpl.class).buildJobConfig(task)); + jobResult = jobManager.executeSql(task.getStatement()); } + log.info("execute job finished,status is {}", jobResult.getStatus()); + return jobResult; } - private JobConfig buildJobConfig(TaskDTO task) { + @ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG) + public JobConfig buildJobConfig(TaskDTO task) { task.setStatement(buildEnvSql(task) + task.getStatement()); JobConfig config = task.getJobConfig(); - Savepoints savepoints = savepointsService.getSavePointWithStrategy(task); if (Asserts.isNotNull(savepoints)) { + log.info("Init savePoint"); config.setSavePointPath(savepoints.getPath()); config.getConfigJson().put("execution.savepoint.path", savepoints.getPath()); // todo: 写工具类处理相关配置 } if (GatewayType.get(task.getType()).isDeployCluster()) { + log.info("Init gateway config, type:{}", task.getType()); FlinkClusterConfig flinkClusterCfg = clusterCfgService.getFlinkClusterCfg(config.getClusterConfigurationId()); flinkClusterCfg.getAppConfig().setUserJarParas(buildParams(config.getTaskId())); config.buildGatewayConfig(flinkClusterCfg); } else { + log.info("Init remote cluster"); String address = clusterInstanceService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()); config.setAddress(address); } @@ -233,8 +244,7 @@ private JobConfig buildJobConfig(TaskDTO task) { @Override public String buildEnvSql(AbstractStatementDTO task) { - ProcessEntity process = ProcessContextHolder.getProcess(); - process.info("Start initialize FlinkSQLEnv:"); + log.info("Start initialize FlinkSQLEnv:"); String sql = CommonConstant.LineSep; if (task.isFragment()) { String flinkWithSql = dataBaseService.getEnabledFlinkWithSql(); @@ -250,20 +260,16 @@ public String buildEnvSql(AbstractStatementDTO task) { sql += envTask.getStatement() + CommonConstant.LineSep; } } - process.info("Initializing data permissions..."); + log.info("Initializing data permissions..."); userService.buildRowPermission(); - process.info("Finish initialize FlinkSQLEnv."); + log.info("Finish initialize FlinkSQLEnv."); return sql; } @Override - public JobResult submitTask(Integer id, String savePointPath) throws ExcuteException { + @ProcessStep(type = ProcessStepType.SUBMIT_TASK) + public JobResult submitTask(Integer id, String savePointPath) throws Exception { initTenantByTaskId(id); - ProcessEntity process = StpUtil.isLogin() - ? ProcessContextHolder.registerProcess( - ProcessEntity.init(ProcessType.FLINK_SUBMIT, StpUtil.getLoginIdAsInt())) - : ProcessEntity.NULL_PROCESS; - process.start(); TaskDTO task = this.getTaskInfoById(id); @@ -271,28 +277,26 @@ public JobResult submitTask(Integer id, String savePointPath) throws ExcuteExcep task.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue()); task.setSavePointPath(savePointPath); } + // 注解自调用会失效,这里通过获取对象方法绕过此限制 + TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class); + taskServiceBean.preCheckTask(task); - preCheckTask(task); - - JobResult jobResult = executeJob(task); - process.info("execute job finished,status is ", jobResult.getStatus()); + JobResult jobResult = taskServiceBean.executeJob(task); if (Job.JobStatus.SUCCESS == jobResult.getStatus()) { - process.info("Job Submit success"); + log.info("Job Submit success"); task.setJobInstanceId(jobResult.getJobInstanceId()); if (!this.updateById(task.buildTask())) { throw new BusException(Status.TASK_UPDATE_FAILED.getMessage()); } } else { - process.error("Job Submit failed, error: " + jobResult.getError()); + log.error("Job Submit failed, error: " + jobResult.getError()); } - - process.finish(); return jobResult; } @Override - public JobResult restartTask(Integer id, String savePointPath) throws ExcuteException { + public JobResult restartTask(Integer id, String savePointPath) throws Exception { TaskDTO task = this.getTaskInfoById(id); Asserts.checkNull(task, Status.TASK_NOT_EXIST.getMessage()); if (!Dialect.isCommonSql(task.getDialect()) && Asserts.isNotNull(task.getJobInstanceId())) { @@ -340,6 +344,7 @@ public SavePointResult savepointTaskJob(TaskDTO task, SavePointType savePointTyp @Override public List explainTask(TaskDTO task) throws NotSupportExplainExcepition { + if (Dialect.isCommonSql(task.getDialect())) { return dataBaseService.explainCommonSql(task); } else if (task.getDialect().equals(Dialect.FLINK_SQL.getValue())) { diff --git a/dinky-admin/src/main/java/org/dinky/sse/LogSseAppender.java b/dinky-admin/src/main/java/org/dinky/sse/LogSseAppender.java new file mode 100644 index 0000000000..b4ce8c54ac --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/sse/LogSseAppender.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.sse; + +import org.dinky.aop.ProcessAspect; +import org.dinky.context.ConsoleContextHolder; +import org.dinky.process.enums.ProcessStepType; + +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.layout.PatternLayout; +import org.apache.logging.log4j.util.ReadOnlyStringMap; + +import java.io.Serializable; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Plugin(name = "LogSseAppender", category = "Core", elementType = "appender", printObject = true) +public class LogSseAppender extends AbstractAppender { + protected LogSseAppender( + String name, + Filter filter, + Layout layout, + boolean ignoreExceptions, + Property[] properties) { + super(name, filter, layout, ignoreExceptions, properties); + } + + /** + * This method is called when a new log comes over, contextData is the data in the MDC, + * set in {@link ProcessAspect} If contextData contains PROCESS_NAME and PROCESS_STEP, + * it is represented as buried point data and sent to {@link ConsoleContextHolder} + * Otherwise, it is a normal log and is ignored + */ + @Override + public void append(LogEvent event) { + ReadOnlyStringMap contextData = event.getContextData(); + if (contextData.containsKey(ProcessAspect.PROCESS_NAME) + && contextData.containsKey(ProcessAspect.PROCESS_STEP)) { + String processName = contextData.getValue(ProcessAspect.PROCESS_NAME); + String processStep = contextData.getValue(ProcessAspect.PROCESS_STEP); + ConsoleContextHolder.getInstances() + .appendLog(processName, ProcessStepType.get(processStep), event.toString()); + } + } + + /** + * createAppender + * + * */ + @PluginFactory + public static LogSseAppender createAppender( + @PluginAttribute("name") String name, + @PluginElement("Filter") final Filter filter, + @PluginElement("Layout") Layout layout) { + if (name == null) { + log.error("No name provided for LogSseAppender"); + return null; + } + if (layout == null) { + layout = PatternLayout.createDefaultLayout(); + } + return new LogSseAppender(name, filter, layout, false, null); + } +} diff --git a/dinky-admin/src/main/resources/log4j2.xml b/dinky-admin/src/main/resources/log4j2.xml index cf06dec7e2..15691ff431 100644 --- a/dinky-admin/src/main/resources/log4j2.xml +++ b/dinky-admin/src/main/resources/log4j2.xml @@ -54,6 +54,16 @@ + + + + + + + + + @@ -66,6 +76,7 @@ + diff --git a/dinky-common/src/main/java/org/dinky/data/enums/Status.java b/dinky-common/src/main/java/org/dinky/data/enums/Status.java index a3f77d450a..1b2a060c36 100644 --- a/dinky-common/src/main/java/org/dinky/data/enums/Status.java +++ b/dinky-common/src/main/java/org/dinky/data/enums/Status.java @@ -352,9 +352,18 @@ public enum Status { * gateway config */ GAETWAY_KUBERNETS_TEST_FAILED(180, "gateway.kubernetes.test.failed"), - GAETWAY_KUBERNETS_TEST_SUCCESS(180, "gateway.kubernetes.test.success"), - ; + GAETWAY_KUBERNETS_TEST_SUCCESS(181, "gateway.kubernetes.test.success"), + /** + * process + * */ + PROCESS_SUBMIT_SUBMITTASK(190, "process.submit.submitTask"), + PROCESS_SUBMIT_CHECKSQL(191, "process.submit.checkSql"), + PROCESS_SUBMIT_EXECUTE(192, "process.submit.execute"), + PROCESS_SUBMIT_BUILDCONFIG(193, "process.submit.buildConfig"), + PROCESS_SUBMIT_EXECUTECOMMSQL(194, "process.submit.execute.commSql"), + PROCESS_SUBMIT_EXECUTEFLINKSQL(195, "process.submit.execute.flinkSql"), + ; private final int code; private final String key; diff --git a/dinky-common/src/main/java/org/dinky/utils/LogUtil.java b/dinky-common/src/main/java/org/dinky/utils/LogUtil.java index 3edbc86718..0081596b36 100644 --- a/dinky-common/src/main/java/org/dinky/utils/LogUtil.java +++ b/dinky-common/src/main/java/org/dinky/utils/LogUtil.java @@ -42,7 +42,6 @@ public static String getError(Throwable e) { PrintWriter pw = new PrintWriter(sw)) { e.printStackTrace(pw); error = sw.toString(); - logger.error(error); } catch (IOException ioe) { ioe.printStackTrace(); } finally { @@ -56,8 +55,7 @@ public static String getError(String msg, Throwable e) { PrintWriter pw = new PrintWriter(sw)) { e.printStackTrace(pw); LocalDateTime now = LocalDateTime.now(); - error = now.toString() + ": " + msg + " \nError message:\n " + sw.toString(); - logger.error(error); + error = now + ": " + msg + " \nError message:\n " + sw; } catch (IOException ioe) { ioe.printStackTrace(); } finally { diff --git a/dinky-common/src/main/resources/i18n/messages_en_US.properties b/dinky-common/src/main/resources/i18n/messages_en_US.properties index 3fe1c46ed6..15b828174f 100644 --- a/dinky-common/src/main/resources/i18n/messages_en_US.properties +++ b/dinky-common/src/main/resources/i18n/messages_en_US.properties @@ -238,3 +238,11 @@ gateway.kubernetes.test.failed= failed to test the Flink configuration: task.status.is.not.done=In the current publishing state, a job is running and the online fails, please stop and go online task.sql.explain.failed=SQL parsing failed, please check the SQL statement task.update.failed=Task Update failed + +# process +process.submit.submitTask= Submit the job +process.submit.checkSql=Check job +process.submit.execute = execute the job +process.submit.buildConfig=Build configuration information +process.submit.execute.commSql=excute commonSql +process.submit.execute.flinkSql=excute flinkSql \ No newline at end of file diff --git a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties index 227ce0b043..1c3cf22476 100644 --- a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties +++ b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties @@ -235,4 +235,12 @@ gateway.kubernetes.test.failed=测试 Flink 配置失败: # Task task.status.is.not.done=当前发布状态下有作业正在运行,上线失败,请停止后上线 task.sql.explain.failed=sql解析失败,请检查 -task.update.failed=Task更新失败 \ No newline at end of file +task.update.failed=Task更新失败 + +# process +process.submit.submitTask=提交作业 +process.submit.checkSql=检查作业 +process.submit.execute=执行作业 +process.submit.buildConfig=构建配置信息 +process.submit.execute.commSql=执行普通sql +process.submit.execute.flinkSql=执行flinkSql \ No newline at end of file diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 1448ab428e..333bd335e1 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -40,8 +40,6 @@ import org.dinky.job.StatementParam; import org.dinky.parser.SqlType; import org.dinky.parser.check.AddJarSqlParser; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; import org.dinky.trans.Operations; import org.dinky.utils.DinkyClassLoaderUtil; import org.dinky.utils.LogUtil; @@ -57,22 +55,20 @@ import java.util.List; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; /** * Explainer * * @since 2021/6/22 */ +@Slf4j public class Explainer { - private static final Logger logger = LoggerFactory.getLogger(Explainer.class); private Executor executor; private boolean useStatementSet; @@ -184,8 +180,7 @@ public List parseUDFFromStatements(String[] statements) { } public ExplainResult explainSql(String statement) { - ProcessEntity process = ProcessContextHolder.getProcess(); - process.info("Start explain FlinkSQL..."); + log.info("Start explain FlinkSQL..."); JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator)); List sqlExplainRecords = new ArrayList<>(); int index = 1; @@ -207,7 +202,7 @@ record = executor.explainSqlRecord(item.getValue()); record.setIndex(index); sqlExplainRecords.add(record); correct = false; - process.error(error); + log.error(error); break; } record.setExplainTrue(true); @@ -241,7 +236,7 @@ record = executor.explainSqlRecord(item.getValue()); record.setParseTrue(false); record.setExplainTrue(false); correct = false; - process.error(error); + log.error(error); break; } finally { record.setType("Modify DML"); @@ -264,7 +259,7 @@ record = executor.explainSqlRecord(item.getValue()); record.setParseTrue(false); record.setExplainTrue(false); correct = false; - process.error(error); + log.error(error); } finally { record.setType("Modify DML"); record.setExplainTime(LocalDateTime.now()); @@ -286,7 +281,7 @@ record = executor.explainSqlRecord(item.getValue()); record.setParseTrue(false); record.setExplainTrue(false); correct = false; - process.error(error); + log.error(error); } finally { record.setType("Modify DML"); record.setExplainTime(LocalDateTime.now()); @@ -317,7 +312,7 @@ record = new SqlExplainResult(); record.setIndex(index); sqlExplainRecords.add(record); correct = false; - process.error(error); + log.error(error); break; } record.setExplainTrue(true); @@ -326,7 +321,7 @@ record = new SqlExplainResult(); record.setIndex(index++); sqlExplainRecords.add(record); } - process.info(StrUtil.format("A total of {} FlinkSQL have been Explained.", sqlExplainRecords.size())); + log.info(StrUtil.format("A total of {} FlinkSQL have been Explained.", sqlExplainRecords.size())); return new ExplainResult(correct, sqlExplainRecords.size(), sqlExplainRecords); } @@ -387,7 +382,7 @@ public List getLineage(String statement) { executor.executeSql(sql); } } catch (Exception e) { - logger.error(e.getMessage()); + log.error(e.getMessage()); return lineageRelList; } } diff --git a/dinky-core/src/main/java/org/dinky/explainer/sqllineage/LineageBuilder.java b/dinky-core/src/main/java/org/dinky/explainer/sqllineage/LineageBuilder.java index ab829c90b9..297f36ba21 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/sqllineage/LineageBuilder.java +++ b/dinky-core/src/main/java/org/dinky/explainer/sqllineage/LineageBuilder.java @@ -26,8 +26,6 @@ import org.dinky.explainer.lineage.LineageTable; import org.dinky.metadata.driver.Driver; import org.dinky.metadata.driver.DriverConfig; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; import java.util.ArrayList; import java.util.HashMap; @@ -36,9 +34,6 @@ import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.alibaba.druid.sql.SQLUtils; import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.SQLStatement; @@ -47,12 +42,13 @@ import com.alibaba.druid.sql.ast.statement.SQLInsertStatement; import com.alibaba.druid.stat.TableStat; -public class LineageBuilder { +import lombok.extern.slf4j.Slf4j; - protected static final Logger logger = LoggerFactory.getLogger(LineageBuilder.class); +@Slf4j +public class LineageBuilder { public static LineageResult getSqlLineageByOne(String statement, String type) { - ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 List tables = new ArrayList<>(); List relations = new ArrayList<>(); try { @@ -140,8 +136,7 @@ public static LineageResult getSqlLineageByOne(String statement, String type) { int tSize = tgtList.size(); int sSize = srcLists.size(); if (tSize != sSize && tSize * 2 != sSize) { - logger.error("Target table fields do not match!"); - process.error("Target table fields do not match!"); + log.error("Target table fields do not match!"); return null; } for (int i = 0; i < tSize; i++) { @@ -169,19 +164,19 @@ public static LineageResult getSqlLineageByOne(String statement, String type) { } } } else { - process.info("Does not contain an insert statement, cannot analyze the lineage."); + log.info("Does not contain an insert statement, cannot analyze the lineage."); return null; } } catch (Exception e) { e.printStackTrace(); - process.error("Unexpected exceptions occur! " + e.getMessage()); + log.error("Unexpected exceptions occur! " + e.getMessage()); return null; } return LineageResult.build(tables, relations); } public static LineageResult getSqlLineage(String statement, String type, DriverConfig driverConfig) { - ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 List tables = new ArrayList<>(); List relations = new ArrayList<>(); Map>> srcMap = new HashMap<>(); @@ -207,7 +202,7 @@ public static LineageResult getSqlLineage(String statement, String type, DriverC if (columns.size() <= 0 || sqls[n].contains("*")) { Driver driver = Driver.build(driverConfig); if (!targetTable.contains(".")) { - process.error("Target table not specified database!"); + log.error("Target table not specified database!"); return null; } List columns1 = driver.listColumns( @@ -255,7 +250,7 @@ public static LineageResult getSqlLineage(String statement, String type, DriverC srcMap.put(n, srcLists); tgtMap.put(n, tgtList); } else { - process.info("Does not contain an insert statement, cannot analyze the lineage."); + log.info("Does not contain an insert statement, cannot analyze the lineage."); return null; } } @@ -302,8 +297,8 @@ public static LineageResult getSqlLineage(String statement, String type, DriverC int tSize = tgtList.size(); int sSize = srcLists.size(); if (tSize != sSize && tSize * 2 != sSize) { - logger.error("Target table fields do not match!"); - process.error("Target table fields do not match!"); + log.error("Target table fields do not match!"); + log.error("Target table fields do not match!"); return null; } for (int i = 0; i < tSize; i++) { @@ -332,8 +327,7 @@ public static LineageResult getSqlLineage(String statement, String type, DriverC } } } catch (Exception e) { - e.printStackTrace(); - process.error("Unexpected exceptions occur! " + e.getMessage()); + log.error("Unexpected exceptions occur!", e); return null; } return LineageResult.build(tables, relations); diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index e905b72f6b..dce1deb347 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -57,8 +57,8 @@ import org.dinky.interceptor.FlinkInterceptor; import org.dinky.interceptor.FlinkInterceptorResult; import org.dinky.parser.SqlType; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; +import org.dinky.process.annotations.ProcessStep; +import org.dinky.process.enums.ProcessStepType; import org.dinky.trans.Operations; import org.dinky.utils.DinkyClassLoaderUtil; import org.dinky.utils.LogUtil; @@ -91,9 +91,6 @@ import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.node.ObjectNode; import cn.hutool.core.collection.CollUtil; @@ -103,11 +100,10 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.URLUtil; import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class JobManager { - - private static final Logger logger = LoggerFactory.getLogger(JobManager.class); - private JobHandler handler; private ExecutorConfig executorConfig; private JobConfig config; @@ -167,7 +163,7 @@ public static JobManager buildPlanMode(JobConfig config) { JobManager manager = new JobManager(config); manager.setPlanMode(true); manager.init(); - ProcessContextHolder.getProcess().info("Build Flink plan mode success."); + log.info("Build Flink plan mode success."); return manager; } @@ -218,7 +214,7 @@ public void initUDF(List udfList, GatewayType runMode, Integer taskId) { if (taskId == null) { taskId = -RandomUtil.randomInt(0, 1000); } - ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 // 这里要分开 // 1. 得到jar包路径,注入remote环境 @@ -268,12 +264,12 @@ public void initUDF(List udfList, GatewayType runMode, Integer taskId) { addConfigurationClsAndJars(jarList, CollUtil.newArrayList(URLUtils.getURLs(otherPluginsFiles))); } catch (Exception e) { - logger.error("add configuration failed;reason:{}", LogUtil.getError(e)); + log.error("add configuration failed;reason:{}", LogUtil.getError(e)); throw new RuntimeException(e); } - process.info(StrUtil.format("A total of {} UDF have been Init.", udfList.size() + pyUdfFile.size())); - process.info("Initializing Flink UDF...Finish"); + log.info(StrUtil.format("A total of {} UDF have been Init.", udfList.size() + pyUdfFile.size())); + log.info("Initializing Flink UDF...Finish"); } private void writeManifest(Integer taskId, List jarPaths) { @@ -307,8 +303,8 @@ public boolean close() { return true; } - public JobResult executeSql(String statement) { - ProcessEntity process = ProcessContextHolder.getProcess(); + @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE) + public JobResult executeSql(String statement) throws Exception { Job job = Job.init(runMode, config, executorConfig, executor, statement, useGateway); if (!useGateway) { job.setJobManagerAddress(executorConfig.getJobManagerAddress()); @@ -530,8 +526,8 @@ public JobResult executeSql(String statement) { job.setEndTime(LocalDateTime.now()); job.setStatus(Job.JobStatus.FAILED); job.setError(error); - process.error(error); failed(); + throw e; } finally { close(); } @@ -646,7 +642,7 @@ public boolean cancel(String jobId) { try { return FlinkAPI.build(config.getAddress()).stop(jobId); } catch (Exception e) { - logger.error("停止作业时集群不存在: " + e); + log.error("停止作业时集群不存在: " + e); } return false; } @@ -673,7 +669,7 @@ public static GatewayResult deploySessionCluster(GatewayConfig gatewayConfig) { } public JobResult executeJar() { - ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 Job job = Job.init(runMode, config, executorConfig, executor, null, useGateway); JobContextHolder.setJob(job); ready(); @@ -703,7 +699,7 @@ public JobResult executeJar() { job.setStatus(Job.JobStatus.FAILED); job.setError(error); failed(); - process.error(error); + log.error(error); } finally { close(); } diff --git a/dinky-core/src/test/java/org/dinky/core/JobManagerTest.java b/dinky-core/src/test/java/org/dinky/core/JobManagerTest.java index e16a0465cc..45e961ca1a 100644 --- a/dinky-core/src/test/java/org/dinky/core/JobManagerTest.java +++ b/dinky-core/src/test/java/org/dinky/core/JobManagerTest.java @@ -43,7 +43,7 @@ public class JobManagerTest { @Ignore @Test - public void cancelJobSelect() { + public void cancelJobSelect() throws Exception { JobConfig config = JobConfig.builder() .type(GatewayType.YARN_SESSION.getLongValue()) .useResult(true) diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java b/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java index 99ef86e811..4eb92d15bf 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java @@ -21,8 +21,6 @@ import org.dinky.function.constant.PathConstant; import org.dinky.function.data.model.UDF; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; import org.apache.flink.configuration.ReadableConfig; @@ -46,19 +44,19 @@ public class JavaCompiler implements FunctionCompiler { */ @Override public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { - ProcessEntity process = ProcessContextHolder.getProcess(); - process.info("正在编译 java 代码 , class: " + udf.getClassName()); + // TODO 改为ProcessStep注释 + log.info("正在编译 java 代码 , class: " + udf.getClassName()); CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(udf.getCode()); boolean res = compiler.compilerToTmpPath(PathConstant.getUdfCompilerJavaPath(missionId)); String className = compiler.getFullClassName(); if (res) { - process.info("class编译成功:" + className); - process.info("compilerTakeTime:" + compiler.getCompilerTakeTime()); + log.info("class编译成功:" + className); + log.info("compilerTakeTime:" + compiler.getCompilerTakeTime()); return true; } else { log.error("class编译失败:{}", className); - process.error("class编译失败:" + className); - process.error(compiler.getCompilerMessage()); + log.error("class编译失败:" + className); + log.error(compiler.getCompilerMessage()); return false; } } diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/PythonFunction.java b/dinky-function/src/main/java/org/dinky/function/compiler/PythonFunction.java index 4f69b4046f..d8b11aceca 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/PythonFunction.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/PythonFunction.java @@ -25,8 +25,6 @@ import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; import org.dinky.function.util.ZipWriter; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; import org.apache.flink.client.python.PythonFunctionFactory; import org.apache.flink.configuration.Configuration; @@ -66,9 +64,9 @@ public class PythonFunction implements FunctionCompiler, FunctionPackage { @Override public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { Asserts.checkNull(udf, "flink-config 不能为空"); - ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 - process.info("正在编译 python 代码 , class: " + udf.getClassName()); + log.info("正在编译 python 代码 , class: " + udf.getClassName()); File pyFile = FileUtil.writeUtf8String( udf.getCode(), PathConstant.getUdfCompilerPythonPath(missionId, UDFUtil.getPyFileName(udf.getClassName()) + ".py")); @@ -85,9 +83,9 @@ public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { SystemConfiguration.getInstances().getPythonHome()); PythonFunctionFactory.getPythonFunction(udf.getClassName(), configuration, null); - process.info("Python udf编译成功 ; className:" + udf.getClassName()); + log.info("Python udf编译成功 ; className:" + udf.getClassName()); } catch (Exception e) { - process.error("Python udf编译失败 ; className:" + log.error("Python udf编译失败 ; className:" + udf.getClassName() + " 。 原因: " + ExceptionUtil.getRootCauseMessage(e)); diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/ScalaCompiler.java b/dinky-function/src/main/java/org/dinky/function/compiler/ScalaCompiler.java index 10f057a2cb..95f295b411 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/ScalaCompiler.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/ScalaCompiler.java @@ -20,29 +20,30 @@ package org.dinky.function.compiler; import org.dinky.function.data.model.UDF; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; import org.apache.flink.configuration.ReadableConfig; +import lombok.extern.slf4j.Slf4j; + /** * scala编译 * * @since 0.6.8 */ +@Slf4j public class ScalaCompiler implements FunctionCompiler { @Override public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { - ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 String className = udf.getClassName(); - process.info("正在编译 scala 代码 , class: " + className); + log.info("正在编译 scala 代码 , class: " + className); if (CustomStringScalaCompiler.getInterpreter(missionId).compileString(udf.getCode())) { - process.info("scala class编译成功:" + className); + log.info("scala class编译成功:" + className); return true; } else { - process.error("scala class编译失败:" + className); + log.error("scala class编译失败:" + className); return false; } } diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetsOperatorGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetsOperatorGateway.java index ac84456541..724feb7937 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetsOperatorGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubernetsOperatorGateway.java @@ -32,8 +32,6 @@ import org.dinky.gateway.kubernetes.operator.api.JobSpec; import org.dinky.gateway.result.SavePointResult; import org.dinky.gateway.result.TestResult; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; import org.dinky.utils.TextUtil; import org.apache.flink.configuration.CoreOptions; @@ -41,7 +39,6 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Map; import org.slf4j.Logger; @@ -64,7 +61,7 @@ public abstract class KubernetsOperatorGateway extends AbstractGateway { private FlinkDeployment flinkDeployment = new FlinkDeployment(); private FlinkDeploymentSpec flinkDeploymentSpec = new FlinkDeploymentSpec(); private KubernetesClient kubernetesClient; - private ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 private static final Logger logger = LoggerFactory.getLogger(KubernetsOperatorGateway.class); @@ -132,14 +129,6 @@ private void initJob() { jarMainClass, userJarPath, userJarParas); - process.info(String.format( - "\n" - + "The app config is : \n" - + "jarMainClass:%s\n" - + " userJarPath:%s\n" - + " userJarParas:%s\n" - + " ", - jarMainClass, userJarPath, Arrays.toString(userJarParas))); if (Asserts.isNullString(jarMainClass) || Asserts.isNullString(userJarPath)) { throw new IllegalArgumentException("jar MainClass or userJarPath must be config!!!"); @@ -157,11 +146,9 @@ private void initJob() { jobSpecBuilder.upgradeMode(UpgradeMode.SAVEPOINT); logger.info("find save point config, the path is : {}", savePointPath); - process.info(String.format("find save point config, the path is : %s", savePointPath)); } else { jobSpecBuilder.upgradeMode(UpgradeMode.STATELESS); logger.info("no save point config"); - process.info("no save point config"); } flinkDeploymentSpec.setJob(jobSpecBuilder.build()); @@ -174,13 +161,11 @@ private void initResource(KubernetesClient kubernetesClient) { String jbcpu = kubernetsConfiguration.getOrDefault("kubernetes.jobmanager.cpu", "1"); String jbmem = flinkConfig.getConfiguration().getOrDefault("jobmanager.memory.process.size", "1G"); logger.info("jobmanager resource is : cpu-->{}, mem-->{}", jbcpu, jbmem); - process.info(String.format("jobmanager resource is : cpu-->%s, mem-->%s", jbcpu, jbmem)); jobManagerSpec.setResource(new Resource(Double.parseDouble(jbcpu), jbmem)); String tmcpu = kubernetsConfiguration.getOrDefault("kubernetes.taskmanager.cpu", "1"); String tmmem = flinkConfig.getConfiguration().getOrDefault("taskmanager.memory.process.size", "1G"); logger.info("taskmanager resource is : cpu-->{}, mem-->{}", tmcpu, tmmem); - process.info(String.format("taskmanager resource is : cpu-->%s, mem-->%s", tmcpu, tmmem)); taskManagerSpec.setResource(new Resource(Double.parseDouble(tmcpu), tmmem)); if (!TextUtil.isEmpty(k8sConfig.getPodTemplate())) { @@ -211,7 +196,6 @@ private void initSpec() { String serviceAccount = kubernetsConfiguration.get("kubernetes.service.account"); logger.info("\nflinkVersion is : {} \n image is : {}", flinkVersion, image); - process.info(String.format("\nflinkVersion is : %s \n image is : %s", flinkVersion, image)); if (Asserts.isNotNull(flinkVersion)) { flinkDeploymentSpec.setFlinkVersion(flinkVersion); @@ -246,7 +230,6 @@ private void initMetadata() { String nameSpace = kubernetsConfiguration.get("kubernetes.namespace"); logger.info("\njobName is :{} \n namespce is : {}", jobName, nameSpace); - process.info(String.format("\njobName is :%s \n namespce is : %s", jobName, nameSpace)); // set Meta info , include pod name, namespace conf ObjectMeta objectMeta = new ObjectMeta(); diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubetnetsApplicationOperatorGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubetnetsApplicationOperatorGateway.java index 8688c05dc0..946cadd00f 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubetnetsApplicationOperatorGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/operator/KubetnetsApplicationOperatorGateway.java @@ -24,7 +24,6 @@ import org.dinky.gateway.kubernetes.operator.api.FlinkDeployment; import org.dinky.gateway.result.GatewayResult; import org.dinky.gateway.result.KubernetesResult; -import org.dinky.process.model.ProcessEntity; import org.dinky.utils.LogUtil; import java.util.Collections; @@ -52,8 +51,7 @@ public GatewayType getType() { @Override public GatewayResult submitJar() { - ProcessEntity process = getProcess(); - process.info(String.format("start submit flink jar use %s", getType())); + // TODO 改为ProcessStep注释 logger.info("start submit flink jar use {}", getType()); KubernetesResult result = KubernetesResult.build(getType()); @@ -64,12 +62,8 @@ public GatewayResult submitJar() { KubernetesClient kubernetesClient = getKubernetesClient(); FlinkDeployment flinkDeployment = getFlinkDeployment(); - process.info("start delete old cluster "); kubernetesClient.resource(flinkDeployment).delete(); kubernetesClient.resource(flinkDeployment).waitUntilCondition(Objects::isNull, 1, TimeUnit.MINUTES); - - process.info("start create cluster "); - kubernetesClient.resource(flinkDeployment).createOrReplace(); FlinkDeployment flinkDeploymentResult = kubernetesClient @@ -82,17 +76,14 @@ public GatewayResult submitJar() { String status = String.valueOf( flinkDeployment1.getStatus().getJobManagerDeploymentStatus()); logger.info("deploy kubernetes , status is : {}", status); - process.info(String.format("deploy kubernetes , status is : %s", status)); String error = flinkDeployment1.getStatus().getError(); if (Asserts.isNotNullString(error)) { logger.info("deploy kubernetes error :{}", error); - process.info(String.format("deploy kubernetes error :%s", error)); throw new RuntimeException(error); } if (status.equals("READY")) { logger.info("deploy kubernetes success "); - process.info("deploy kubernetes success "); String jobId = flinkDeployment1 .getStatus() .getJobStatus() @@ -133,14 +124,10 @@ public GatewayResult submitJar() { } catch (KubernetesClientException e) { // some error while connecting to kube cluster result.fail(LogUtil.getError(e)); - process.error(LogUtil.getError(e)); e.printStackTrace(); } logger.info( "submit {} job finish, web url is {}, jobid is {}", getType(), result.getWebURL(), result.getJids()); - process.info(String.format( - "submit %s job finish, web url is %s, jobid is %s", getType(), result.getWebURL(), result.getJids())); - return result; } } diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java index a341fafeed..4710dfe29e 100644 --- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java +++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java @@ -33,8 +33,6 @@ import org.dinky.data.result.SqlExplainResult; import org.dinky.metadata.query.IDBQuery; import org.dinky.metadata.result.JdbcSelectResult; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; import org.dinky.utils.LogUtil; import org.dinky.utils.TextUtil; @@ -59,25 +57,22 @@ import java.util.TreeSet; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.druid.sql.SQLUtils; import com.alibaba.druid.sql.ast.SQLStatement; import cn.hutool.core.text.CharSequenceUtil; +import lombok.extern.slf4j.Slf4j; /** * AbstractJdbcDriver * * @since 2021/7/20 14:09 */ +@Slf4j public abstract class AbstractJdbcDriver extends AbstractDriver { - protected static Logger logger = LoggerFactory.getLogger(AbstractJdbcDriver.class); - protected ThreadLocal conn = new ThreadLocal<>(); private DruidDataSource dataSource; @@ -93,7 +88,7 @@ public String test() { DriverManager.getConnection(config.getUrl(), config.getUsername(), config.getPassword()) .close(); } catch (Exception e) { - logger.error("Jdbc链接测试失败!错误信息为:" + e.getMessage(), e); + log.error("Jdbc链接测试失败!错误信息为:" + e.getMessage(), e); return e.getMessage(); } return CommonConstant.HEALTHY; @@ -553,7 +548,7 @@ public StringBuilder genQueryOption(QueryData queryData) { @Override public JdbcSelectResult query(String sql, Integer limit) { - ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 if (Asserts.isNull(limit)) { limit = 100; } @@ -606,7 +601,7 @@ public JdbcSelectResult query(String sql, Integer limit) { } catch (Exception e) { result.setError(LogUtil.getError(e)); result.setSuccess(false); - process.error(e.getMessage()); + log.error(e.getMessage()); } finally { close(preparedStatement, results); result.setRowData(datas); @@ -623,39 +618,39 @@ public JdbcSelectResult query(String sql, Integer limit) { */ @Override public JdbcSelectResult executeSql(String sql, Integer limit) { - ProcessEntity process = ProcessContextHolder.getProcess(); - process.info("Start parse sql..."); + // TODO 改为ProcessStep注释 + log.info("Start parse sql..."); List stmtList = SQLUtils.parseStatements(sql, config.getType().toLowerCase()); - process.info(CharSequenceUtil.format("A total of {} statement have been Parsed.", stmtList.size())); + log.info(CharSequenceUtil.format("A total of {} statement have been Parsed.", stmtList.size())); List resList = new ArrayList<>(); JdbcSelectResult result = JdbcSelectResult.buildResult(); - process.info("Start execute sql..."); + log.info("Start execute sql..."); for (SQLStatement item : stmtList) { String type = item.getClass().getSimpleName(); if (type.toUpperCase().contains("SELECT") || type.toUpperCase().contains("SHOW") || type.toUpperCase().contains("DESC") || type.toUpperCase().contains("SQLEXPLAINSTATEMENT")) { - process.info("Execute query."); + log.info("Execute query."); result = query(item.toString(), limit); } else if (type.toUpperCase().contains("INSERT") || type.toUpperCase().contains("UPDATE") || type.toUpperCase().contains("DELETE")) { try { - process.info("Execute update."); + log.info("Execute update."); resList.add(executeUpdate(item.toString())); result.setStatusList(resList); } catch (Exception e) { resList.add(0); result.setStatusList(resList); result.error(LogUtil.getError(e)); - process.error(e.getMessage()); + log.error(e.getMessage()); return result; } } else { try { - process.info("Execute DDL."); + log.info("Execute DDL."); execute(item.toString()); resList.add(1); result.setStatusList(resList); @@ -663,7 +658,7 @@ public JdbcSelectResult executeSql(String sql, Integer limit) { resList.add(0); result.setStatusList(resList); result.error(LogUtil.getError(e)); - process.error(e.getMessage()); + log.error(e.getMessage()); return result; } } @@ -674,10 +669,10 @@ public JdbcSelectResult executeSql(String sql, Integer limit) { @Override public List explain(String sql) { - ProcessEntity process = ProcessContextHolder.getProcess(); + // TODO 改为ProcessStep注释 List sqlExplainResults = new ArrayList<>(); String current = null; - process.info("Start check sql..."); + log.info("Start check sql..."); try { List stmtList = SQLUtils.parseStatements(sql, config.getType().toLowerCase()); @@ -686,10 +681,10 @@ public List explain(String sql) { String type = item.getClass().getSimpleName(); sqlExplainResults.add(SqlExplainResult.success(type, current, null)); } - process.info("Sql is correct."); + log.info("Sql is correct."); } catch (Exception e) { sqlExplainResults.add(SqlExplainResult.fail(current, LogUtil.getError(e))); - process.error(e.getMessage()); + log.error(e.getMessage()); } finally { return sqlExplainResults; } @@ -778,7 +773,7 @@ && contains(tableName, x.get(dbQuery.tableName()))) SimpleDateFormat.getDateInstance().parse(updateTime)); } } catch (ParseException ignored) { - logger.warn("set date fail"); + log.warn("set date fail"); } TableType tableType = TableType.type( isSplit(x.get(dbQuery.schemaName()), splitConfig), diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java index cf1dd1db90..ae7cedc961 100644 --- a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java +++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java @@ -24,8 +24,6 @@ import org.dinky.metadata.query.DorisQuery; import org.dinky.metadata.query.IDBQuery; import org.dinky.metadata.result.JdbcSelectResult; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.process.model.ProcessEntity; import org.dinky.utils.LogUtil; import org.dinky.utils.SqlUtil; @@ -35,7 +33,9 @@ import java.util.Map; import cn.hutool.core.text.CharSequenceUtil; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class DorisDriver extends AbstractJdbcDriver { @Override @@ -65,21 +65,21 @@ public String getName() { @Override public JdbcSelectResult executeSql(String sql, Integer limit) { - ProcessEntity process = ProcessContextHolder.getProcess(); - process.info("Start parse sql..."); + // TODO 改为ProcessStep注释 + log.info("Start parse sql..."); String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(sql)); - process.info(CharSequenceUtil.format("A total of {} statement have been Parsed.", statements.length)); + log.info(CharSequenceUtil.format("A total of {} statement have been Parsed.", statements.length)); List resList = new ArrayList<>(); JdbcSelectResult result = JdbcSelectResult.buildResult(); - process.info("Start execute sql..."); + log.info("Start execute sql..."); for (String item : statements) { String type = item.toUpperCase(); if (type.startsWith("SELECT") || type.startsWith("SHOW") || type.startsWith("DESC")) { - process.info("Execute query."); + log.info("Execute query."); result = query(item, limit); } else if (type.startsWith("INSERT") || type.startsWith("UPDATE") || type.startsWith("DELETE")) { try { - process.info("Execute update."); + log.info("Execute update."); resList.add(executeUpdate(item)); result.setStatusList(resList); result.success(); @@ -87,12 +87,12 @@ public JdbcSelectResult executeSql(String sql, Integer limit) { resList.add(0); result.setStatusList(resList); result.error(LogUtil.getError(e)); - process.error(e.getMessage()); + log.error(e.getMessage()); return result; } } else { try { - process.info("Execute DDL."); + log.info("Execute DDL."); execute(item); resList.add(1); result.setStatusList(resList); @@ -101,7 +101,7 @@ public JdbcSelectResult executeSql(String sql, Integer limit) { resList.add(0); result.setStatusList(resList); result.error(LogUtil.getError(e)); - process.error(e.getMessage()); + log.error(e.getMessage()); return result; } } diff --git a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java index e8abdafc03..fbf6c4dcc6 100644 --- a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java +++ b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java @@ -35,11 +35,14 @@ import java.util.Map; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + /** * MysqlDriver * * @since 2021/7/20 14:06 */ +@Slf4j public class MySqlDriver extends AbstractJdbcDriver { @Override @@ -80,7 +83,7 @@ public Map getFlinkColumnTypeConversion() { @Override public String generateCreateTableSql(Table table) { String genTableSql = genTable(table); - logger.info("Auto generateCreateTableSql {}", genTableSql); + log.info("Auto generateCreateTableSql {}", genTableSql); return genTableSql; } diff --git a/dinky-process/src/main/java/org/dinky/process/pool/ConsolePool.java b/dinky-process/src/main/java/org/dinky/process/annotations/ExecuteProcess.java similarity index 55% rename from dinky-process/src/main/java/org/dinky/process/pool/ConsolePool.java rename to dinky-process/src/main/java/org/dinky/process/annotations/ExecuteProcess.java index fd6b047921..7917a6cd45 100644 --- a/dinky-process/src/main/java/org/dinky/process/pool/ConsolePool.java +++ b/dinky-process/src/main/java/org/dinky/process/annotations/ExecuteProcess.java @@ -17,27 +17,20 @@ * */ -package org.dinky.process.pool; +package org.dinky.process.annotations; -import org.dinky.pool.AbstractPool; +import org.dinky.process.enums.ProcessType; -/** - * ConsolePool - * - * @since 2022/10/18 22:51 - */ -public class ConsolePool extends AbstractPool { - - public static final ConsolePool INSTANCE = new ConsolePool(); +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; - public static void write(String str, Integer userId) { - String user = String.valueOf(userId); - INSTANCE.computeIfAbsent(user, k -> new StringBuilder("Dinky User Console:")) - .append(str); - } +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface ExecuteProcess { - public static void clear(Integer userId) { - String user = String.valueOf(userId); - INSTANCE.put(user, new StringBuilder("Dinky User Console:")); - } + ProcessType type(); } diff --git a/dinky-process/src/main/java/org/dinky/process/pool/ProcessPool.java b/dinky-process/src/main/java/org/dinky/process/annotations/ProcessId.java similarity index 68% rename from dinky-process/src/main/java/org/dinky/process/pool/ProcessPool.java rename to dinky-process/src/main/java/org/dinky/process/annotations/ProcessId.java index 7d7437a871..d4a12657e0 100644 --- a/dinky-process/src/main/java/org/dinky/process/pool/ProcessPool.java +++ b/dinky-process/src/main/java/org/dinky/process/annotations/ProcessId.java @@ -17,20 +17,15 @@ * */ -package org.dinky.process.pool; +package org.dinky.process.annotations; -import org.dinky.pool.AbstractPool; -import org.dinky.process.model.ProcessEntity; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; -/** - * ProcessPool - * - * @since 2022/10/16 17:00 - */ -public class ProcessPool extends AbstractPool { - - public static final ProcessPool INSTANCE = new ProcessPool(); - - @Override - public void refresh(ProcessEntity entity) {} -} +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface ProcessId {} diff --git a/dinky-admin/src/main/java/org/dinky/service/ProcessService.java b/dinky-process/src/main/java/org/dinky/process/annotations/ProcessStep.java similarity index 53% rename from dinky-admin/src/main/java/org/dinky/service/ProcessService.java rename to dinky-process/src/main/java/org/dinky/process/annotations/ProcessStep.java index 89eb4cf975..61130110c8 100644 --- a/dinky-admin/src/main/java/org/dinky/service/ProcessService.java +++ b/dinky-process/src/main/java/org/dinky/process/annotations/ProcessStep.java @@ -17,35 +17,19 @@ * */ -package org.dinky.service; +package org.dinky.process.annotations; -import org.dinky.process.model.ProcessEntity; +import org.dinky.process.enums.ProcessStepType; -import java.util.List; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; -/** ProcessService */ -public interface ProcessService { - - /** - * List all process - * - * @param active true: list active process, false: list inactive process {@link Boolean} - * @return {@link List}<{@link ProcessEntity}> - */ - List listAllProcess(boolean active); - - /** - * get log by user id - * - * @param userId user id {@link Integer} - * @return {@link String} - */ - String getConsoleByUserId(Integer userId); - - /** - * clear log by user id - * - * @param userId user id {@link Integer} - */ - void clearConsoleByUserId(Integer userId); +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface ProcessStep { + ProcessStepType type(); } diff --git a/dinky-process/src/main/java/org/dinky/process/context/ProcessContextHolder.java b/dinky-process/src/main/java/org/dinky/process/context/ProcessContextHolder.java deleted file mode 100644 index 7c163f74bd..0000000000 --- a/dinky-process/src/main/java/org/dinky/process/context/ProcessContextHolder.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.process.context; - -import org.dinky.assertion.Asserts; -import org.dinky.process.model.ProcessEntity; -import org.dinky.process.pool.ProcessPool; - -/** - * ProcessContextHolder - * - * @since 2022/10/16 16:57 - */ -public class ProcessContextHolder { - - private static final ThreadLocal PROCESS_CONTEXT = new ThreadLocal<>(); - - public static void setProcess(ProcessEntity process) { - PROCESS_CONTEXT.set(process); - } - - public static ProcessEntity getProcess() { - if (Asserts.isNull(PROCESS_CONTEXT.get())) { - return ProcessEntity.NULL_PROCESS; - } - return PROCESS_CONTEXT.get(); - } - - public static void clear() { - PROCESS_CONTEXT.remove(); - } - - public static ProcessEntity registerProcess(ProcessEntity process) { - Asserts.checkNull(process, "Process can not be null."); - setProcess(process); - ProcessPool.INSTANCE.put(process.getName(), process); - return process; - } -} diff --git a/dinky-process/src/main/java/org/dinky/process/enums/ProcessStepType.java b/dinky-process/src/main/java/org/dinky/process/enums/ProcessStepType.java new file mode 100644 index 0000000000..caa8163628 --- /dev/null +++ b/dinky-process/src/main/java/org/dinky/process/enums/ProcessStepType.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.process.enums; + +import org.dinky.assertion.Asserts; +import org.dinky.data.enums.Status; + +import lombok.Getter; + +@Getter +public enum ProcessStepType { + SUBMIT_TASK("SUBMIT_TASK", null, Status.PROCESS_SUBMIT_SUBMITTASK), + SUBMIT_PRECHECK("SUBMIT_PRECHECK", SUBMIT_TASK, Status.PROCESS_SUBMIT_CHECKSQL), + SUBMIT_EXECUTE("SUBMIT_EXECUTE", SUBMIT_TASK, Status.PROCESS_SUBMIT_EXECUTE), + SUBMIT_BUILD_CONFIG("SUBMIT_BUILD_CONFIG", SUBMIT_EXECUTE, Status.PROCESS_SUBMIT_BUILDCONFIG), + SUBMIT_EXECUTE_COMMON_SQL("SUBMIT_EXECUTE_COMMON_SQL", SUBMIT_BUILD_CONFIG, Status.PROCESS_SUBMIT_EXECUTECOMMSQL), + SUBMIT_EXECUTE_FLINK_SQL("SUBMIT_EXECUTE_FLINK_SQL", SUBMIT_BUILD_CONFIG, Status.PROCESS_SUBMIT_EXECUTEFLINKSQL), + UNKNOWN("UNKNOWN", null, Status.UNKNOWN_ERROR), + ; + + private final String value; + private final ProcessStepType parentStep; + private final Status desc; + + ProcessStepType(String type, ProcessStepType parentStep, Status desc) { + this.value = type; + this.parentStep = parentStep; + this.desc = desc; + } + + public static ProcessStepType get(String value) { + for (ProcessStepType type : ProcessStepType.values()) { + if (Asserts.isEquals(type.getValue(), value)) { + return type; + } + } + return ProcessStepType.UNKNOWN; + } +} diff --git a/dinky-process/src/main/java/org/dinky/process/exception/DinkyException.java b/dinky-process/src/main/java/org/dinky/process/exception/DinkyException.java index 54c6ebd007..3d4f317dcd 100644 --- a/dinky-process/src/main/java/org/dinky/process/exception/DinkyException.java +++ b/dinky-process/src/main/java/org/dinky/process/exception/DinkyException.java @@ -19,9 +19,6 @@ package org.dinky.process.exception; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.utils.LogUtil; - /** @since 0.7.0 */ public class DinkyException extends RuntimeException { @@ -29,21 +26,17 @@ public DinkyException() {} public DinkyException(String message) { super(message); - ProcessContextHolder.getProcess().error(message); } public DinkyException(String message, Throwable cause) { super(message, cause); - ProcessContextHolder.getProcess().error(LogUtil.getError(cause)); } public DinkyException(Throwable cause) { super(cause); - ProcessContextHolder.getProcess().error(cause.toString()); } public DinkyException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); - ProcessContextHolder.getProcess().error(LogUtil.getError(cause)); } } diff --git a/dinky-process/src/main/java/org/dinky/process/exception/ExcuteException.java b/dinky-process/src/main/java/org/dinky/process/exception/ExcuteException.java index ba29fecbfe..3c3cf7c056 100644 --- a/dinky-process/src/main/java/org/dinky/process/exception/ExcuteException.java +++ b/dinky-process/src/main/java/org/dinky/process/exception/ExcuteException.java @@ -19,30 +19,23 @@ package org.dinky.process.exception; -import org.dinky.process.context.ProcessContextHolder; -import org.dinky.utils.LogUtil; - public class ExcuteException extends Exception { public ExcuteException() {} public ExcuteException(String message) { super(message); - ProcessContextHolder.getProcess().error(message); } public ExcuteException(String message, Throwable cause) { super(message, cause); - ProcessContextHolder.getProcess().error(LogUtil.getError(cause)); } public ExcuteException(Throwable cause) { super(cause); - ProcessContextHolder.getProcess().error(cause.toString()); } public ExcuteException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); - ProcessContextHolder.getProcess().error(LogUtil.getError(cause)); } } diff --git a/dinky-process/src/main/java/org/dinky/process/model/ProcessEntity.java b/dinky-process/src/main/java/org/dinky/process/model/ProcessEntity.java index 318d0a2c9d..f9ae7c4d13 100644 --- a/dinky-process/src/main/java/org/dinky/process/model/ProcessEntity.java +++ b/dinky-process/src/main/java/org/dinky/process/model/ProcessEntity.java @@ -19,285 +19,43 @@ package org.dinky.process.model; -import org.dinky.assertion.Asserts; +import org.dinky.data.constant.CommonConstant; import org.dinky.process.enums.ProcessStatus; import org.dinky.process.enums.ProcessType; -import org.dinky.process.pool.ConsolePool; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; +import java.util.LinkedHashMap; -import cn.hutool.core.text.CharSequenceUtil; -import cn.hutool.core.text.StrFormatter; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; /** * Process * - * @since 2022/10/16 16:30 */ +@Builder +@Data +@AllArgsConstructor +@NoArgsConstructor public class ProcessEntity { - private String pid; private String name; - private Integer taskId; + private StringBuilder log; + private StringBuilder errLog; private ProcessType type; private ProcessStatus status; private LocalDateTime startTime; private LocalDateTime endTime; private long time; - private int stepIndex = 0; - private List steps; - private Integer userId; - - public static final ProcessEntity NULL_PROCESS = new ProcessEntity(); - - public ProcessEntity() {} - - public ProcessEntity(String pid, String name, Integer taskId, ProcessType type, Integer userId) { - this(pid, name, taskId, type, null, null, null, 0, null, userId); - } - - public ProcessEntity( - String name, - Integer taskId, - ProcessType type, - ProcessStatus status, - LocalDateTime startTime, - LocalDateTime endTime, - long time, - List steps, - Integer userId) { - this(null, name, taskId, type, status, startTime, endTime, time, steps, userId); - } - - public ProcessEntity( - String pid, - String name, - Integer taskId, - ProcessType type, - ProcessStatus status, - LocalDateTime startTime, - LocalDateTime endTime, - long time, - List steps, - Integer userId) { - this.pid = pid; - this.name = name; - this.taskId = taskId; - this.type = type; - this.status = status; - this.startTime = startTime; - this.endTime = endTime; - this.time = time; - this.steps = steps; - this.userId = userId; - } - - public static ProcessEntity init(ProcessType type, Integer userId) { - return init(type.getValue() + "_TEMP", null, type, userId); - } - - public static ProcessEntity init(Integer taskId, ProcessType type, Integer userId) { - return init(type.getValue() + taskId, taskId, type, userId); - } - - public static ProcessEntity init(String name, Integer taskId, ProcessType type, Integer userId) { - ProcessEntity process = new ProcessEntity(UUID.randomUUID().toString(), name, taskId, type, userId); - process.setStatus(ProcessStatus.INITIALIZING); - process.setStartTime(LocalDateTime.now()); - process.setSteps(new ArrayList<>()); - process.getSteps().add(ProcessStep.init()); - process.nextStep(); - return process; - } - - public void start() { - if (isNullProcess()) { - return; - } - steps.get(stepIndex - 1).setEndTime(LocalDateTime.now()); - setStatus(ProcessStatus.RUNNING); - steps.add(ProcessStep.run()); - nextStep(); - } - - public void finish() { - if (isNullProcess()) { - return; - } - steps.get(stepIndex - 1).setEndTime(LocalDateTime.now()); - setStatus(ProcessStatus.FINISHED); - setEndTime(LocalDateTime.now()); - setTime(getEndTime().compareTo(getStartTime())); - } - - public void finish(String str) { - if (isNullProcess()) { - return; - } - steps.get(stepIndex - 1).setEndTime(LocalDateTime.now()); - String message = CharSequenceUtil.format("\n[{}] {} INFO: {}", type.getValue(), LocalDateTime.now(), str); - steps.get(stepIndex - 1).appendInfo(message); - setStatus(ProcessStatus.FINISHED); - setEndTime(LocalDateTime.now()); - setTime(getEndTime().compareTo(getStartTime())); - ConsolePool.write(message, userId); - } - - public void config(String str) { - if (isNullProcess()) { - return; - } - String message = CharSequenceUtil.format("\n[{}] {} CONFIG: {}", type.getValue(), LocalDateTime.now(), str); - steps.get(stepIndex - 1).appendInfo(message); - ConsolePool.write(message, userId); - } - - public void info(String str) { - if (isNullProcess()) { - return; - } - String message = CharSequenceUtil.format("\n[{}] {} INFO: {}", type.getValue(), LocalDateTime.now(), str); - steps.get(stepIndex - 1).appendInfo(message); - ConsolePool.write(message, userId); - } - - public void info(String strPattern, Object... argArray) { - info(StrFormatter.format(strPattern, argArray)); - } - - public void infoSuccess() { - if (isNullProcess()) { - return; - } - steps.get(stepIndex - 1).appendInfo("Success."); - ConsolePool.write("Success.", userId); - } - - public void infoFail() { - if (isNullProcess()) { - return; - } - steps.get(stepIndex - 1).appendInfo("Fail."); - ConsolePool.write("Fail.", userId); - } - - public void error(String str) { - if (isNullProcess()) { - return; - } - String message = CharSequenceUtil.format("\n[{}] {} ERROR: {}", type.getValue(), LocalDateTime.now(), str); - steps.get(stepIndex - 1).appendInfo(message); - steps.get(stepIndex - 1).appendError(message); - ConsolePool.write(message, userId); - } - - public void error(String strPattern, Object... argArray) { - error(StrFormatter.format(strPattern, argArray)); - } - - public void nextStep() { - if (isNullProcess()) { - return; - } - stepIndex++; - } - - public boolean isNullProcess() { - return Asserts.isNullString(pid); - } - - public boolean isActiveProcess() { - return status.isActiveStatus(); - } - - public String getPid() { - return pid; - } - - public void setPid(String pid) { - this.pid = pid; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public Integer getTaskId() { - return taskId; - } - - public void setTaskId(Integer taskId) { - this.taskId = taskId; - } - - public ProcessType getType() { - return type; - } - - public void setType(ProcessType type) { - this.type = type; - } - - public ProcessStatus getStatus() { - return status; - } - - public void setStatus(ProcessStatus status) { - this.status = status; - } - - public LocalDateTime getStartTime() { - return startTime; - } - - public void setStartTime(LocalDateTime startTime) { - this.startTime = startTime; - } - - public LocalDateTime getEndTime() { - return endTime; - } - - public void setEndTime(LocalDateTime endTime) { - this.endTime = endTime; - } - - public long getTime() { - return time; - } - - public void setTime(long time) { - this.time = time; - } - - public Integer getUserId() { - return userId; - } - - public void setUserId(Integer userId) { - this.userId = userId; - } - - public List getSteps() { - return steps; - } - - public void setSteps(List steps) { - this.steps = steps; - } + private LinkedHashMap stepsMap; - public int getStepIndex() { - return stepIndex; + public void appendLog(String str) { + log.append(str).append(CommonConstant.LineSep); } - public void setStepIndex(int stepIndex) { - this.stepIndex = stepIndex; + public void appendErrLog(String str) { + errLog.append(str).append(CommonConstant.LineSep); } } diff --git a/dinky-process/src/main/java/org/dinky/process/model/ProcessStep.java b/dinky-process/src/main/java/org/dinky/process/model/ProcessStep.java index a87ac84fdf..d456cadbb0 100644 --- a/dinky-process/src/main/java/org/dinky/process/model/ProcessStep.java +++ b/dinky-process/src/main/java/org/dinky/process/model/ProcessStep.java @@ -19,117 +19,44 @@ package org.dinky.process.model; +import org.dinky.data.constant.CommonConstant; import org.dinky.process.enums.ProcessStatus; +import org.dinky.process.enums.ProcessStepType; import java.time.LocalDateTime; +import java.util.LinkedHashMap; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; /** * ProcessStep * * @since 2022/10/16 16:46 */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder public class ProcessStep { + private String name; private ProcessStatus stepStatus; + private ProcessStepType type; private LocalDateTime startTime; private LocalDateTime endTime; private long time; - private StringBuilder info = new StringBuilder(); - private StringBuilder error = new StringBuilder(); - private boolean isError = false; - - public ProcessStep() {} - - public ProcessStep(ProcessStatus stepStatus, LocalDateTime startTime) { - this(stepStatus, startTime, null, 0, new StringBuilder(), new StringBuilder()); - } - - public ProcessStep( - ProcessStatus stepStatus, - LocalDateTime startTime, - LocalDateTime endTime, - long time, - StringBuilder info, - StringBuilder error) { - this.stepStatus = stepStatus; - this.startTime = startTime; - this.endTime = endTime; - this.time = time; - this.info = info; - this.error = error; - } - - public static ProcessStep init() { - return new ProcessStep(ProcessStatus.INITIALIZING, LocalDateTime.now()); - } - - public static ProcessStep run() { - return new ProcessStep(ProcessStatus.RUNNING, LocalDateTime.now()); - } - - public void appendInfo(String str) { - info.append(str); - } - - public void appendError(String str) { - error.append(str); - isError = true; - } - - public ProcessStatus getStepStatus() { - return stepStatus; - } - - public void setStepStatus(ProcessStatus stepStatus) { - this.stepStatus = stepStatus; - } - - public LocalDateTime getStartTime() { - return startTime; - } - - public void setStartTime(LocalDateTime startTime) { - this.startTime = startTime; - } - - public LocalDateTime getEndTime() { - return endTime; - } - - public void setEndTime(LocalDateTime endTime) { - this.endTime = endTime; - this.time = endTime.compareTo(startTime); - } - - public long getTime() { - return time; - } - - public void setTime(long time) { - this.time = time; - } - - public StringBuilder getInfo() { - return info; - } - - public void setInfo(StringBuilder info) { - this.info = info; - } - - public StringBuilder getError() { - return error; - } - - public void setError(StringBuilder error) { - this.error = error; - } + private StringBuilder log = new StringBuilder(); + private StringBuilder errLog = new StringBuilder(); + private LinkedHashMap childStepsMap; - public boolean isError() { - return isError; + public void appendLog(String str) { + log.append(str).append(CommonConstant.LineSep); } - public void setError(boolean error) { - isError = error; + public void appendErrLog(String str) { + errLog.append(str).append(CommonConstant.LineSep); } } diff --git a/dinky-web/src/pages/DataStudio/BottomContainer/Console/service.tsx b/dinky-web/src/pages/DataStudio/BottomContainer/Console/service.tsx index b5f9cfe559..ec9e79f6f3 100644 --- a/dinky-web/src/pages/DataStudio/BottomContainer/Console/service.tsx +++ b/dinky-web/src/pages/DataStudio/BottomContainer/Console/service.tsx @@ -1,5 +1,5 @@ import { queryDataByParams } from '@/services/BusinessCrud'; export function getConsoleData() { - return queryDataByParams('api/process/getConsoleByUserId'); + // return queryDataByParams('api/process/getConsoleByUserId'); }