Skip to content

Commit

Permalink
[Refactor ][admin]Refactor process (#2391)
Browse files Browse the repository at this point in the history
* refactor process

* refactor process

* refactor processController

* refactor processController

* add i81n

* merge dev

* remove err annoation

* fix precheck error

* fix precheck error

* fix precheck error

* fix precheck error

* fix api controller error

* formate code
  • Loading branch information
gaoyan1998 authored Oct 17, 2023
1 parent cd7470a commit 7ebe855
Show file tree
Hide file tree
Showing 44 changed files with 924 additions and 878 deletions.
4 changes: 1 addition & 3 deletions dinky-admin/src/main/java/org/dinky/aop/LogAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ protected void handleCommonLogic(final JoinPoint joinPoint, final Exception e, O

if (e != null) {
operLog.setStatus(BusinessStatus.FAIL.ordinal());
log.error("pre doAfterThrowing Exception:{}", e.getMessage());
operLog.setErrorMsg(StringUtils.substring(e.getMessage(), 0, 2000));
}
operLog.setStatus(BusinessStatus.SUCCESS.ordinal());
Expand All @@ -137,8 +136,7 @@ protected void handleCommonLogic(final JoinPoint joinPoint, final Exception e, O

} catch (Exception exp) {
// 记录本地异常日志
log.error("pre doAfterThrowing Exception:{}", exp.getMessage());
exp.printStackTrace();
log.error("pre doAfterThrowing Exception:", exp);
}
}

Expand Down
129 changes: 129 additions & 0 deletions dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.aop;

import org.dinky.context.ConsoleContextHolder;
import org.dinky.process.annotations.ExecuteProcess;
import org.dinky.process.annotations.ProcessId;
import org.dinky.process.annotations.ProcessStep;
import org.dinky.process.enums.ProcessStatus;
import org.dinky.process.enums.ProcessStepType;
import org.dinky.process.enums.ProcessType;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Aspect
@Slf4j
@Component
public class ProcessAspect {

public static String PROCESS_NAME = "name";
public static String PROCESS_STEP = "step";
public ConsoleContextHolder contextHolder = ConsoleContextHolder.getInstances();

/**
* Block all {@link ExecuteProcess} annotations,
* As the beginning of the process, set all initialization information
*/
@Around(value = "@annotation(executeProcess)")
public Object processAround(ProceedingJoinPoint joinPoint, ExecuteProcess executeProcess) throws Throwable {

Object result;
Object processId = getProcessId(joinPoint);
String name = executeProcess.type() + String.valueOf(processId);
ProcessType type = executeProcess.type();
contextHolder.registerProcess(type, name);
MDC.put(PROCESS_NAME, name);

try {
result = joinPoint.proceed();
contextHolder.finishedProcess(name, ProcessStatus.FINISHED, null);
} catch (Throwable e) {
contextHolder.finishedProcess(name, ProcessStatus.FAILED, e);
throw e;
} finally {
// Note that this must be cleaned up,Otherwise, the situation of OOM may occur
MDC.clear();
}
return result;
}

/**
* Block all {@link ProcessStep} annotations,
* As a specific task step
*/
@Around(value = "@annotation(processStep)")
public Object processStepAround(ProceedingJoinPoint joinPoint, ProcessStep processStep) throws Throwable {

Object result;
// Record the current step and restore it after the execution is completed
String parentStep = MDC.get(PROCESS_STEP);
ProcessStepType processStepType = processStep.type();
MDC.put(PROCESS_STEP, processStepType.getValue());
contextHolder.registerProcessStep(processStepType, MDC.get(PROCESS_NAME));

try {
result = joinPoint.proceed();
contextHolder.finishedStep(MDC.get(PROCESS_NAME), processStepType, ProcessStatus.FINISHED, null);
} catch (Exception e) {
contextHolder.finishedStep(MDC.get(PROCESS_NAME), processStepType, ProcessStatus.FAILED, e);
throw e;
} finally {
// If a parent step exists, it is restored after the execution is complete
if (parentStep != null) {
MDC.put(PROCESS_STEP, parentStep);
}
}
return result;
}

private Object getProcessId(ProceedingJoinPoint joinPoint) {
Object[] params = joinPoint.getArgs();
if (params.length == 0) {
throw new IllegalArgumentException("Must have ProcessId params");
}

// Get the method, here you can convert the signature strong to MethodSignature
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();

Annotation[][] annotations = method.getParameterAnnotations();
for (int i = 0; i < annotations.length; i++) {
Object param = params[i];
Annotation[] paramAnn = annotations[i];
for (Annotation annotation : paramAnn) {
if (annotation instanceof ProcessId) {
return param;
}
}
}
throw new IllegalArgumentException("Must have ProcessId annoation params");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void updateState() {
metrics.setContent(metricsTotal);
metrics.setHeartTime(now);
metrics.setModel(MetricsType.LOCAL.getType());
MetricsContextHolder.sendAsync(metrics);
MetricsContextHolder.getInstances().sendAsync(metrics.getModel(), metrics);

log.debug("Collecting jvm information ends.");
}
Expand Down
105 changes: 105 additions & 0 deletions dinky-admin/src/main/java/org/dinky/context/BaseSseContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.context;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalNotification;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class BaseSseContext<K, V> {

/**
* Cache that stores SseEmitter objects for sending metric data,
* prevents OOM with LoadingCache, and is automatically removed when objects
* in the cache are not accessed or used for more than 60 seconds.
*/
protected LoadingCache<K, List<SseEmitter>> sseList = CacheBuilder.newBuilder()
.expireAfterAccess(60, TimeUnit.SECONDS)
.removalListener(this::onRemove)
.build(CacheLoader.from(key -> new ArrayList<>()));

/**
* Called during an asynchronous send procedure
*/
public abstract void append(K key, V o);

/**
* send data asynchronously.
*/
public void sendAsync(K key, V o) {
CompletableFuture.runAsync(() -> {
append(key, o);
send(key, o);
});
}

/**
* send data.
*/
protected void send(K key, V o) {
List<SseEmitter> sseEmitters = sseList.getIfPresent(key);
if (sseEmitters != null) {
sseEmitters.forEach(sseEmitter -> {
try {
sseEmitter.send(o);
} catch (Exception e) {
log.warn("send metrics error:{}", e.getMessage());
closeSse(sseEmitter);
sseEmitters.remove(sseEmitter);
}
});
}
}

/**
* remove the SseEmitter object from the cache
* When the connection times out or actively exits.
*
* @param removalNotification RemovalNotification object
*/
protected void onRemove(RemovalNotification<K, List<SseEmitter>> removalNotification) {
assert removalNotification.getValue() != null;
removalNotification.getValue().forEach(this::closeSse);
}

/**
* close the SseEmitter object.
*
* @param sseEmitter SseEmitter object
*/
protected void closeSse(SseEmitter sseEmitter) {
try {
sseEmitter.complete();
} catch (Exception e) {
log.warn("complete sseEmitter failed:{}", e.getMessage());
}
}
}
Loading

0 comments on commit 7ebe855

Please sign in to comment.