Skip to content

Commit

Permalink
Improve stop_time display at show streaming list of CDC (#29090)
Browse files Browse the repository at this point in the history
* Improve CDC job stop_time display

* Notify client when job close
  • Loading branch information
azexcy authored Nov 20, 2023
1 parent a203da9 commit dc9d8be
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;

import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -223,6 +224,7 @@ public void updateJobConfigurationDisabled(final String jobId, final boolean dis
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
jobConfigPOJO.setDisabled(disabled);
if (disabled) {
jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(PipelineJobConfiguration.DATE_TIME_FORMATTER));
jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
} else {
jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;

import java.io.IOException;
Expand Down Expand Up @@ -117,5 +118,6 @@ private void doAwait() {

@Override
public void close() throws IOException {
channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed."));
}
}

0 comments on commit dc9d8be

Please sign in to comment.