Commit cca14db

Add unsupported mode type check for migration (#30339)
* Add unsupported mode type check

* Replaced by cluster

* Use instanceContext.isCluster()
azexcy authored Feb 28, 2024
1 parent cddab76 commit cca14db
Showing 4 changed files with 14 additions and 4 deletions.
@@ -20,12 +20,14 @@
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
 import org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -41,6 +43,9 @@ public final class MigrateTableExecutor implements DistSQLUpdateExecutor<Migrate
 
     @Override
     public void executeUpdate(final MigrateTableStatement sqlStatement, final ContextManager contextManager) {
+        InstanceContext instanceContext = contextManager.getInstanceContext();
+        ShardingSpherePreconditions.checkState(instanceContext.isCluster(),
+                () -> new PipelineInvalidParameterException(String.format("Only `Cluster` is supported now, but current mode type is `%s`", instanceContext.getModeConfiguration().getType())));
         checkTargetDatabase(sqlStatement);
         String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? database.getName() : sqlStatement.getTargetDatabaseName();
         MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
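
The guard added above follows ShardingSphere's lazy-precondition idiom: the Supplier defers exception construction until the check actually fails, so the happy path pays no allocation cost. A minimal, self-contained sketch of that idiom (the Preconditions class below is an illustrative stand-in, not the real ShardingSpherePreconditions):

    import java.util.function.Supplier;

    final class Preconditions {

        // Stand-in for ShardingSpherePreconditions.checkState: the exception is
        // only constructed when the state check fails.
        static <T extends Exception> void checkState(final boolean state, final Supplier<T> exceptionSupplier) throws T {
            if (!state) {
                throw exceptionSupplier.get();
            }
        }

        public static void main(final String[] args) throws Exception {
            boolean isCluster = false; // e.g. a Standalone deployment
            checkState(isCluster, () -> new IllegalStateException(
                    String.format("Only `Cluster` is supported now, but current mode type is `%s`", "Standalone")));
        }
    }
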
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
 
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RegisterMigrationSourceStorageUnitStatement;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
@@ -32,6 +33,7 @@
 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -51,13 +53,16 @@ public final class RegisterMigrationSourceStorageUnitExecutor implements DistSQL
 
     @Override
     public void executeUpdate(final RegisterMigrationSourceStorageUnitStatement sqlStatement, final ContextManager contextManager) {
+        InstanceContext instanceContext = contextManager.getInstanceContext();
+        ShardingSpherePreconditions.checkState(instanceContext.isCluster(),
+                () -> new PipelineInvalidParameterException(String.format("Only `Cluster` is supported now, but current mode type is `%s`", instanceContext.getModeConfiguration().getType())));
         checkDataSource(sqlStatement);
         List<DataSourceSegment> dataSources = new ArrayList<>(sqlStatement.getDataSources());
         URLBasedDataSourceSegment urlBasedDataSourceSegment = (URLBasedDataSourceSegment) dataSources.get(0);
         DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
         Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources);
         validateHandler.validate(propsMap);
-        jobAPI.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
+        jobAPI.registerMigrationSourceStorageUnits(new PipelineContextKey(InstanceType.PROXY), propsMap);
     }
 
     private void checkDataSource(final RegisterMigrationSourceStorageUnitStatement sqlStatement) {
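
Both executors now begin with the same three-line cluster check. Purely as an illustration (this consolidation is not part of the commit), the duplication could be folded into a shared helper; every identifier called below appears verbatim in the diffs above:

    import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
    import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
    import org.apache.shardingsphere.infra.instance.InstanceContext;
    import org.apache.shardingsphere.mode.manager.ContextManager;

    // Hypothetical helper class; its name and placement are assumptions.
    final class ClusterModeChecker {

        static void checkClusterMode(final ContextManager contextManager) {
            InstanceContext instanceContext = contextManager.getInstanceContext();
            ShardingSpherePreconditions.checkState(instanceContext.isCluster(),
                    () -> new PipelineInvalidParameterException(String.format(
                            "Only `Cluster` is supported now, but current mode type is `%s`", instanceContext.getModeConfiguration().getType())));
        }
    }
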
@@ -198,12 +198,12 @@ private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<Dat
     }
 
     /**
-     * Add migration source resources.
+     * Register migration source storage units.
      *
      * @param contextKey context key
      * @param propsMap data source pool properties map
      */
-    public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
+    public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
         Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
         Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
         for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
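
The renamed registerMigrationSourceStorageUnits keeps the old logic: it loads the already-persisted source data sources, then collects any requested names that collide before registering anything. The loop body is cut off in the diff, so the reduction below is an assumption about its shape; the class and method names are mine, not from the source:

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Map;

    final class DuplicateNameCheck {

        // Assumed shape of the duplicate check: size the set to the request
        // (load factor 1, as in the fragment) and collect every requested name
        // that is already registered.
        static Collection<String> findDuplicates(final Map<String, ?> toRegister, final Map<String, ?> existing) {
            Collection<String> duplicateDataSourceNames = new HashSet<>(toRegister.size(), 1F);
            for (String each : toRegister.keySet()) {
                if (existing.containsKey(each)) {
                    duplicateDataSourceNames.add(each);
                }
            }
            return duplicateDataSourceNames;
        }
    }
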
@@ -126,7 +126,7 @@ static void beforeClass() {
         props.put("jdbcUrl", jdbcUrl);
         props.put("username", "root");
         props.put("password", "root");
-        jobAPI.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+        jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
     }
 
     @AfterAll
