Skip to content

Commit

Permalink
Refactor MigrationJobAPI (#29213)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 26, 2023
1 parent d442e25 commit 41db48e
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.query;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
Expand All @@ -35,11 +36,11 @@
*/
public final class ShowMigrationSourceStorageUnitsExecutor implements QueryableRALExecutor<ShowMigrationSourceStorageUnitsStatement> {

private final MigrationJobOption jobOption = new MigrationJobOption();
private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());

@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationSourceStorageUnitsStatement sqlStatement) {
Iterator<Collection<Object>> data = jobOption.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
Iterator<Collection<Object>> data = jobManager.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
while (data.hasNext()) {
result.add(new LocalDataQueryResultRow((List<Object>) data.next()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
Expand All @@ -32,13 +33,13 @@
@Slf4j
public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> {

private final MigrationJobOption jobOption = new MigrationJobOption();
private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());

@Override
public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
jobOption.createJobAndStart(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
jobManager.start(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler;
Expand All @@ -42,7 +43,7 @@
*/
public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdater<RegisterMigrationSourceStorageUnitStatement> {

private final MigrationJobOption jobOption = new MigrationJobOption();
private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());

private final DataSourcePoolPropertiesValidateHandler validateHandler = new DataSourcePoolPropertiesValidateHandler();

Expand All @@ -55,7 +56,7 @@ public void executeUpdate(final String databaseName, final RegisterMigrationSour
DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources);
validateHandler.validate(propsMap);
jobOption.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
jobManager.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
Expand All @@ -28,11 +29,11 @@
*/
public final class UnregisterMigrationSourceStorageUnitUpdater implements RALUpdater<UnregisterMigrationSourceStorageUnitStatement> {

private final MigrationJobOption jobOption = new MigrationJobOption();
private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());

@Override
public void executeUpdate(final String databaseName, final UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
jobOption.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
jobManager.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ public void stop(final String parentJobId) {
* @param parentJobId parent job id
*/
public void drop(final String parentJobId) {
jobManager.stop(parentJobId);
String latestCheckJobId = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
jobManager.stop(latestCheckJobId);
PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
Collection<String> checkJobIds = governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId);
String latestCheckJobId = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence(
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId));
if (previousSequence.isPresent()) {
Expand Down
Loading

0 comments on commit 41db48e

Please sign in to comment.