Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/en/seatunnel-engine/hybrid-cluster-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,24 @@ but setting it too high can introduce additional overhead in distribution and me
The partition count should be configured before starting a job.
Changing the partition count after a job has started may result in metric key mismatches, so it is recommended to restart Seatunnel after modifying this option.

### 4.9 Physical DAG Enable (This parameter is invalid on the Worker node)

The configuration option `physical-dag-enabled` determines which type of DAG information (physical or logical) is displayed to the user for visualization.

- Default: `true` (physical DAG info is displayed)
- Usage: Set this to `false` to display the logical DAG info (the structure as defined by the user) instead of the physical DAG info.

Example:

```yaml
seatunnel:
engine:
physical-dag-enabled: false
```

> **Note:** This parameter only affects which DAG info is exposed for display.
> It does **not** change the execution plan, which is always based on the physical DAG.

## 5. Configure The SeaTunnel Engine Network Service

All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.
Expand Down
18 changes: 18 additions & 0 deletions docs/en/seatunnel-engine/separated-cluster-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,24 @@ but setting it too high can introduce additional overhead in distribution and me
The partition count should be configured before starting a job.
Changing the partition count after a job has started may result in metric key mismatches, so it is recommended to restart Seatunnel after modifying this option.

### 4.10 Physical DAG Enable (This parameter is invalid on the Worker node)

The configuration option `physical-dag-enabled` determines which type of DAG information (physical or logical) is displayed to the user for visualization.

- Default: `true` (physical DAG info is displayed)
- Usage: Set this to `false` to display the logical DAG info (the structure as defined by the user) instead of the physical DAG info.

Example:

```yaml
seatunnel:
engine:
physical-dag-enabled: false
```

> **Note:** This parameter only affects which DAG info is exposed for display.
> It does **not** change the execution plan, which is always based on the physical DAG.

## 5. Configuring SeaTunnel Engine Network Services

All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.
Expand Down
17 changes: 17 additions & 0 deletions docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,23 @@ job-metrics-partition-count: 4
在高并发竞争的情况下,增加分区数量可能会提高并行度;但如果设置过大,会引入额外的分布与合并开销,从而降低整体性能。
分区数量应在作业启动前进行配置。如果在作业已启动后更改,可能导致指标键不匹配,因此建议在修改此选项后重启 SeaTunnel。

### 4.9 启用物理 DAG(该参数在 Worker 节点无效)

配置项 `physical-dag-enabled` 用于决定在可视化时向用户展示哪种类型的 DAG 信息(物理 DAG 或逻辑 DAG)。

- 默认值:`true`(显示物理 DAG 信息)
- 使用方法:将其设置为 `false`,则会展示逻辑 DAG 信息(即用户定义的 DAG 结构),而不是物理 DAG 信息。

示例:

```yaml
seatunnel:
engine:
physical-dag-enabled: false
```

> **注意:** 该参数仅影响对外展示的 DAG 信息类型,**不会影响实际执行计划,作业始终基于物理 DAG 运行**。

## 5. 配置 SeaTunnel Engine 网络服务

