Skip to content

Commit

Permalink
Merge pull request #73 from Lanzhengjian98/AddTurboPlugin
Browse files Browse the repository at this point in the history
Add turbo plugin function
  • Loading branch information
YORYOR authored Nov 29, 2024
2 parents d8c3d79 + 5de54e1 commit 5957c7d
Show file tree
Hide file tree
Showing 106 changed files with 6,270 additions and 184 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@

All notable changes to this project will be documented in this file.

## [1.2.0] - 2024-11-21

Support plugin extension function
- Add `node_type` field to the `ei_node_instance` table in the database to save the node type.
- Add `NodeExecuteResult` inner class to the `RuntimeResult` class, and move the `activeTaskInstance` and `variables` fields to the inner class.
- Add `properties` variable to the `CommonPO` entity class to store extended data.
- Add `ExtendRuntimeContext` class to store extended branch context information.

Support parallel gateway and inclusive gateway through plugins

### Bugfix
- Fix v1.1.1 release issues

## [1.1.1] - 2023-06-26
### Bugfix
- Fix v1.1.0 release issues
Expand Down
6 changes: 3 additions & 3 deletions demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
<parent>
<groupId>com.didiglobal.turbo</groupId>
<artifactId>turbo</artifactId>
<version>1.1.1</version>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>demo</artifactId>
<version>1.1.1</version>
<version>1.2.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>

<spring-boot.version>1.5.10.RELEASE</spring-boot.version>
<turbo.engine.version>1.1.1</turbo.engine.version>
<turbo.engine.version>1.2.0</turbo.engine.version>
<h2.version>1.4.200</h2.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,16 @@ public void updateFlow() {
}

public void deployFlow() {
// Deploy main flow
DeployFlowParam deployMainFlowParam = new DeployFlowParam(tenant, caller);
deployMainFlowParam.setFlowModuleId(createMainFlowResult.getFlowModuleId());
deployMainFlowResult = processEngine.deployFlow(deployMainFlowParam);
LOGGER.info("deployMainFlow.||deployMainFlowResult={}", deployMainFlowResult);
// Deploy sub flow
DeployFlowParam deploySubFlowParam = new DeployFlowParam(tenant, caller);
deploySubFlowParam.setFlowModuleId(createSubFlowResult.getFlowModuleId());
deploySubFlowResult = processEngine.deployFlow(deploySubFlowParam);
LOGGER.info("deploySubFlow.||deploySubFlowResult={}", deploySubFlowResult);
// Deploy main flow
DeployFlowParam deployMainFlowParam = new DeployFlowParam(tenant, caller);
deployMainFlowParam.setFlowModuleId(createMainFlowResult.getFlowModuleId());
deployMainFlowResult = processEngine.deployFlow(deployMainFlowParam);
LOGGER.info("deployMainFlow.||deployMainFlowResult={}", deployMainFlowResult);
}

