Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add turbo plugin function #73

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@

All notable changes to this project will be documented in this file.

## [1.2.0] - 2024-11-21

Support plugin extension function
- Add `node_type` field to the `ei_node_instance` table in the database to save the node type.
- Add `NodeExecuteResult` inner class to the `RuntimeResult` class, and move the `activeTaskInstance` and `variables` fields to the inner class.
- Add `properties` variable to the `CommonPO` entity class to store extended data.
- Add `ExtendRuntimeContext` class to store extended branch context information.

Support parallel gateway and inclusive gateway through plugins

### Bugfix
- Fix v1.1.1 release issues

## [1.1.1] - 2023-06-26
### Bugfix
- Fix v1.1.0 release issues
Expand Down
6 changes: 3 additions & 3 deletions demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
<parent>
<groupId>com.didiglobal.turbo</groupId>
<artifactId>turbo</artifactId>
<version>1.1.1</version>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>demo</artifactId>
<version>1.1.1</version>
<version>1.2.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>

<spring-boot.version>1.5.10.RELEASE</spring-boot.version>
<turbo.engine.version>1.1.1</turbo.engine.version>
<turbo.engine.version>1.2.0</turbo.engine.version>
<h2.version>1.4.200</h2.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,16 @@ public void updateFlow() {
}

public void deployFlow() {
// Deploy main flow
DeployFlowParam deployMainFlowParam = new DeployFlowParam(tenant, caller);
deployMainFlowParam.setFlowModuleId(createMainFlowResult.getFlowModuleId());
deployMainFlowResult = processEngine.deployFlow(deployMainFlowParam);
LOGGER.info("deployMainFlow.||deployMainFlowResult={}", deployMainFlowResult);
// Deploy sub flow
DeployFlowParam deploySubFlowParam = new DeployFlowParam(tenant, caller);
deploySubFlowParam.setFlowModuleId(createSubFlowResult.getFlowModuleId());
deploySubFlowResult = processEngine.deployFlow(deploySubFlowParam);
LOGGER.info("deploySubFlow.||deploySubFlowResult={}", deploySubFlowResult);
// Deploy main flow
DeployFlowParam deployMainFlowParam = new DeployFlowParam(tenant, caller);
deployMainFlowParam.setFlowModuleId(createMainFlowResult.getFlowModuleId());
deployMainFlowResult = processEngine.deployFlow(deployMainFlowParam);
LOGGER.info("deployMainFlow.||deployMainFlowResult={}", deployMainFlowResult);
}

