Skip to content

Commit

Permalink
Remove SPI from PipelineJobOption (#29206)
Browse files Browse the repository at this point in the history
* Refactor PipelineJobConfigurationManager

* Refactor MigrationJobAPI

* Remove SPI from PipelineJobOption

* Remove SPI from PipelineJobOption
  • Loading branch information
terrymanu authored Nov 25, 2023
1 parent 14cdd1f commit 40e396d
Show file tree
Hide file tree
Showing 35 changed files with 108 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.common.job;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;

/**
* Pipeline job id.
Expand All @@ -30,7 +30,7 @@ public interface PipelineJobId {
*
* @return type
*/
JobType getJobType();
PipelineJobType getJobType();

/**
* Get format version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
@Slf4j
public final class JobCodeRegistry {

private static final Map<String, JobType> JOB_CODE_AND_TYPE_MAP = new HashMap<>();
private static final Map<String, PipelineJobType> JOB_CODE_AND_TYPE_MAP = new HashMap<>();

static {
for (JobType each : ShardingSphereServiceLoader.getServiceInstances(JobType.class)) {
for (PipelineJobType each : ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) {
Preconditions.checkArgument(2 == each.getCode().length(), "Job type code length is not 2.");
JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each);
}
Expand All @@ -48,7 +48,7 @@ public final class JobCodeRegistry {
* @param jobTypeCode job type code
* @return job type
*/
public static JobType getJobType(final String jobTypeCode) {
public static PipelineJobType getJobType(final String jobTypeCode) {
Preconditions.checkArgument(JOB_CODE_AND_TYPE_MAP.containsKey(jobTypeCode), "Can not get job type by `%s`.", jobTypeCode);
return JOB_CODE_AND_TYPE_MAP.get(jobTypeCode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.shardingsphere.data.pipeline.common.job.type;

import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

/**
* Job type.
* Pipeline job type.
*/
@SingletonSPI
public interface JobType extends TypedSPI {
public interface PipelineJobType extends TypedSPI {

/**
* Get job type code.
Expand All @@ -33,6 +34,13 @@ public interface JobType extends TypedSPI {
*/
String getCode();

/**
* Get job option.
*
* @return job option
*/
PipelineJobOption getOption();

@Override
String getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.JobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
Expand Down Expand Up @@ -89,7 +89,7 @@ protected void executeJob(final JobConfiguration jobConfig) {

protected abstract AbstractPipelineJob buildPipelineJob(String jobId);

protected abstract JobType getJobType();
protected abstract PipelineJobType getJobType();

@Override
public String getType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
Expand Down Expand Up @@ -65,7 +66,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {

protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType());
jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;

/**
* Abstract pipeline job id.
Expand All @@ -33,7 +33,7 @@ public abstract class AbstractPipelineJobId implements PipelineJobId {

public static final String CURRENT_VERSION = "02";

private final JobType jobType;
private final PipelineJobType jobType;

private final PipelineContextKey contextKey;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
Expand Down Expand Up @@ -64,7 +64,7 @@ public static String marshalJobIdCommonPrefix(final PipelineJobId pipelineJobId)
* @param jobId job id
* @return job type
*/
public static JobType parseJobType(final String jobId) {
public static PipelineJobType parseJobType(final String jobId) {
verifyJobId(jobId);
return JobCodeRegistry.getJobType(jobId.substring(1, 3));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;

import java.util.Optional;
Expand All @@ -33,7 +32,7 @@
* Pipeline job option.
*/
@SingletonSPI
public interface PipelineJobOption extends TypedSPI {
public interface PipelineJobOption {

/**
* Get YAML pipeline job configuration swapper.
Expand Down Expand Up @@ -95,6 +94,10 @@ default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
*/
Class<? extends PipelineJob> getJobClass();

@Override
/**
* Get job type.
*
* @return job type
*/
String getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
Expand Down Expand Up @@ -130,7 +130,7 @@ private static synchronized void persist(final String jobId, final int shardingI
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType())
new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption()
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public <T extends PipelineJobConfiguration> T getJobConfiguration(final String j
public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
int shardingTotalCount = jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount();
result.setShardingTotalCount(shardingTotalCount);
result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount());
result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
String createTimeFormat = LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter());
result.getProps().setProperty("create_time", createTimeFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
Expand Down Expand Up @@ -109,7 +110,7 @@ private void startCurrentDisabledJob(final String jobId) {
private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType).getOption()).startDisabledJob(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
Expand All @@ -131,7 +132,7 @@ public void stop(final String jobId) {
private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStoppedPreviousJobType)).stop(optional);
new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType).getOption()).stop(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
Expand Down Expand Up @@ -66,7 +67,7 @@ public TransmissionTasksRunner(final TransmissionJobItemContext jobItemContext)
this.jobItemContext = jobItemContext;
inventoryTasks = jobItemContext.getInventoryTasks();
incrementalTasks = jobItemContext.getIncrementalTasks();
jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption();
jobManager = new PipelineJobManager(jobOption);
jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
}
Expand All @@ -89,7 +90,7 @@ public void start() {
if (jobItemContext.isStopping()) {
return;
}
new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
log.info("All inventory tasks finished.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@

package org.apache.shardingsphere.data.pipeline.common.job.type;

import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;

import static org.mockito.Mockito.mock;

/**
* Fixture job type.
*/
public final class FixtureJobType implements JobType {
public final class FixtureJobType implements PipelineJobType {

@Override
public String getCode() {
return "00";
}

@Override
public PipelineJobOption getOption() {
return mock(PipelineJobOption.class);
}

@Override
public String getType() {
return "FIXTURE";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
Expand All @@ -41,7 +41,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, new CDCJobType().getType());
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, new CDCJobType().getType()).getOption();
List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
Expand All @@ -40,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor<Sho

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingRuleStatement sqlStatement) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "STREAMING"))
PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "STREAMING").getOption())
.showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.shardingsphere.migration.distsql.handler.query;

import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
Expand All @@ -40,7 +40,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION");
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption();
List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

package org.apache.shardingsphere.data.pipeline.cdc.api.job.type;

import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;

/**
* CDC job type.
*/
public final class CDCJobType implements JobType {
public final class CDCJobType implements PipelineJobType {

@Override
public String getCode() {
return "03";
}

@Override
public PipelineJobOption getOption() {
return new CDCJobOption();
}

@Override
public String getType() {
return "STREAMING";
Expand Down
Loading

0 comments on commit 40e396d

Please sign in to comment.