Skip to content

Commit

Permalink
Break success process if job is stopping (#28476)
Browse files Browse the repository at this point in the history
* Break success process if job is stopping

* Add run_count in job props

* Update IncrementalExecuteCallback

* Update unit test

* Remove log
  • Loading branch information
sandynz authored Sep 20, 2023
1 parent 6fa387a commit 3b19fe7
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,11 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Pipeline data source sink.
Expand Down Expand Up @@ -274,10 +272,7 @@ private void executeBatchDelete(final Connection connection, final List<DataReco
}
preparedStatement.addBatch();
}
int[] counts = preparedStatement.executeBatch();
if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
log.warn("batchDelete failed, counts={}, sql={}, dataRecords={}", Arrays.toString(counts), deleteSQL, dataRecords);
}
preparedStatement.executeBatch();
} finally {
batchDeleteStatement.set(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfigur
String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
result.getProps().setProperty("run_count", "1");
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
return result;
}
Expand All @@ -125,6 +126,7 @@ public void startDisabledJob(final String jobId) {
jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
jobConfigPOJO.getProps().remove("stop_time");
jobConfigPOJO.getProps().remove("stop_time_millis");
jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
Expand Down Expand Up @@ -154,7 +156,13 @@ protected void dropJob(final String jobId) {
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
}

protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
/**
* Get ElasticJob configuration POJO.
*
* @param jobId job id
* @return ElasticJob configuration POJO
*/
public final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(result, () -> new PipelineJobNotFoundException(jobId));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ private final class InventoryTaskExecuteCallback implements ExecuteCallback {

@Override
public void onSuccess() {
    // Fire the inventory success callback only when the job item is still running;
    // a stopping job ignores the completion signal.
    if (!jobItemContext.isStopping()) {
        inventorySuccessCallback();
        return;
    }
    log.info("Inventory task onSuccess, stopping true, ignore");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ private final class CDCExecuteCallback implements ExecuteCallback {

@Override
public void onSuccess() {
    // Report completion only when the job item is not being stopped;
    // otherwise just note that the success signal is ignored.
    if (jobItemContext.isStopping()) {
        log.info("onSuccess, stopping true, ignore");
    } else {
        log.info("onSuccess, all {} tasks finished.", identifier);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ protected void runBlocking() {
parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
log.info("job {} with check algorithm '{}' data consistency checker result: {}", parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, checkResultMap);
log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}",
parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap, jobItemContext.isStopping());
if (!jobItemContext.isStopping()) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, checkResultMap);
}
} finally {
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
}
Expand All @@ -121,6 +124,10 @@ private final class CheckExecuteCallback implements ExecuteCallback {

@Override
public void onSuccess() {
if (jobItemContext.isStopping()) {
log.info("onSuccess, stopping true, ignore");
return;
}
log.info("onSuccess, check job id: {}, parent job id: {}", checkJobId, parentJobId);
jobItemContext.setStatus(JobStatus.FINISHED);
checkJobAPI.persistJobItemProgress(jobItemContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ void assertCountAndDataCheck() throws SQLException {
governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext()).check("FIXTURE", null);
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);
String checkKey = "t_order";
assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(), is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get(checkKey).getContentCheckResult().isMatched());
}

private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext() {
return new ConsistencyCheckJobItemProgressContext("", 0, "H2");
// Builds a progress context for the given job id, fixed at shard item 0 with the H2 fixture database type.
private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) {
    final int shardingItem = 0;
    final String sourceDatabaseType = "H2";
    return new ConsistencyCheckJobItemProgressContext(jobId, shardingItem, sourceDatabaseType);
}

private MigrationJobConfiguration createJobConfiguration() throws SQLException {
Expand Down

0 comments on commit 3b19fe7

Please sign in to comment.