diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java index 676509fbb5ce8..7b63bbca28ade 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java @@ -63,7 +63,7 @@ public final class ImporterConfiguration { * @return logic table names */ public Collection getLogicTableNames() { - return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(LogicTableName::getOriginal).collect(Collectors.toList())); + return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(LogicTableName::toString).collect(Collectors.toList())); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/IdentifierName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java similarity index 77% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/IdentifierName.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java index 3d3b26511cc6b..3955c6864b01f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/IdentifierName.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java @@ -18,25 +18,21 @@ package org.apache.shardingsphere.data.pipeline.common.metadata; import lombok.EqualsAndHashCode; -import lombok.Getter; /** - * Identifier name. - *

It might be schema name or table name, etc.

- *

It's case-insensitive.

+ * Case insensitive identifier. */ -@Getter // TODO table name case-sensitive for some database @EqualsAndHashCode(of = "lowercase") -public class IdentifierName { +public class CaseInsensitiveIdentifier { private final String original; private final String lowercase; - public IdentifierName(final String identifierName) { - original = identifierName; - lowercase = null == identifierName ? null : identifierName.toLowerCase(); + public CaseInsensitiveIdentifier(final String identifier) { + original = identifier; + lowercase = null == identifier ? null : identifier.toLowerCase(); } @Override diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java index 09b8f47d41bf9..3ae4a740fdc84 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java @@ -24,7 +24,7 @@ *

It might be null.

*

It's case-insensitive.

