diff --git a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md index c3724a30bed..9eab87d923e 100644 --- a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md +++ b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md @@ -209,6 +209,24 @@ but setting it too high can introduce additional overhead in distribution and me The partition count should be configured before starting a job. Changing the partition count after a job has started may result in metric key mismatches, so it is recommended to restart Seatunnel after modifying this option. +### 4.9 Physical DAG Enable (This parameter is invalid on the Worker node) + +The configuration option `physical-dag-enabled` determines which type of DAG information (physical or logical) is displayed to the user for visualization. + +- Default: `true` (physical DAG info is displayed) +- Usage: Set this to `false` to display the logical DAG info (the structure as defined by the user) instead of the physical DAG info. + +Example: + +```yaml +seatunnel: + engine: + physical-dag-enabled: false +``` + +> **Note:** This parameter only affects which DAG info is exposed for display. +> It does **not** change the execution plan, which is always based on the physical DAG. + ## 5. Configure The SeaTunnel Engine Network Service All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file. diff --git a/docs/en/seatunnel-engine/separated-cluster-deployment.md b/docs/en/seatunnel-engine/separated-cluster-deployment.md index 798192420b0..e5323eebc31 100644 --- a/docs/en/seatunnel-engine/separated-cluster-deployment.md +++ b/docs/en/seatunnel-engine/separated-cluster-deployment.md @@ -353,6 +353,24 @@ but setting it too high can introduce additional overhead in distribution and me The partition count should be configured before starting a job. Changing the partition count after a job has started may result in metric key mismatches, so it is recommended to restart Seatunnel after modifying this option. +### 4.10 Physical DAG Enable (This parameter is invalid on the Worker node) + +The configuration option `physical-dag-enabled` determines which type of DAG information (physical or logical) is displayed to the user for visualization. + +- Default: `true` (physical DAG info is displayed) +- Usage: Set this to `false` to display the logical DAG info (the structure as defined by the user) instead of the physical DAG info. + +Example: + +```yaml +seatunnel: + engine: + physical-dag-enabled: false +``` + +> **Note:** This parameter only affects which DAG info is exposed for display. +> It does **not** change the execution plan, which is always based on the physical DAG. + ## 5. Configuring SeaTunnel Engine Network Services All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files. diff --git a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md index 22f28ab7299..c1c35a94725 100644 --- a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md +++ b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md @@ -205,6 +205,23 @@ job-metrics-partition-count: 4 在高并发竞争的情况下,增加分区数量可能会提高并行度;但如果设置过大,会引入额外的分布与合并开销,从而降低整体性能。 分区数量应在作业启动前进行配置。如果在作业已启动后更改,可能导致指标键不匹配,因此建议在修改此选项后重启 SeaTunnel。 +### 4.9 启用物理 DAG(该参数在 Worker 节点无效) + +配置项 `physical-dag-enabled` 用于决定在可视化时向用户展示哪种类型的 DAG 信息(物理 DAG 或逻辑 DAG)。 + +- 默认值:`true`(显示物理 DAG 信息) +- 使用方法:将其设置为 `false`,则会展示逻辑 DAG 信息(即用户定义的 DAG 结构),而不是物理 DAG 信息。 + +示例: + +```yaml +seatunnel: + engine: + physical-dag-enabled: false +``` + +> **注意:** 该参数仅影响对外展示的 DAG 信息类型,**不会影响实际执行计划,作业始终基于物理 DAG 运行**。 + ## 5. 配置 SeaTunnel Engine 网络服务 所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast.yaml` 文件中. diff --git a/docs/zh/seatunnel-engine/separated-cluster-deployment.md b/docs/zh/seatunnel-engine/separated-cluster-deployment.md index bbc96c68348..8151275ed06 100644 --- a/docs/zh/seatunnel-engine/separated-cluster-deployment.md +++ b/docs/zh/seatunnel-engine/separated-cluster-deployment.md @@ -353,6 +353,23 @@ job-metrics-partition-count: 4 在高并发竞争的情况下,增加分区数量可能会提高并行度;但如果设置过大,会引入额外的分布与合并开销,从而降低整体性能。 分区数量应在作业启动前进行配置。如果在作业已启动后更改,可能导致指标键不匹配,因此建议在修改此选项后重启 SeaTunnel。 +### 4.10 启用物理 DAG(该参数在 Worker 节点无效) + +配置项 `physical-dag-enabled` 用于决定在可视化时向用户展示哪种类型的 DAG 信息(物理 DAG 或逻辑 DAG)。 + +- 默认值:`true`(显示物理 DAG 信息) +- 使用方法:将其设置为 `false`,则会展示逻辑 DAG 信息(即用户定义的 DAG 结构),而不是物理 DAG 信息。 + +示例: + +```yaml +seatunnel: + engine: + physical-dag-enabled: false +``` + +> **注意:** 该参数仅影响对外展示的 DAG 信息类型,**不会影响实际执行计划,作业始终基于物理 DAG 运行**。 + ## 5. 配置 SeaTunnel Engine 网络服务 所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast-master.yaml`和`hazelcast-worker.yaml` 文件中. diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index 79a4d0ace0d..7b5f5867927 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -59,6 +59,9 @@ public class EngineConfig { ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_PARTITION_COUNT .defaultValue(); + private boolean physicalDAGEnabled = + ServerConfigOptions.MasterServerConfigOptions.PHYSICAL_DAG_ENABLED.defaultValue(); + private ThreadShareMode taskExecutionThreadShareMode = ServerConfigOptions.WorkerServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE .defaultValue(); @@ -140,6 +143,10 @@ public void setJobMetricsPartitionCount(int jobMetricsPartitionCount) { this.jobMetricsPartitionCount = jobMetricsPartitionCount; } + public void setPhysicalDAGEnabled(boolean physicalDAGEnabled) { + this.physicalDAGEnabled = physicalDAGEnabled; + } + public void setTaskExecutionThreadShareMode(ThreadShareMode taskExecutionThreadShareMode) { checkNotNull(queueType); this.taskExecutionThreadShareMode = taskExecutionThreadShareMode; diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 2794862a5ac..a39acdef468 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -188,6 +188,10 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { .JOB_METRICS_PARTITION_COUNT .key(), getTextContent(node))); + } else if (ServerConfigOptions.MasterServerConfigOptions.PHYSICAL_DAG_ENABLED + .key() + .equals(name)) { + engineConfig.setPhysicalDAGEnabled(getBooleanValue(getTextContent(node))); } else if (ServerConfigOptions.WorkerServerConfigOptions .TASK_EXECUTION_THREAD_SHARE_MODE .key() diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 631267c7e30..29579d2b0f3 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -96,6 +96,14 @@ public static class MasterServerConfigOptions { .intType() .defaultValue(1) .withDescription("Number of partitions for storing job metrics in IMap."); + + public static final Option PHYSICAL_DAG_ENABLED = + Options.key("physical-dag-enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to display the physical DAG execution plan. " + + "If set to false, the system will display the logical DAG."); ///////////////////////////////////////////////// // The options about Hazelcast IMAP store start public static final Option BACKUP_COUNT = diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java index 96b307edc67..68f13b5ebdb 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.engine.common.config.EngineConfig; import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; import org.apache.seatunnel.engine.core.dag.actions.Action; @@ -53,6 +54,7 @@ @Slf4j public class DAGUtils { + private DAGUtils() {} public static LogicalDag restoreLogicalDag( JobImmutableInformation jobImmutableInformation, @@ -173,12 +175,19 @@ public static JobDAGInfo getJobDAGInfo( return pipelines.stream() .filter( p -> - p.getActions() - .containsKey( + p.getActionsId() + .contains( info.getAction() .getId())) .findFirst() - .get() + .orElseThrow( + () -> + CommonError.illegalArgument( + pipelines + .toString(), + "Can't find pipeline for vertex " + + info.getAction() + .getId())) .getId(); }, Collectors.toList())); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java index e482a190e37..7fcee05b696 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java @@ -20,10 +20,14 @@ import org.apache.seatunnel.engine.core.dag.actions.Action; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; +import lombok.Data; + import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +@Data public class Pipeline { /** The ID of the pipeline. */ @@ -39,23 +43,15 @@ public class Pipeline { this.vertexes = vertexes; } - public Integer getId() { - return id; - } - - public List getEdges() { - return edges; - } - - public Map getVertexes() { - return vertexes; - } - public Map getActions() { return vertexes.values().stream() .map(ExecutionVertex::getAction) - .collect( - Collectors.toMap( - action -> ActionStateKey.of(action), Action::getParallelism)); + .collect(Collectors.toMap(ActionStateKey::of, Action::getParallelism)); + } + + public Set getActionsId() { + return vertexes.values().stream() + .map(v -> v.getAction().getId()) + .collect(Collectors.toSet()); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 816b571b792..19b42407f9f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -150,8 +150,7 @@ public class JobMaster { private final IMap runningJobStateTimestampsIMap; - // TODO add config to change value - private boolean isPhysicalDAGInfo = true; + private final boolean isPhysicalDAGInfo; private final EngineConfig engineConfig; @@ -204,6 +203,7 @@ public JobMaster( this.jobHistoryService = jobHistoryService; this.runningJobStateIMap = runningJobStateIMap; this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap; + this.isPhysicalDAGInfo = engineConfig.isPhysicalDAGEnabled(); this.runningJobInfoIMap = runningJobInfoIMap; this.engineConfig = engineConfig; this.seaTunnelServer = seaTunnelServer; diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java index dfe16f2dea1..ae6297c7a54 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.common.job.JobStatus; import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture; import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.job.JobDAGInfo; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.job.PipelineStatus; import org.apache.seatunnel.engine.server.execution.TaskLocation; @@ -336,6 +337,63 @@ void testCleanupPendingJobMasterMapWhenJobSubmitFutureIsExceptionally() { .contains(jobInformation.jobId))); } + @Test + void testLogicalDAGConfig() { + setConfigFile("seatunnel_logical_dag.yaml"); + JobInformation jobInformation1 = + submitJob( + "test_logical_dag_config", + "batch_fake_to_inmemory.conf", + "test_logical_dag_config_1"); + CoordinatorService coordinatorService1 = jobInformation1.coordinatorService; + await().atMost(10000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertEquals( + JobStatus.RUNNING, + coordinatorService1.getJobStatus(jobInformation1.jobId)); + JobMaster jobMaster = + coordinatorService1.getJobMaster(jobInformation1.jobId); + Assertions.assertNotNull(jobMaster); + Assertions.assertTrue( + jobMaster + .getRunningJobStateIMap() + .containsKey(jobInformation1.jobId)); + }); + + JobDAGInfo jobInfo1 = coordinatorService1.getJobInfo(jobInformation1.jobId); + + coordinatorService1.clearCoordinatorService(); + jobInformation1.coordinatorServiceTest.shutdown(); + setDefaultConfigFile(); + + JobInformation jobInformation2 = + submitJob( + "test_logical_dag_config", + "batch_fake_to_inmemory.conf", + "test_logical_dag_config_2"); + CoordinatorService coordinatorService2 = jobInformation2.coordinatorService; + await().atMost(10000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertEquals( + JobStatus.RUNNING, + coordinatorService2.getJobStatus(jobInformation2.jobId)); + JobMaster jobMaster = + coordinatorService2.getJobMaster(jobInformation2.jobId); + Assertions.assertNotNull(jobMaster); + Assertions.assertTrue( + jobMaster + .getRunningJobStateIMap() + .containsKey(jobInformation2.jobId)); + }); + JobDAGInfo jobInfo2 = coordinatorService2.getJobInfo(jobInformation2.jobId); + Assertions.assertNotEquals(jobInfo1.getPipelineEdges(), jobInfo2.getPipelineEdges()); + + coordinatorService2.clearCoordinatorService(); + jobInformation2.coordinatorServiceTest.shutdown(); + } + private void setDefaultConfigFile() { setConfigFile("seatunnel.yaml"); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_logical_dag.yaml b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_logical_dag.yaml new file mode 100644 index 00000000000..70c2cc8ef63 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel_logical_dag.yaml @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +seatunnel: + engine: + physical-dag-enabled: false + backup-count: 1 + print-execution-info-interval: 10 + slot-service: + dynamic-slot: true + slot-num: 5 + checkpoint: + interval: 6000 + timeout: 7000 + storage: + type: hdfs + max-retained: 3 + plugin-config: + namespace: /tmp/seatunnel/checkpoint_snapshot + storage.type: hdfs + fs.defaultFS: file:/// # Ensure that the directory has written permission \ No newline at end of file