diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java index 513788cf60..619027f1b6 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java @@ -177,8 +177,8 @@ public SqlExplainResult explain(JobStatement jobStatement) { @Override public StreamGraph getStreamGraph(JobStatement jobStatement) { + statements.add(jobStatement); if (!jobStatement.isFinalExecutableStatement()) { - statements.add(jobStatement); return null; } if (!statements.isEmpty()) { @@ -189,8 +189,8 @@ public StreamGraph getStreamGraph(JobStatement jobStatement) { @Override public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) { + statements.add(jobStatement); if (!jobStatement.isFinalExecutableStatement()) { - statements.add(jobStatement); return null; } if (!statements.isEmpty()) { diff --git a/dinky-core/src/test/java/org/dinky/job/JobManagerTest.java b/dinky-core/src/test/java/org/dinky/job/JobManagerTest.java index 8b69ccfdc6..ace3b28d94 100644 --- a/dinky-core/src/test/java/org/dinky/job/JobManagerTest.java +++ b/dinky-core/src/test/java/org/dinky/job/JobManagerTest.java @@ -82,18 +82,21 @@ void initLocalBatchPlanEnvironment() { @Test void testExplainSql() throws Exception { checkExplainStreamSqlFromFile("flink/sql/statement-set-stream.sql", 16); + checkExplainStreamSqlFromFile("flink/sql/variable.sql", 3); checkExplainBatchSqlFromFile("flink/sql/statement-set-batch.sql", 16); } @Test void testGetStreamGraph() throws Exception { checkGetStreamGraphFromFile("flink/sql/statement-set-stream.sql"); + checkGetStreamGraphFromFile("flink/sql/variable.sql"); checkGetBatchStreamGraphFromFile("flink/sql/statement-set-batch.sql"); } @Test void testGetJobPlanJson() throws Exception { checkGetStreamJobPlanJsonFromFile("flink/sql/statement-set-stream.sql"); + checkGetStreamJobPlanJsonFromFile("flink/sql/variable.sql"); checkGetBatchJobPlanJsonFromFile("flink/sql/statement-set-batch.sql"); } diff --git a/dinky-core/src/test/resources/flink/sql/variable.sql b/dinky-core/src/test/resources/flink/sql/variable.sql new file mode 100644 index 0000000000..cd4b7256ec --- /dev/null +++ b/dinky-core/src/test/resources/flink/sql/variable.sql @@ -0,0 +1,27 @@ +tb:=datagen_source; +CREATE TABLE datagen_source +( + id BIGINT, + name STRING, + sex INT, + age INT +) WITH ( + 'connector' = 'datagen' + ); + +CREATE TABLE print_sink +( + id BIGINT, + name STRING, + sex INT, + age INT +) WITH ( + 'connector' = 'print' + ); + +INSERT INTO print_sink +SELECT id, + name, + sex, + age +from ${tb}; \ No newline at end of file