From 34c75675ff69f50cee7cd5454ef4e4f3da6ca2a0 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 19 Nov 2023 15:34:29 +0800 Subject: [PATCH] Add DataConsistencyCheckAlgorithmInfoRegistry --- ...ConsistencyCheckAlgorithmInfoRegistry.java | 61 +++++++++++++++++++ .../InventoryIncrementalJobManager.java | 32 +--------- .../ShowMigrationCheckAlgorithmsExecutor.java | 8 +-- 3 files changed, 65 insertions(+), 36 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/DataConsistencyCheckAlgorithmInfoRegistry.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/DataConsistencyCheckAlgorithmInfoRegistry.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/DataConsistencyCheckAlgorithmInfoRegistry.java new file mode 100644 index 0000000000000..efde834078ded --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/DataConsistencyCheckAlgorithmInfoRegistry.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.common.pojo; + +import lombok.NoArgsConstructor; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; +import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; +import org.apache.shardingsphere.infra.spi.annotation.SPIDescription; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.stream.Collectors; + +/** + * Data consistency check algorithm info registry. + */ +@NoArgsConstructor +public final class DataConsistencyCheckAlgorithmInfoRegistry { + + private static final Collection ALGORITHM_INFOS = loadAllAlgorithms(); + + private static Collection loadAllAlgorithms() { + Collection result = new LinkedList<>(); + for (TableDataConsistencyChecker each : ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class)) { + SPIDescription description = each.getClass().getAnnotation(SPIDescription.class); + String typeAliases = each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(",")); + result.add( + new DataConsistencyCheckAlgorithmInfo(each.getType(), typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null == description ? "" : description.value())); + } + return result; + } + + private static Collection getSupportedDatabaseTypes(final Collection supportedDatabaseTypes) { + return supportedDatabaseTypes.isEmpty() ? ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : supportedDatabaseTypes; + } + + /** + * Get all data consistency check algorithm infos. + * + * @return all data consistency check algorithm infos + */ + public static Collection getAllAlgorithmInfos() { + return ALGORITHM_INFOS; + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java index 764866c717e7a..2ef78b7e8bf25 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -28,27 +28,20 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper; -import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; -import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; -import org.apache.shardingsphere.infra.spi.annotation.SPIDescription; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import java.util.Collection; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.stream.Collectors; import java.util.stream.IntStream; /** @@ -89,11 +82,9 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte * @return job item infos */ public List getJobItemInfos(final String jobId) { - PipelineJobManager jobManager = new PipelineJobManager(jobAPI); - PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId); + PipelineJobConfiguration jobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(jobId); long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0")); - InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI); - Map jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig); + Map jobProgress = getJobProgress(jobConfig); List result = new LinkedList<>(); PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); for (Entry entry : jobProgress.entrySet()) { @@ -155,25 +146,6 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) { return new YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ? YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new YamlJobOffsetInfo()); } - /** - * List all data consistency check algorithms from SPI. - * - * @return data consistency check algorithms - */ - public Collection listDataConsistencyCheckAlgorithms() { - Collection result = new LinkedList<>(); - for (TableDataConsistencyChecker each : ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class)) { - SPIDescription description = each.getClass().getAnnotation(SPIDescription.class); - String typeAliases = each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(",")); - result.add(new DataConsistencyCheckAlgorithmInfo(each.getType(), typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null == description ? "" : description.value())); - } - return result; - } - - private Collection getSupportedDatabaseTypes(final Collection supportedDatabaseTypes) { - return supportedDatabaseTypes.isEmpty() ? ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : supportedDatabaseTypes; - } - /** * Aggregate data consistency check results. * diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java index 425d94e993bbd..23ac3a43c7ed0 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java @@ -17,13 +17,10 @@ package org.apache.shardingsphere.migration.distsql.handler.query; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; +import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfoRegistry; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement; import java.util.Arrays; @@ -37,8 +34,7 @@ public final class ShowMigrationCheckAlgorithmsExecutor implements QueryableRALE @Override public Collection getRows(final ShowMigrationCheckAlgorithmsStatement sqlStatement) { - InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")); - return inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map( + return DataConsistencyCheckAlgorithmInfoRegistry.getAllAlgorithmInfos().stream().map( each -> new LocalDataQueryResultRow(each.getType(), each.getTypeAliases(), each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")), each.getDescription())) .collect(Collectors.toList());