Skip to content

Commit

Permalink
Use PipelineJobAPI.getPipelineJobClass() to instead of AbstractPipeli…
Browse files Browse the repository at this point in the history
…neJobAPIImpl.getJobClassName() (#29018)
  • Loading branch information
terrymanu authored Nov 12, 2023
1 parent d4379eb commit e70c639
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
Expand Down Expand Up @@ -160,6 +161,13 @@ public interface PipelineJobAPI extends TypedSPI {
*/
void cleanJobItemErrorMessage(String jobId, int shardingItem);

/**
* Get pipeline job class.
*
* @return pipeline job class
*/
Class<? extends PipelineJob> getPipelineJobClass();

@Override
String getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,11 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
return Optional.of(jobId);
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getJobClassName());
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getPipelineJobClass().getName());
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(convertJobConfiguration(jobConfig)));
return Optional.of(jobId);
}

protected abstract String getJobClassName();

protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
} else {
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClassName());
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getPipelineJobClass().getName());
JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig);
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
Expand Down Expand Up @@ -345,8 +345,8 @@ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final
}

@Override
protected String getJobClassName() {
return CDCJob.class.getName();
public Class<CDCJob> getPipelineJobClass() {
return CDCJob.class;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ public PipelineJobInfo getJobInfo(final String jobId) {
}

@Override
protected String getJobClassName() {
return ConsistencyCheckJob.class.getName();
public Class<ConsistencyCheckJob> getPipelineJobClass() {
return ConsistencyCheckJob.class;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,12 +473,12 @@ public void refreshTableMetadata(final String jobId, final String databaseName)
}

@Override
public String getType() {
return "MIGRATION";
public Class<MigrationJob> getPipelineJobClass() {
return MigrationJob.class;
}

@Override
protected String getJobClassName() {
return MigrationJob.class.getName();
public String getType() {
return "MIGRATION";
}
}

0 comments on commit e70c639

Please sign in to comment.