所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast.yaml` 文件中.
Expand Down
17 changes: 17 additions & 0 deletions docs/zh/seatunnel-engine/separated-cluster-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,23 @@ job-metrics-partition-count: 4
在高并发竞争的情况下,增加分区数量可能会提高并行度;但如果设置过大,会引入额外的分布与合并开销,从而降低整体性能。
分区数量应在作业启动前进行配置。如果在作业已启动后更改,可能导致指标键不匹配,因此建议在修改此选项后重启 SeaTunnel。

### 4.10 启用物理 DAG(该参数在 Worker 节点无效)

配置项 `physical-dag-enabled` 用于决定在可视化时向用户展示哪种类型的 DAG 信息(物理 DAG 或逻辑 DAG)。

- 默认值:`true`(显示物理 DAG 信息)
- 使用方法:将其设置为 `false`,则会展示逻辑 DAG 信息(即用户定义的 DAG 结构),而不是物理 DAG 信息。

示例:

```yaml
seatunnel:
engine:
physical-dag-enabled: false
```

> **注意:** 该参数仅影响对外展示的 DAG 信息类型,**不会影响实际执行计划,作业始终基于物理 DAG 运行**。

## 5. 配置 SeaTunnel Engine 网络服务

所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast-master.yaml`和`hazelcast-worker.yaml` 文件中.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class EngineConfig {
ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_PARTITION_COUNT
.defaultValue();

private boolean physicalDAGEnabled =
ServerConfigOptions.MasterServerConfigOptions.PHYSICAL_DAG_ENABLED.defaultValue();

private ThreadShareMode taskExecutionThreadShareMode =
ServerConfigOptions.WorkerServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE
.defaultValue();
Expand Down Expand Up @@ -140,6 +143,10 @@ public void setJobMetricsPartitionCount(int jobMetricsPartitionCount) {
this.jobMetricsPartitionCount = jobMetricsPartitionCount;
}

public void setPhysicalDAGEnabled(boolean physicalDAGEnabled) {
this.physicalDAGEnabled = physicalDAGEnabled;
}

public void setTaskExecutionThreadShareMode(ThreadShareMode taskExecutionThreadShareMode) {
checkNotNull(queueType);
this.taskExecutionThreadShareMode = taskExecutionThreadShareMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
.JOB_METRICS_PARTITION_COUNT
.key(),
getTextContent(node)));
} else if (ServerConfigOptions.MasterServerConfigOptions.PHYSICAL_DAG_ENABLED
.key()
.equals(name)) {
engineConfig.setPhysicalDAGEnabled(getBooleanValue(getTextContent(node)));
} else if (ServerConfigOptions.WorkerServerConfigOptions
.TASK_EXECUTION_THREAD_SHARE_MODE
.key()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ public static class MasterServerConfigOptions {
.intType()
.defaultValue(1)
.withDescription("Number of partitions for storing job metrics in IMap.");

public static final Option<Boolean> PHYSICAL_DAG_ENABLED =
Options.key("physical-dag-enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to display the physical DAG execution plan. "
+ "If set to false, the system will display the logical DAG.");
/////////////////////////////////////////////////
// The options about Hazelcast IMAP store start
public static final Option<Integer> BACKUP_COUNT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.actions.Action;
Expand Down Expand Up @@ -53,6 +54,7 @@

@Slf4j
public class DAGUtils {
private DAGUtils() {}

public static LogicalDag restoreLogicalDag(
JobImmutableInformation jobImmutableInformation,
Expand Down Expand Up @@ -173,12 +175,19 @@ public static JobDAGInfo getJobDAGInfo(
return pipelines.stream()
.filter(
p ->
p.getActions()
.containsKey(
p.getActionsId()
.contains(
info.getAction()
.getId()))
.findFirst()
.get()
.orElseThrow(
() ->
CommonError.illegalArgument(
pipelines
.toString(),
"Can't find pipeline for vertex "
+ info.getAction()
.getId()))
.getId();
},
Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;

import lombok.Data;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@Data
public class Pipeline {

/** The ID of the pipeline. */
Expand All @@ -39,23 +43,15 @@ public class Pipeline {
this.vertexes = vertexes;
}

public Integer getId() {
return id;
}

public List<ExecutionEdge> getEdges() {
return edges;
}

public Map<Long, ExecutionVertex> getVertexes() {
return vertexes;
}

public Map<ActionStateKey, Integer> getActions() {
return vertexes.values().stream()
.map(ExecutionVertex::getAction)
.collect(
Collectors.toMap(
action -> ActionStateKey.of(action), Action::getParallelism));
.collect(Collectors.toMap(ActionStateKey::of, Action::getParallelism));
}

public Set<Long> getActionsId() {
return vertexes.values().stream()
.map(v -> v.getAction().getId())
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ public class JobMaster {

private final IMap<Object, Object> runningJobStateTimestampsIMap;

// TODO add config to change value
private boolean isPhysicalDAGInfo = true;
private final boolean isPhysicalDAGInfo;

private final EngineConfig engineConfig;

Expand Down Expand Up @@ -204,6 +203,7 @@ public JobMaster(
this.jobHistoryService = jobHistoryService;
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
this.isPhysicalDAGInfo = engineConfig.isPhysicalDAGEnabled();
this.runningJobInfoIMap = runningJobInfoIMap;
this.engineConfig = engineConfig;
this.seaTunnelServer = seaTunnelServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
Expand Down Expand Up @@ -336,6 +337,63 @@ void testCleanupPendingJobMasterMapWhenJobSubmitFutureIsExceptionally() {
.contains(jobInformation.jobId)));
}

@Test
void testLogicalDAGConfig() {
setConfigFile("seatunnel_logical_dag.yaml");
JobInformation jobInformation1 =
submitJob(
"test_logical_dag_config",
"batch_fake_to_inmemory.conf",
"test_logical_dag_config_1");
CoordinatorService coordinatorService1 = jobInformation1.coordinatorService;
await().atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertEquals(
JobStatus.RUNNING,
coordinatorService1.getJobStatus(jobInformation1.jobId));
JobMaster jobMaster =
coordinatorService1.getJobMaster(jobInformation1.jobId);
Assertions.assertNotNull(jobMaster);
Assertions.assertTrue(
jobMaster
.getRunningJobStateIMap()
.containsKey(jobInformation1.jobId));
});

JobDAGInfo jobInfo1 = coordinatorService1.getJobInfo(jobInformation1.jobId);

coordinatorService1.clearCoordinatorService();
jobInformation1.coordinatorServiceTest.shutdown();
setDefaultConfigFile();

JobInformation jobInformation2 =
submitJob(
"test_logical_dag_config",
"batch_fake_to_inmemory.conf",
"test_logical_dag_config_2");
CoordinatorService coordinatorService2 = jobInformation2.coordinatorService;
await().atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertEquals(
JobStatus.RUNNING,
coordinatorService2.getJobStatus(jobInformation2.jobId));
JobMaster jobMaster =
coordinatorService2.getJobMaster(jobInformation2.jobId);
Assertions.assertNotNull(jobMaster);
Assertions.assertTrue(
jobMaster
.getRunningJobStateIMap()
.containsKey(jobInformation2.jobId));
});
JobDAGInfo jobInfo2 = coordinatorService2.getJobInfo(jobInformation2.jobId);
Assertions.assertNotEquals(jobInfo1.getPipelineEdges(), jobInfo2.getPipelineEdges());

coordinatorService2.clearCoordinatorService();
jobInformation2.coordinatorServiceTest.shutdown();
}

private void setDefaultConfigFile() {
setConfigFile("seatunnel.yaml");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

seatunnel:
engine:
physical-dag-enabled: false
backup-count: 1
print-execution-info-interval: 10
slot-service:
dynamic-slot: true
slot-num: 5
checkpoint:
interval: 6000
timeout: 7000
storage:
type: hdfs
max-retained: 3
plugin-config:
namespace: /tmp/seatunnel/checkpoint_snapshot
storage.type: hdfs
fs.defaultFS: file:/// # Ensure that the directory has written permission