Skip to content

Commit

Permalink
Use JobConfigurationChangedProcessEngine to instead of AbstractJobCon…
Browse files Browse the repository at this point in the history
…figurationChangedProcessor (#29379)

* Refactor AbstractJobConfigurationChangedProcessor

* Refactor AbstractJobConfigurationChangedProcessor

* Use JobConfigurationChangedProcessEngine to instead of AbstractJobConfigurationChangedProcessor
  • Loading branch information
terrymanu authored Dec 12, 2023
1 parent 5a7eb85 commit f32600e
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,37 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl;
package org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;

/**
* Abstract job configuration changed processor.
* Job configuration changed process engine.
*/
@Slf4j
public abstract class AbstractJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor {
public final class JobConfigurationChangedProcessEngine {

@Override
public final void process(final Type eventType, final JobConfiguration jobConfig) {
/**
* Process changed job configuration.
*
* @param eventType event type
* @param jobConfig pipeline job configuration
* @param processor pipeline job configuration changed processor
*/
public void process(final Type eventType, final JobConfiguration jobConfig, final JobConfigurationChangedProcessor processor) {
String jobId = jobConfig.getJobName();
if (jobConfig.isDisabled()) {
PipelineJobRegistry.stop(jobId);
onDisabled(jobId);
disableJob(jobId);
return;
}
switch (eventType) {
Expand All @@ -50,42 +54,31 @@ public final void process(final Type eventType, final JobConfiguration jobConfig
if (PipelineJobRegistry.isExisting(jobId)) {
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
executeJob(jobConfig);
executeJob(jobConfig, processor);
}
break;
case DELETED:
PipelineJobRegistry.stop(jobId);
onDeleted(jobConfig);
processor.clean(jobConfig);
break;
default:
break;
}
}

private void onDisabled(final String jobId) {
private void disableJob(final String jobId) {
PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
for (Integer each : PipelineJobRegistry.getShardingItems(jobId)) {
distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
}
}

protected void executeJob(final JobConfiguration jobConfig) {
PipelineJob job = buildJob();
private void executeJob(final JobConfiguration jobConfig, final JobConfigurationChangedProcessor processor) {
PipelineJob job = processor.createJob();
String jobId = jobConfig.getJobName();
PipelineJobRegistry.add(jobId, job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfig);
job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}

protected abstract PipelineJob buildJob();

protected abstract void onDeleted(JobConfiguration jobConfig);

protected abstract PipelineJobType getJobType();

@Override
public String getType() {
return getJobType().getType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,29 @@

package org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor;

import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;

/**
* Job configuration changed processor.
*/
public interface JobConfigurationChangedProcessor extends TypedSPI {

/**
* Process changed job configuration.
*
* @param eventType event type
* @param jobConfig job configuration
* Create pipeline job.
*
* @return pipeline job
*/
void process(Type eventType, JobConfiguration jobConfig);
PipelineJob createJob();

/**
* Clean pipeline job.
*
* @param jobConfig pipeline job configuration
*/
void clean(JobConfiguration jobConfig);

@Override
String getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessEngine;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
Expand Down Expand Up @@ -53,7 +54,7 @@ public void handle(final String jobId, final DataChangedEvent event) {
return;
}
log.info("{} job configuration: {}, disabled={}", event.getType(), event.getKey(), jobConfig.isDisabled());
TypedSPILoader.findService(JobConfigurationChangedProcessor.class, PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getType())
.ifPresent(optional -> optional.process(event.getType(), jobConfig));
TypedSPILoader.findService(JobConfigurationChangedProcessor.class, PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getType()).ifPresent(
optional -> new JobConfigurationChangedProcessEngine().process(event.getType(), jobConfig, optional));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,27 @@

package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

/**
* Consistency check job configuration changed processor.
*/
@Slf4j
public final class ConsistencyCheckJobConfigurationChangedProcessor extends AbstractJobConfigurationChangedProcessor {
public final class ConsistencyCheckJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor {

@Override
protected void onDeleted(final JobConfiguration jobConfig) {
public PipelineJob createJob() {
return new ConsistencyCheckJob();
}

@Override
protected PipelineJob buildJob() {
return new ConsistencyCheckJob();
public void clean(final JobConfiguration jobConfig) {
}

@Override
protected PipelineJobType getJobType() {
return new ConsistencyCheckJobType();
public String getType() {
return "CONSISTENCY_CHECK";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,30 @@

package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

/**
* Migration job configuration changed processor.
*/
@Slf4j
public final class MigrationJobConfigurationChangedProcessor extends AbstractJobConfigurationChangedProcessor {
public final class MigrationJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor {

@Override
protected void onDeleted(final JobConfiguration jobConfig) {
new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter()));
public PipelineJob createJob() {
return new MigrationJob();
}

@Override
protected PipelineJob buildJob() {
return new MigrationJob();
public void clean(final JobConfiguration jobConfig) {
new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter()));
}

@Override
protected PipelineJobType getJobType() {
return new MigrationJobType();
public String getType() {
return "MIGRATION";
}
}

0 comments on commit f32600e

Please sign in to comment.