public void startProcessToEnd(boolean auth) {
Expand All @@ -134,7 +134,7 @@ public void startProcessToEnd(boolean auth) {
// Now it is stuck in the second user node of the parent process, 'Generate Work Order', driving the completion of the parent process
commitTaskResult = commitMainFlowUserTask2(commitTaskResult);

assert commitTaskResult.getStatus() == ErrorEnum.SUCCESS.getErrNo();
assert commitTaskResult.getErrCode() == ErrorEnum.SUCCESS.getErrNo();
}

private StartProcessResult startProcessToCallActivity() {
Expand Down
195 changes: 108 additions & 87 deletions demo/src/test/resources/script/turbo-mysql-h2-ddl.sql

Large diffs are not rendered by default.

108 changes: 108 additions & 0 deletions docs/Parallel&InclusiveGateway.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
## 并行网关&包容网关
### 1. 概述
本插件为Turbo提供“并行网关”和“包容网关”的多分支并行处理能力,使开发者可以在工作流中灵活处理分支流程。

**🌟🌟🌟注意**:并行网关与包容网关均不支持跨网关的节点回滚操作
### 2. 功能介绍
#### 2.1 并行网关
* 支持在流程节点处创建多个并行任务。
* 所有分支任务完成后,流程继续向下执行。
* 应用场景:同时启动多个独立任务,如审批、数据处理。
#### 2.2 包容网关
* 支持有选择性地激活部分分支任务。
* 允许多个分支执行完毕后合并,未执行的分支不影响主流程。
* 应用场景:根据条件选择性地执行某些任务,如特定条件下的审批链。
### 3. 插件依赖
* Turbo 1.2.0+
### 4. 插件配置(plugin.properties)
* 数据库连接配置
```properties
# JDBC config
turbo.plugin.jdbc.url=jdbc:mysql://127.0.0.1:3306/t_engine?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&autoReconnect=true
turbo.plugin.jdbc.username=username
turbo.plugin.jdbc.password=password
turbo.plugin.jdbc.driver=com.mysql.jdbc.Driver
turbo.plugin.jdbc.maximumPoolSize=10
```
* 并行网关节点与包容网关节点配置
```properties
# 自定义设置并行网关与包容网关NodeType。并行网关默认为9,包容网关默认为10。如非覆盖Turbo原有执行器插件,请不要设置为1-8
turbo.plugin.element_type.ParallelGatewayElementPlugin=9
turbo.plugin.element_type.InclusiveGatewayElementPlugin=10
# 并行网关与包容网关的开关配置。默认为true开启
turbo.plugin.support.ParallelGatewayElementPlugin=true
turbo.plugin.support.InclusiveGatewayElementPlugin=true
```
### 5. 插件使用
#### 5.1 分支汇聚策略
并行网关与包容网关都支持指定分支汇聚策略,目前支持的策略有:
* JoinAll(默认):所有分支任务完成后到达汇聚节点,继续向下执行。
* AnyOne:任意一个分支任务完成后到达汇聚节点,继续向下执行。
* Custom:自定义策略,需继承`com.didiglobal.turbo.plugin.executor.BranchMergeCustom`类,重写`joinFirst``joinMerge`方法,并在该类上添加`@Primary`注解。
#### 5.2 数据汇聚策略
并行网关与包容网关都支持指定分支数据合并策略,目前支持的策略有:
* All(默认): 将所有分支的数据合并到一个Map中,并作为参数传递给下游节点。需要注意的是,如果key相同的情况下,后到达的分支数据会覆盖之前到达的分支数据。
* None: 不进行数据合并,使用分支fork时的数据作为参数传递给下游节点。
* Custom: 自定义策略,需继承`com.didiglobal.turbo.plugin.executor.DataMergeCustom`类,重写`merge`方法,并在该类上添加`@Primary`注解。
#### 5.3 并行网关节点示例
```java
{
ParallelGateway parallelGateway = new ParallelGateway();
// 设置节点key, 节点唯一标识
parallelGateway.setKey("ParallelGateway_38ad233");
// 设置节点类型, 默认为9
parallelGateway.setType(ExtendFlowElementType.PARALLEL_GATEWAY);

List<String> egIncomings = new ArrayList<>();
egIncomings.add("SequenceFlow_2gugjee");
parallelGateway.setIncoming(egIncomings);

// 设置多个分支出口
List<String> egOutgoings = new ArrayList<>();
egOutgoings.add("SequenceFlow_12rbl6u");
egOutgoings.add("SequenceFlow_3ih7eta");
parallelGateway.setOutgoing(egOutgoings);

Map<String, Object> properties = new HashMap<>();
Map<String, String> forkJoinMatch = new HashMap<>();
// 记录分支Fork节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK, "ParallelGateway_38ad233");
// 记录分支Join节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.JOIN, "ParallelGateway_10lo44j");
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK_JOIN_MATCH, JSONArray.toJSON(forkJoinMatch));
parallelGateway.setProperties(properties);
}
```
#### 5.4 包容网关节点示例
```java
{
InclusiveGateway inclusiveGateway = new InclusiveGateway();
// 设置节点key, 节点唯一标识
inclusiveGateway.setKey("InclusiveGateway_3a1nn9f");
// 设置节点类型, 默认为10
inclusiveGateway.setType(ExtendFlowElementType.INCLUSIVE_GATEWAY);

// 多个分支入口
List<String> egIncomings = new ArrayList<>();
egIncomings.add("SequenceFlow_1h65e8t");
egIncomings.add("SequenceFlow_25kdv36");
inclusiveGateway.setIncoming(egIncomings);

List<String> egOutgoings = new ArrayList<>();
egOutgoings.add("SequenceFlow_3jkd63g");
inclusiveGateway.setOutgoing(egOutgoings);

Map<String, Object> properties = new HashMap<>();
Map<String, String> forkJoinMatch = new HashMap<>();
// 记录分支Fork节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK, "InclusiveGateway_1djgrgp");
// 记录分支Join节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.JOIN, "InclusiveGateway_3a1nn9f");
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK_JOIN_MATCH, JSONArray.toJSON(forkJoinMatch));
// 设置分支汇聚策略(在汇聚节点设置)
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.BRANCH_MERGE, MergeStrategy.BRANCH_MERGE.ANY_ONE);
// 设置分支数据合并策略(在汇聚节点设置)
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.DATA_MERGE, MergeStrategy.DATA_MERGE.NONE);
inclusiveGateway.setProperties(properties);
}
```
201 changes: 201 additions & 0 deletions docs/PluginDevelopGuide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
## Turbo插件开发指南

### 1. **概述**
Turbo支持插件扩展功能,开发者可以通过编写插件实现自定义逻辑,而无需修改主框架代码。插件使用 SPI 机制进行加载,支持在运行时动态发现和加载。

插件目前支持应用场景包括:
- 增加新的元素节点处理能力
- 使用自定义的ID生成器
- 使用自定义的表达式计算器
---
### 2. **插件结构和要求**
#### 插件目录结构
```
src/
└── main/
└── resources/
├── plugin.properties
└── META-INF/
└── services/
├── com.didiglobal.turbo.engine.plugin.ElementPlugin
├── com.didiglobal.turbo.engine.plugin.ExpressionCalculatorPlugin
└── com.didiglobal.turbo.engine.plugin.IdGeneratorPlugin
```
#### 根据插件类型需要实现的接口
**插件顶层接口**
```java
public interface Plugin {
// turbo插件开关配置格式建议统一为turbo.plugin.support.${pluginName}
String PLUGIN_SUPPORT_PREFIX = "turbo.plugin.support.";
// turbo插件初始化文件配置格式建议统一为turbo.plugin.init_sql.${pluginName}
String PLUGIN_INIT_SQL_FILE_PREFIX = "turbo.plugin.init_sql.";
/**
* 插件名称,唯一标识
*/
String getName();
/**
* 插件开关
*/
Boolean support();
/**
* 插件初始化
*/
Boolean init();
}
```
- **ElementPlugin**:实现该接口,扩展新的元素节点处理能力。
```java
public interface ElementPlugin extends Plugin{
String ELEMENT_TYPE_PREFIX = "turbo.plugin.element_type.";
ElementExecutor getElementExecutor();
ElementValidator getElementValidator();
Integer getFlowElementType();
}
```
- **ExpressionCalculatorPlugin**:实现该接口,使用自定义的表达式计算器。
```java
public interface ExpressionCalculatorPlugin extends Plugin{
ExpressionCalculator getExpressionCalculator();
}
```
- **IdGeneratorPlugin**:实现该接口,使用自定义的ID生成器。
```java
public interface IdGeneratorPlugin extends Plugin{
IdGenerator getIdGenerator();
}
```
---
### 3. **开发流程**
- 创建项目(可以用 Maven/Gradle 或直接在现有项目中新增模块)。
- 实现插件接口或继承插件基类。
- 编写配置文件,声明插件初始化等信息。
- 测试插件功能。

#### 步骤 1:创建插件项目
使用 Maven 构建插件项目:
```shell
mvn archetype:generate -DgroupId=com.example.plugin -DartifactId=MyPlugin
```

#### 步骤 2:实现插件功能
示例:
```java
public class MyPlugin implements IdGeneratorPlugin {
@Override
public String getName() {
return "MyPluginName";
}
@Override
public Boolean support() {
return true;
}
@Override
public Boolean init() {
System.out.println("MyPlugin initialized");
return true;
}
@Override
public IdGenerator getIdGenerator() {
return new MyDefinedIdGenerator();
}
}
```

#### 步骤 3:添加配置文件,指定加载插件类
`src/main/resources/plugin.properties` 中定义插件必要信息,如初始化脚本文件路径等:
```
turbo.plugin.init_sql.ParallelGatewayElementPlugin=sql/parallelGateway.sql
```
`src/main/resources/META-INF/services/` 文件夹下创建扩展插件类型接口对应全路径路名文件,并指定插件实现类:

创建 `src/main/resources/META-INF/services/com.didiglobal.turbo.engine.plugin.ElementPlugin` 文件,并写入:
```
com.didiglobal.turbo.plugin.ParallelGatewayElementPlugin
```
#### 步骤 4:测试插件功能
- 初始化测试:
- 检查插件的初始化逻辑是否正确执行。
- 验证插件是否能正确加载配置(如 plugin.properties 或其他配置文件)。
- 功能点测试:
- 调用插件的主要功能方法,验证输出是否符合预期。
- 如果插件涉及外部接口或服务,检查是否能正常连接并获取数据。
---
### 4. **插件加载机制**
#### 插件发现与加载
主应用会在初始化时通过 SPI 机制自动发现并加载插件。确保以下条件满足:
- `src/main/resources/META-INF/services/`文件夹下存在对应插件类型的全路径类名文件。
- 全路径类名文件中指定了插件实现类的全路径
#### 插件启动过程
1. 通过 `ServiceLoader` 加载所有插件。
2. 调用插件的 `getName` 方法,检查是否存在插件名称冲突。
3. 调用插件的 `support` 方法,判断是否需要使用该插件。
1. 如果为元素节点插件,会调用 `getFlowElementType` 方法,判断该类型元素节点是否存在冲突。
4. 调用插件的 `init` 方法,进行初始化操作。
---
### 5. **插件DAO扩展**
Turbo为维持DAO层的简洁,未提供直接在DAO层的扩展能力。为解决部分插件需要在原有DAO层进行扩展,Turbo提供通过Mybatis拦截器的方式进行扩展。
#### 步骤 1:实现`CustomOperationHandler`接口
示例:
```java
public class ParallelNodeInstanceHandler implements CustomOperationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelNodeInstanceHandler.class);
@Override
public void handle(SqlCommandType commandType, MappedStatement mappedStatement, Object parameterObject, Object originalResult, SqlSessionFactory sqlSessionFactory) {
SqlSession sqlSession = sqlSessionFactory.openSession();
try {
ParallelNodeInstanceMapper mapper = sqlSession.getMapper(ParallelNodeInstanceMapper.class);
switch (commandType) {
case INSERT:
handleInsert(parameterObject, mapper);
break;
case UPDATE:
handleUpdate(parameterObject, mapper);
break;
case DELETE:
handleDelete(parameterObject, mapper);
break;
case SELECT:
handleSelect(originalResult, mapper);
break;
default:
LOGGER.warn("Unhandled command type: {}", commandType);
break;
}
} catch (Exception e) {
LOGGER.error("Exception occurred during handling. CommandType={} | ParameterObject={} | OriginalResult={}",
commandType, parameterObject, originalResult, e);
} finally {
sqlSession.close();
}
}
}
```
#### 步骤 2:注册自定义操作处理器,并指定处理的PO类型
示例:
```java
@Configuration
@ComponentScan("com.didiglobal.turbo.plugin")
@MapperScan("com.didiglobal.turbo.plugin.dao")
@EnableAutoConfiguration(exclude = {DruidDataSourceAutoConfigure.class})
public class ParallelPluginConfig {

@PostConstruct
public void init() {
CustomOperationHandlerRegistry.registerHandler(EntityPOEnum.NODE_INSTANCE, new ParallelNodeInstanceHandler());
CustomOperationHandlerRegistry.registerHandler(EntityPOEnum.NODE_INSTANCE_LOG, new ParallelNodeInstanceLogHandler());
}
}
```
---
### 6. **示例插件**
[并行网关插件](../parallel-plugin/src/main/java/com/didiglobal/turbo/plugin/ParallelGatewayElementPlugin.java)

---
### 7. **插件相关配置**
以下是我们希望插件开发者遵循的一些配置项规范

| 配置项 | 配置名称 | 示例 | 配置说明 |
|----------------------------------|------------|----------------------------------------------------------------------------|--------------------------------------|
|turbo.plugin.support.${pluginName}| 插件开关配置 | turbo.plugin.support.ParallelGatewayElementPlugin=false | 用于控制support方法的返回值,默认返回true |
|turbo.plugin.init_sql.${pluginName}| 数据库初始化脚本路径 | turbo.plugin.init_sql.ParallelGatewayElementPlugin=sql/parallelGateway.sql | 用于指定初始化脚本位置,这个脚本应该是幂等的 |
|turbo.plugin.element_type.${pluginName}| 元素节点类型 | turbo.plugin.element_type.ParallelGatewayElementPlugin=9 | 支持插件使用方自己去指定元素节点类型,避免多个插件使用相同的元素类型导致冲突 |
Loading