diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/MigrateTableExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/MigrateTableExecutor.java index a83160b4dbd5c..78bc1f236779c 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/MigrateTableExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/MigrateTableExecutor.java @@ -20,12 +20,14 @@ import lombok.Setter; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException; +import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI; import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware; import org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; +import org.apache.shardingsphere.infra.instance.InstanceContext; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; @@ -41,6 +43,9 @@ public final class MigrateTableExecutor implements DistSQLUpdateExecutor new PipelineInvalidParameterException(String.format("Only `Cluster` is supported now, but current mode type is `%s`", instanceContext.getModeConfiguration().getType()))); checkTargetDatabase(sqlStatement); String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? database.getName() : sqlStatement.getTargetDatabaseName(); MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION"); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitExecutor.java index 8b0431cf032c4..904b8b658cc16 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitExecutor.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RegisterMigrationSourceStorageUnitStatement; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI; @@ -32,6 +33,7 @@ import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException; +import org.apache.shardingsphere.infra.instance.InstanceContext; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.mode.manager.ContextManager; @@ -51,13 +53,16 @@ public final class RegisterMigrationSourceStorageUnitExecutor implements DistSQL @Override public void executeUpdate(final RegisterMigrationSourceStorageUnitStatement sqlStatement, final ContextManager contextManager) { + InstanceContext instanceContext = contextManager.getInstanceContext(); + ShardingSpherePreconditions.checkState(instanceContext.isCluster(), + () -> new PipelineInvalidParameterException(String.format("Only `Cluster` is supported now, but current mode type is `%s`", instanceContext.getModeConfiguration().getType()))); checkDataSource(sqlStatement); List dataSources = new ArrayList<>(sqlStatement.getDataSources()); URLBasedDataSourceSegment urlBasedDataSourceSegment = (URLBasedDataSourceSegment) dataSources.get(0); DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl()); Map propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources); validateHandler.validate(propsMap); - jobAPI.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap); + jobAPI.registerMigrationSourceStorageUnits(new PipelineContextKey(InstanceType.PROXY), propsMap); } private void checkDataSource(final RegisterMigrationSourceStorageUnitStatement sqlStatement) { diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java index 9f104b7938a24..37758f39f8ac4 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java @@ -198,12 +198,12 @@ private Map buildTargetTableSchemaMap(final Map propsMap) { + public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map propsMap) { Map existDataSources = dataSourcePersistService.load(contextKey, getType()); Collection duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F); for (Entry entry : propsMap.entrySet()) { diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 8bcbc33ee9e4a..362556cac2b5d 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -126,7 +126,7 @@ static void beforeClass() { props.put("jdbcUrl", jdbcUrl); props.put("username", "root"); props.put("password", "root"); - jobAPI.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props))); + jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props))); } @AfterAll