From dc9d8bef0e5696e38b9dec665fb4a39cd29c1062 Mon Sep 17 00:00:00 2001 From: Xinze Guo <101622833+azexcy@users.noreply.github.com> Date: Mon, 20 Nov 2023 19:52:14 +0800 Subject: [PATCH] Improve stop_time display at show streaming list of CDC (#29090) * Improve CDC job stop_time display * Notify client when job close --- .../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 ++ .../data/pipeline/cdc/core/importer/sink/CDCSocketSink.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 0e41d2f8cac04..c6cd66526876b 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -85,6 +85,7 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; import java.sql.SQLException; +import java.time.LocalDateTime; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -223,6 +224,7 @@ public void updateJobConfigurationDisabled(final String jobId, final boolean dis JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); jobConfigPOJO.setDisabled(disabled); if (disabled) { + jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(PipelineJobConfiguration.DATE_TIME_FORMATTER)); jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis())); } else { jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java index d45c14b4b8c58..9c84a6a9112ae 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java @@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; +import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import java.io.IOException; @@ -117,5 +118,6 @@ private void doAwait() { @Override public void close() throws IOException { + channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed.")); } }