*/ -public class SchemaName extends IdentifierName { +public class SchemaName extends CaseInsensitiveIdentifier { public SchemaName(@Nullable final String schemaName) { super(schemaName); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java index dbe50815e4e9b..56c3e70abed37 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java @@ -49,7 +49,7 @@ public SchemaTableName(final String schemaName, final String tableName) { * @return text */ public String marshal() { - String schemaName = this.schemaName.getOriginal(); - return null == schemaName ? tableName.getOriginal() : schemaName + "." + tableName.getOriginal(); + String schemaName = this.schemaName.toString(); + return null == schemaName ? tableName.toString() : schemaName + "." + tableName; } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java index 3ce327bea4129..26f2c17be14d0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java @@ -23,7 +23,7 @@ *

It might be logic table name or actual table name.

*

It's case-insensitive.

*/ -public class TableName extends IdentifierName { +public class TableName extends CaseInsensitiveIdentifier { public TableName(final String tableName) { super(tableName); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java index 2bea6b52dc573..ab2b84017dd13 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java @@ -29,7 +29,7 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class PipelineMetaDataNode { - private static final String JOB_PATTERN_PREFIX = DataPipelineNodePath.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)"; + private static final String JOB_PATTERN_PREFIX = PipelineNodePath.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)"; public static final Pattern CONFIG_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/config"); @@ -47,8 +47,8 @@ public static String getMetaDataDataSourcesPath(final JobType jobType) { private static String getMetaDataRootPath(final JobType jobType) { return null == jobType - ? String.join("/", DataPipelineNodePath.DATA_PIPELINE_ROOT, "metadata") - : String.join("/", DataPipelineNodePath.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata"); + ? String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, "metadata") + : String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata"); } /** @@ -72,7 +72,7 @@ public static String getElasticJobNamespace() { } private static String getJobsPath() { - return String.join("/", DataPipelineNodePath.DATA_PIPELINE_ROOT, "jobs"); + return String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, "jobs"); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java index cb29fad0f59fa..39f5c0a064ba1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java @@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher { private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) { listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class) .stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each))); - PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(DataPipelineNodePath.DATA_PIPELINE_ROOT, this::dispatchEvent); + PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(PipelineNodePath.DATA_PIPELINE_ROOT, this::dispatchEvent); } private void dispatchEvent(final DataChangedEvent event) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/DataPipelineNodePath.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineNodePath.java similarity index 94% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/DataPipelineNodePath.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineNodePath.java index eb900e08f9dd8..7ad6e64eb6102 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/DataPipelineNodePath.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineNodePath.java @@ -21,10 +21,10 @@ import lombok.NoArgsConstructor; /** - * Data pipeline node path. + * Pipeline node path. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class DataPipelineNodePath { +public final class PipelineNodePath { /** * Data pipeline root path. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java index 08fe31fb3f5aa..5962dc683f6db 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java @@ -69,9 +69,9 @@ public TableDataConsistencyCheckResult checkSingleTableInventoryData() { private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) { SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(), - param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getOriginal())); + param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().toString())); SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(), - param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getOriginal())); + param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().toString())); SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator(); calculators.add(sourceCalculator); SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator(); @@ -104,10 +104,10 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter break; } if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) { - param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().getOriginal(), sourceCalculatedResult.getMaxUniqueKeyValue().get()); + param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().toString(), sourceCalculatedResult.getMaxUniqueKeyValue().get()); } if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) { - param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().getOriginal(), targetCalculatedResult.getMaxUniqueKeyValue().get()); + param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(), targetCalculatedResult.getMaxUniqueKeyValue().get()); } param.getProgressContext().onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount())); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java index ce4067da62e1c..1b17a1fe01889 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java @@ -69,7 +69,7 @@ public DatabaseType getDatabaseType() { * @return schema name */ public String getSchemaName() { - return table.getSchemaName().getOriginal(); + return table.getSchemaName().toString(); } /** @@ -78,7 +78,7 @@ public String getSchemaName() { * @return logic table name */ public String getLogicTableName() { - return table.getTableName().getOriginal(); + return table.getTableName().toString(); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java index 0845a42c5e05b..4a3eaa395cd79 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java @@ -113,8 +113,8 @@ private Collection splitByTable(final InventoryDumperCon dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> { InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext()); // use original table name, for metadata loader, since some database table name case-sensitive - inventoryDumperContext.setActualTableName(key.getOriginal()); - inventoryDumperContext.setLogicTableName(value.getOriginal()); + inventoryDumperContext.setActualTableName(key.toString()); + inventoryDumperContext.setLogicTableName(value.toString()); inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition()); inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames()); inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns()); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java index 200ee9f6793cc..d7b584de2722a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java @@ -61,7 +61,7 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) thro PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(targetDatabaseType); Collection createdSchemaNames = new HashSet<>(); for (CreateTableEntry each : createTableConfig.getCreateTableEntries()) { - String targetSchemaName = each.getTargetName().getSchemaName().getOriginal(); + String targetSchemaName = each.getTargetName().getSchemaName().toString(); if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) { continue; } @@ -117,9 +117,9 @@ protected final String getCreateTargetTableSQL(final CreateTableEntry createTabl final SQLParserEngine sqlParserEngine) throws SQLException { DatabaseType databaseType = createTableEntry.getSourceDataSourceConfig().getDatabaseType(); DataSource sourceDataSource = dataSourceManager.getDataSource(createTableEntry.getSourceDataSourceConfig()); - String schemaName = createTableEntry.getSourceName().getSchemaName().getOriginal(); - String sourceTableName = createTableEntry.getSourceName().getTableName().getOriginal(); - String targetTableName = createTableEntry.getTargetName().getTableName().getOriginal(); + String schemaName = createTableEntry.getSourceName().getSchemaName().toString(); + String sourceTableName = createTableEntry.getSourceName().getTableName().toString(); + String targetTableName = createTableEntry.getTargetName().getTableName().toString(); PipelineDDLGenerator generator = new PipelineDDLGenerator(); return generator.generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine); } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index d64fd12236177..98fedbed77422 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -218,7 +218,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi } private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString(); IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()); DataRecord result = new DataRecord(type, tableName, position, columnCount); result.setActualTableName(rowsEvent.getTableName()); diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index c67d1422797b2..56fcf76793ac4 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -118,7 +118,7 @@ private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final Pipeli } private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString(); DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount); result.setActualTableName(rowsEvent.getTableName()); result.setCsn(rowsEvent.getCsn()); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java index bfb58d92922a8..18ba1c25c9a87 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java @@ -127,7 +127,7 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Stri ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(), dataNode.getTableName())); List columnNames = tableMetaData.getColumnNames(); List uniqueKeys = PipelineTableMetaDataUtils.getUniqueKeyColumns( - sourceTable.getSchemaName().getOriginal(), sourceTable.getTableName().getOriginal(), metaDataLoader); + sourceTable.getSchemaName().toString(), sourceTable.getTableName().toString(), metaDataLoader); TableInventoryCheckParameter param = new TableInventoryCheckParameter( jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext); TableInventoryChecker tableInventoryChecker = tableChecker.buildTableInventoryChecker(param); diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index 33af5f101a3f8..9fd06b84566a8 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -194,7 +194,7 @@ private String getOrderTableNameWithSchema(final DialectDatabaseMetaData dialect private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final SchemaTableName schemaTableName) { StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource); - PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getOriginal(), schemaTableName.getTableName().getOriginal()); + PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().toString(), schemaTableName.getTableName().toString()); List uniqueKeys = Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0))); ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0, sourceDataSource.getDatabaseType().getType()); TableInventoryCheckParameter param = new TableInventoryCheckParameter("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName, diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java index 5472c52644cac..dc881b3dd153b 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper; -import org.apache.shardingsphere.data.pipeline.common.metadata.node.DataPipelineNodePath; +import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; @@ -70,8 +70,8 @@ static void beforeClass() { } private static void watch() { - governanceRepositoryAPI.watch(DataPipelineNodePath.DATA_PIPELINE_ROOT, event -> { - if ((DataPipelineNodePath.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) { + governanceRepositoryAPI.watch(PipelineNodePath.DATA_PIPELINE_ROOT, event -> { + if ((PipelineNodePath.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) { EVENT_ATOMIC_REFERENCE.set(event); COUNT_DOWN_LATCH.countDown(); } @@ -114,7 +114,7 @@ void assertPersistJobCheckResult() { @Test void assertDeleteJob() { - governanceRepositoryAPI.persist(DataPipelineNodePath.DATA_PIPELINE_ROOT + "/1", ""); + governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", ""); governanceRepositoryAPI.deleteJob("1"); Optional actual = governanceRepositoryAPI.getJobItemProgress("1", 0); assertFalse(actual.isPresent()); @@ -122,15 +122,15 @@ void assertDeleteJob() { @Test void assertGetChildrenKeys() { - governanceRepositoryAPI.persist(DataPipelineNodePath.DATA_PIPELINE_ROOT + "/1", ""); - List actual = governanceRepositoryAPI.getChildrenKeys(DataPipelineNodePath.DATA_PIPELINE_ROOT); + governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", ""); + List actual = governanceRepositoryAPI.getChildrenKeys(PipelineNodePath.DATA_PIPELINE_ROOT); assertFalse(actual.isEmpty()); assertTrue(actual.contains("1")); } @Test void assertWatch() throws InterruptedException { - String key = DataPipelineNodePath.DATA_PIPELINE_ROOT + "/1"; + String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1"; governanceRepositoryAPI.persist(key, ""); boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS); assertTrue(awaitResult);