public void startProcessToEnd(boolean auth) {
Expand All @@ -134,7 +134,7 @@ public void startProcessToEnd(boolean auth) {
// Now it is stuck in the second user node of the parent process, 'Generate Work Order', driving the completion of the parent process
commitTaskResult = commitMainFlowUserTask2(commitTaskResult);

assert commitTaskResult.getStatus() == ErrorEnum.SUCCESS.getErrNo();
assert commitTaskResult.getErrCode() == ErrorEnum.SUCCESS.getErrNo();
}

private StartProcessResult startProcessToCallActivity() {
Expand Down
195 changes: 108 additions & 87 deletions demo/src/test/resources/script/turbo-mysql-h2-ddl.sql

Large diffs are not rendered by default.

108 changes: 108 additions & 0 deletions docs/Parallel&InclusiveGateway.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
## 并行网关&包容网关
### 1. 概述
本插件为Turbo提供“并行网关”和“包容网关”的多分支并行处理能力,使开发者可以在工作流中灵活处理分支流程。

**🌟🌟🌟注意**:并行网关与包容网关均不支持跨网关的节点回滚操作
### 2. 功能介绍
#### 2.1 并行网关
* 支持在流程节点处创建多个并行任务。
* 所有分支任务完成后,流程继续向下执行。
* 应用场景:同时启动多个独立任务,如审批、数据处理。
#### 2.2 包容网关
* 支持有选择性地激活部分分支任务。
* 允许多个分支执行完毕后合并,未执行的分支不影响主流程。
* 应用场景:根据条件选择性地执行某些任务,如特定条件下的审批链。
### 3. 插件依赖
* Turbo 1.2.0+
### 4. 插件配置(plugin.properties)
* 数据库连接配置
```properties
# JDBC config
turbo.plugin.jdbc.url=jdbc:mysql://127.0.0.1:3306/t_engine?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&autoReconnect=true
turbo.plugin.jdbc.username=username
turbo.plugin.jdbc.password=password
turbo.plugin.jdbc.driver=com.mysql.jdbc.Driver
turbo.plugin.jdbc.maximumPoolSize=10
```
* 并行网关节点与包容网关节点配置
```properties
# 自定义设置并行网关与包容网关NodeType。并行网关默认为9,包容网关默认为10。如非覆盖Turbo原有执行器插件,请不要设置为1-8
turbo.plugin.element_type.ParallelGatewayElementPlugin=9
turbo.plugin.element_type.InclusiveGatewayElementPlugin=10
# 并行网关与包容网关的开关配置。默认为true开启
turbo.plugin.support.ParallelGatewayElementPlugin=true
turbo.plugin.support.InclusiveGatewayElementPlugin=true
```
### 5. 插件使用
#### 5.1 分支汇聚策略
并行网关与包容网关都支持指定分支汇聚策略,目前支持的策略有:
* JoinAll(默认):所有分支任务完成后到达汇聚节点,继续向下执行。
* AnyOne:任意一个分支任务完成后到达汇聚节点,继续向下执行。
* Custom:自定义策略,需继承`com.didiglobal.turbo.plugin.executor.BranchMergeCustom`类,重写`joinFirst``joinMerge`方法,并在该类上添加`@Primary`注解。
#### 5.2 数据汇聚策略
并行网关与包容网关都支持指定分支数据合并策略,目前支持的策略有:
* All(默认): 将所有分支的数据合并到一个Map中,并作为参数传递给下游节点。需要注意的是,如果key相同的情况下,后到达的分支数据会覆盖之前到达的分支数据。
* None: 不进行数据合并,使用分支fork时的数据作为参数传递给下游节点。
* Custom: 自定义策略,需继承`com.didiglobal.turbo.plugin.executor.DataMergeCustom`类,重写`merge`方法,并在该类上添加`@Primary`注解。
#### 5.3 并行网关节点示例
```java
{
ParallelGateway parallelGateway = new ParallelGateway();
// 设置节点key, 节点唯一标识
parallelGateway.setKey("ParallelGateway_38ad233");
// 设置节点类型, 默认为9
parallelGateway.setType(ExtendFlowElementType.PARALLEL_GATEWAY);

List<String> egIncomings = new ArrayList<>();
egIncomings.add("SequenceFlow_2gugjee");
parallelGateway.setIncoming(egIncomings);

// 设置多个分支出口
List<String> egOutgoings = new ArrayList<>();
egOutgoings.add("SequenceFlow_12rbl6u");
egOutgoings.add("SequenceFlow_3ih7eta");
parallelGateway.setOutgoing(egOutgoings);

Map<String, Object> properties = new HashMap<>();
Map<String, String> forkJoinMatch = new HashMap<>();
// 记录分支Fork节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK, "ParallelGateway_38ad233");
// 记录分支Join节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.JOIN, "ParallelGateway_10lo44j");
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK_JOIN_MATCH, JSONArray.toJSON(forkJoinMatch));
parallelGateway.setProperties(properties);
}
```
#### 5.4 包容网关节点示例
```java
{
InclusiveGateway inclusiveGateway = new InclusiveGateway();
// 设置节点key, 节点唯一标识
inclusiveGateway.setKey("InclusiveGateway_3a1nn9f");
// 设置节点类型, 默认为10
inclusiveGateway.setType(ExtendFlowElementType.INCLUSIVE_GATEWAY);

// 多个分支入口
List<String> egIncomings = new ArrayList<>();
egIncomings.add("SequenceFlow_1h65e8t");
egIncomings.add("SequenceFlow_25kdv36");
inclusiveGateway.setIncoming(egIncomings);

List<String> egOutgoings = new ArrayList<>();
egOutgoings.add("SequenceFlow_3jkd63g");
inclusiveGateway.setOutgoing(egOutgoings);

Map<String, Object> properties = new HashMap<>();
Map<String, String> forkJoinMatch = new HashMap<>();
// 记录分支Fork节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK, "InclusiveGateway_1djgrgp");
// 记录分支Join节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.JOIN, "InclusiveGateway_3a1nn9f");
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK_JOIN_MATCH, JSONArray.toJSON(forkJoinMatch));
// 设置分支汇聚策略(在汇聚节点设置)
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.BRANCH_MERGE, MergeStrategy.BRANCH_MERGE.ANY_ONE);
// 设置分支数据合并策略(在汇聚节点设置)
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.DATA_MERGE, MergeStrategy.DATA_MERGE.NONE);
inclusiveGateway.setProperties(properties);
}
```
201 changes: 201 additions & 0 deletions docs/PluginDevelopGuide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
## Turbo插件开发指南

### 1. **概述**
Turbo支持插件扩展功能,开发者可以通过编写插件实现自定义逻辑,而无需修改主框架代码。插件使用 SPI 机制进行加载,支持在运行时动态发现和加载。

插件目前支持应用场景包括:
- 增加新的元素节点处理能力
- 使用自定义的ID生成器
- 使用自定义的表达式计算器
---
### 2. **插件结构和要求**
#### 插件目录结构
```
src/
└── main/
└── resources/
├── plugin.properties
└── META-INF/
└── services/
├── com.didiglobal.turbo.engine.plugin.ElementPlugin
├── com.didiglobal.turbo.engine.plugin.ExpressionCalculatorPlugin
└── com.didiglobal.turbo.engine.plugin.IdGeneratorPlugin
```
#### 根据插件类型需要实现的接口
**插件顶层接口**
```java
public interface Plugin {
// turbo插件开关配置格式建议统一为turbo.plugin.support.${pluginName}
String PLUGIN_SUPPORT_PREFIX = "turbo.plugin.support.";
// turbo插件初始化文件配置格式建议统一为turbo.plugin.init_sql.${pluginName}
String PLUGIN_INIT_SQL_FILE_PREFIX = "turbo.plugin.init_sql.";
/**
* 插件名称,唯一标识
*/
String getName();
/**
* 插件开关
*/
Boolean support();
/**
* 插件初始化
*/
Boolean init();
}
```
- **ElementPlugin**:实现该接口,扩展新的元素节点处理能力。
```java
public interface ElementPlugin extends Plugin{
String ELEMENT_TYPE_PREFIX = "turbo.plugin.element_type.";
ElementExecutor getElementExecutor();
ElementValidator getElementValidator();
Integer getFlowElementType();
}
```
- **ExpressionCalculatorPlugin**:实现该接口,使用自定义的表达式计算器。
```java
public interface ExpressionCalculatorPlugin extends Plugin{
ExpressionCalculator getExpressionCalculator();
}
```
- **IdGeneratorPlugin**:实现该接口,使用自定义的ID生成器。
```java
public interface IdGeneratorPlugin extends Plugin{
IdGenerator getIdGenerator();
}
```
---
### 3. **开发流程**
- 创建项目(可以用 Maven/Gradle 或直接在现有项目中新增模块)。
- 实现插件接口或继承插件基类。
- 编写配置文件,声明插件初始化等信息。
- 测试插件功能。

#### 步骤 1:创建插件项目
使用 Maven 构建插件项目:
```shell
mvn archetype:generate -DgroupId=com.example.plugin -DartifactId=MyPlugin
```

#### 步骤 2:实现插件功能
示例:
```java
public class MyPlugin implements IdGeneratorPlugin {
@Override
public String getName() {
return "MyPluginName";
}
@Override
public Boolean support() {
return true;
}
@Override
public Boolean init() {
System.out.println("MyPlugin initialized");
return true;
}
@Override
public IdGenerator getIdGenerator() {
return new MyDefinedIdGenerator();
}
}
```

#### 步骤 3:添加配置文件,指定加载插件类
`src/main/resources/plugin.properties` 中定义插件必要信息,如初始化脚本文件路径等:
```
turbo.plugin.init_sql.ParallelGatewayElementPlugin=sql/parallelGateway.sql
```
`src/main/resources/META-INF/services/` 文件夹下创建扩展插件类型接口对应全路径路名文件,并指定插件实现类:

创建 `src/main/resources/META-INF/services/com.didiglobal.turbo.engine.plugin.ElementPlugin` 文件,并写入:
```
com.didiglobal.turbo.plugin.ParallelGatewayElementPlugin
```
#### 步骤 4:测试插件功能
- 初始化测试:
- 检查插件的初始化逻辑是否正确执行。
- 验证插件是否能正确加载配置(如 plugin.properties 或其他配置文件)。
- 功能点测试:
- 调用插件的主要功能方法,验证输出是否符合预期。
- 如果插件涉及外部接口或服务,检查是否能正常连接并获取数据。
---
### 4. **插件加载机制**
#### 插件发现与加载
主应用会在初始化时通过 SPI 机制自动发现并加载插件。确保以下条件满足:
- `src/main/resources/META-INF/services/`文件夹下存在对应插件类型的全路径类名文件。
- 全路径类名文件中指定了插件实现类的全路径
#### 插件启动过程
1. 通过 `ServiceLoader` 加载所有插件。
2. 调用插件的 `getName` 方法,检查是否存在插件名称冲突。
3. 调用插件的 `support` 方法,判断是否需要使用该插件。
1. 如果为元素节点插件,会调用 `getFlowElementType` 方法,判断该类型元素节点是否存在冲突。
4. 调用插件的 `init` 方法,进行初始化操作。
---
### 5. **插件DAO扩展**
Turbo为维持DAO层的简洁,未提供直接在DAO层的扩展能力。为解决部分插件需要在原有DAO层进行扩展,Turbo提供通过Mybatis拦截器的方式进行扩展。
#### 步骤 1:实现`CustomOperationHandler`接口
示例:
```java
public class ParallelNodeInstanceHandler implements CustomOperationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelNodeInstanceHandler.class);
@Override
public void handle(SqlCommandType commandType, MappedStatement mappedStatement, Object parameterObject, Object originalResult, SqlSessionFactory sqlSessionFactory) {
SqlSession sqlSession = sqlSessionFactory.openSession();
try {
ParallelNodeInstanceMapper mapper = sqlSession.getMapper(ParallelNodeInstanceMapper.class);
switch (commandType) {
case INSERT:
handleInsert(parameterObject, mapper);
break;
case UPDATE:
handleUpdate(parameterObject, mapper);
break;
case DELETE:
handleDelete(parameterObject, mapper);
break;
case SELECT:
handleSelect(originalResult, mapper);
break;
default:
LOGGER.warn("Unhandled command type: {}", commandType);
break;
}
} catch (Exception e) {
LOGGER.error("Exception occurred during handling. CommandType={} | ParameterObject={} | OriginalResult={}",
commandType, parameterObject, originalResult, e);
} finally {
sqlSession.close();
}
}
}
```
#### 步骤 2:注册自定义操作处理器,并指定处理的PO类型
示例:
```java
@Configuration
@ComponentScan("com.didiglobal.turbo.plugin")
@MapperScan("com.didiglobal.turbo.plugin.dao")
@EnableAutoConfiguration(exclude = {DruidDataSourceAutoConfigure.class})
public class ParallelPluginConfig {

@PostConstruct
public void init() {
CustomOperationHandlerRegistry.registerHandler(EntityPOEnum.NODE_INSTANCE, new ParallelNodeInstanceHandler());
CustomOperationHandlerRegistry.registerHandler(EntityPOEnum.NODE_INSTANCE_LOG, new ParallelNodeInstanceLogHandler());
}
}
```
---
### 6. **示例插件**
[并行网关插件](../parallel-plugin/src/main/java/com/didiglobal/turbo/plugin/ParallelGatewayElementPlugin.java)

---
### 7. **插件相关配置**
以下是我们希望插件开发者遵循的一些配置项规范

| 配置项 | 配置名称 | 示例 | 配置说明 |
|----------------------------------|------------|----------------------------------------------------------------------------|--------------------------------------|
|turbo.plugin.support.${pluginName}| 插件开关配置 | turbo.plugin.support.ParallelGatewayElementPlugin=false | 用于控制support方法的返回值,默认返回true |
|turbo.plugin.init_sql.${pluginName}| 数据库初始化脚本路径 | turbo.plugin.init_sql.ParallelGatewayElementPlugin=sql/parallelGateway.sql | 用于指定初始化脚本位置,这个脚本应该是幂等的 |
|turbo.plugin.element_type.${pluginName}| 元素节点类型 | turbo.plugin.element_type.ParallelGatewayElementPlugin=9 | 支持插件使用方自己去指定元素节点类型,避免多个插件使用相同的元素类型导致冲突 |
Loading

0 comments on commit 5957c7d

Please sign in to comment.