Rename CaseInsensitiveIdentifier (#28981)
* Rename PipelineNodePath

* Rename CaseInsensitiveIdentifier
terrymanu authored Nov 7, 2023
1 parent 1c54ed8 commit a73e82d
Showing 17 changed files with 40 additions and 44 deletions.
@@ -63,7 +63,7 @@ public final class ImporterConfiguration {
     * @return logic table names
     */
    public Collection<String> getLogicTableNames() {
-       return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(LogicTableName::getOriginal).collect(Collectors.toList()));
+       return Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(LogicTableName::toString).collect(Collectors.toList()));
    }

    /**
@@ -18,25 +18,21 @@
package org.apache.shardingsphere.data.pipeline.common.metadata;

import lombok.EqualsAndHashCode;
-import lombok.Getter;

/**
- * Identifier name.
- * <p>It might be schema name or table name, etc.</p>
- * <p>It's case-insensitive.</p>
+ * Case insensitive identifier.
 */
-@Getter
// TODO table name case-sensitive for some database
@EqualsAndHashCode(of = "lowercase")
-public class IdentifierName {
+public class CaseInsensitiveIdentifier {

    private final String original;

    private final String lowercase;

-    public IdentifierName(final String identifierName) {
-        original = identifierName;
-        lowercase = null == identifierName ? null : identifierName.toLowerCase();
+    public CaseInsensitiveIdentifier(final String identifier) {
+        original = identifier;
+        lowercase = null == identifier ? null : identifier.toLowerCase();
    }

    @Override
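In short: equality and hashing ignore case, while the string form keeps the original spelling. A minimal usage sketch, assuming the toString() override hidden below the fold returns the original field (the getOriginal() → toString() call-site changes elsewhere in this commit only make sense under that assumption):

    CaseInsensitiveIdentifier a = new CaseInsensitiveIdentifier("T_ORDER");
    CaseInsensitiveIdentifier b = new CaseInsensitiveIdentifier("t_order");
    // @EqualsAndHashCode(of = "lowercase") generates equals/hashCode from the
    // lowercase field only, so spellings that differ only in case compare equal.
    assert a.equals(b) && a.hashCode() == b.hashCode();
    // Assumed: toString() returns the original, case-preserving text.
    assert "T_ORDER".equals(a.toString());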
@@ -24,7 +24,7 @@
 * <p>It might be null.</p>
 * <p>It's case-insensitive.</p>
 */
-public class SchemaName extends IdentifierName {
+public class SchemaName extends CaseInsensitiveIdentifier {

    public SchemaName(@Nullable final String schemaName) {
        super(schemaName);
@@ -49,7 +49,7 @@ public SchemaTableName(final String schemaName, final String tableName) {
     * @return text
     */
    public String marshal() {
-       String schemaName = this.schemaName.getOriginal();
-       return null == schemaName ? tableName.getOriginal() : schemaName + "." + tableName.getOriginal();
+       String schemaName = this.schemaName.toString();
+       return null == schemaName ? tableName.toString() : schemaName + "." + tableName;
    }
}
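Note that the second branch now concatenates the TableName directly, relying on its implicit toString(). A quick sketch of the two branches with hypothetical values, assuming SchemaName.toString() yields null when the wrapped schema name is null:

    // Schema present: "public" + "." + "t_order"
    new SchemaTableName("public", "t_order").marshal();   // -> "public.t_order"
    // Schema absent: falls back to the bare table name
    new SchemaTableName(null, "t_order").marshal();       // -> "t_order"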
@@ -23,7 +23,7 @@
 * <p>It might be logic table name or actual table name.</p>
 * <p>It's case-insensitive.</p>
 */
-public class TableName extends IdentifierName {
+public class TableName extends CaseInsensitiveIdentifier {

    public TableName(final String tableName) {
        super(tableName);
@@ -29,7 +29,7 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineMetaDataNode {

-    private static final String JOB_PATTERN_PREFIX = DataPipelineNodePath.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)";
+    private static final String JOB_PATTERN_PREFIX = PipelineNodePath.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)";

    public static final Pattern CONFIG_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/config");

@@ -47,8 +47,8 @@ public static String getMetaDataDataSourcesPath(final JobType jobType) {

    private static String getMetaDataRootPath(final JobType jobType) {
        return null == jobType
-                ? String.join("/", DataPipelineNodePath.DATA_PIPELINE_ROOT, "metadata")
-                : String.join("/", DataPipelineNodePath.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata");
+                ? String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, "metadata")
+                : String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata");
    }

    /**
@@ -72,7 +72,7 @@ public static String getElasticJobNamespace() {
    }

    private static String getJobsPath() {
-        return String.join("/", DataPipelineNodePath.DATA_PIPELINE_ROOT, "jobs");
+        return String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, "jobs");
    }

    /**
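For orientation, the registry keys these helpers build look like the following. The root constant's value is not shown in this diff (its javadoc reads "Data pipeline root path"), so "/pipeline" is an assumption here:

    // Assuming PipelineNodePath.DATA_PIPELINE_ROOT == "/pipeline" (not shown in this diff):
    // getMetaDataRootPath(null)         -> "/pipeline/metadata"
    // getMetaDataRootPath(migrationJob) -> "/pipeline/migration/metadata"  (hypothetical JobType)
    // getJobsPath()                     -> "/pipeline/jobs"
    // JOB_PATTERN_PREFIX then matches keys such as "/pipeline/jobs/j0101abc0":
    // "j", four digits, then one or more of [0-9a-z].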
@@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher {
    private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
        listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
                .stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each)));
-        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(DataPipelineNodePath.DATA_PIPELINE_ROOT, this::dispatchEvent);
+        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(PipelineNodePath.DATA_PIPELINE_ROOT, this::dispatchEvent);
    }

    private void dispatchEvent(final DataChangedEvent event) {
@@ -21,10 +21,10 @@
import lombok.NoArgsConstructor;

/**
- * Data pipeline node path.
+ * Pipeline node path.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class DataPipelineNodePath {
+public final class PipelineNodePath {

    /**
     * Data pipeline root path.
@@ -69,9 +69,9 @@ public TableDataConsistencyCheckResult checkSingleTableInventoryData() {

    private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
        SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
-                param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getOriginal()));
+                param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().toString()));
        SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
-                param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getOriginal()));
+                param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().toString()));
        SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator();
        calculators.add(sourceCalculator);
        SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator();
@@ -104,10 +104,10 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter
                break;
            }
            if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().getOriginal(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
+                param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().toString(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
            }
            if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().getOriginal(), targetCalculatedResult.getMaxUniqueKeyValue().get());
+                param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(), targetCalculatedResult.getMaxUniqueKeyValue().get());
            }
            param.getProgressContext().onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
        }
@@ -69,7 +69,7 @@ public DatabaseType getDatabaseType() {
     * @return schema name
     */
    public String getSchemaName() {
-       return table.getSchemaName().getOriginal();
+       return table.getSchemaName().toString();
    }

    /**
@@ -78,7 +78,7 @@ public String getSchemaName() {
     * @return logic table name
     */
    public String getLogicTableName() {
-       return table.getTableName().getOriginal();
+       return table.getTableName().toString();
    }

    /**
@@ -113,8 +113,8 @@ private Collection<InventoryDumperContext> splitByTable(final InventoryDumperCon
        dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
            // use original table name, for metadata loader, since some database table name case-sensitive
-            inventoryDumperContext.setActualTableName(key.getOriginal());
-            inventoryDumperContext.setLogicTableName(value.getOriginal());
+            inventoryDumperContext.setActualTableName(key.toString());
+            inventoryDumperContext.setLogicTableName(value.toString());
            inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition());
            inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
            inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
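This is the property the rename leans on: map keys compare case-insensitively, but the string form still carries the exact spelling a case-sensitive database expects. A hedged sketch with hypothetical table names (the real map lives in the TableNameMapper):

    import java.util.HashMap;
    import java.util.Map;

    Map<TableName, TableName> tableNameMap = new HashMap<>();
    tableNameMap.put(new TableName("T_ORDER_0"), new TableName("t_order"));
    // Lookup ignores case...
    assert tableNameMap.containsKey(new TableName("t_order_0"));
    // ...while toString() hands the metadata loader the original spelling.
    assert "T_ORDER_0".equals(tableNameMap.keySet().iterator().next().toString());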
@@ -61,7 +61,7 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) thro
        PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(targetDatabaseType);
        Collection<String> createdSchemaNames = new HashSet<>();
        for (CreateTableEntry each : createTableConfig.getCreateTableEntries()) {
-            String targetSchemaName = each.getTargetName().getSchemaName().getOriginal();
+            String targetSchemaName = each.getTargetName().getSchemaName().toString();
            if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) {
                continue;
            }
@@ -117,9 +117,9 @@ protected final String getCreateTargetTableSQL(final CreateTableEntry createTabl
                                                   final SQLParserEngine sqlParserEngine) throws SQLException {
        DatabaseType databaseType = createTableEntry.getSourceDataSourceConfig().getDatabaseType();
        DataSource sourceDataSource = dataSourceManager.getDataSource(createTableEntry.getSourceDataSourceConfig());
-        String schemaName = createTableEntry.getSourceName().getSchemaName().getOriginal();
-        String sourceTableName = createTableEntry.getSourceName().getTableName().getOriginal();
-        String targetTableName = createTableEntry.getTargetName().getTableName().getOriginal();
+        String schemaName = createTableEntry.getSourceName().getSchemaName().toString();
+        String sourceTableName = createTableEntry.getSourceName().getTableName().toString();
+        String targetTableName = createTableEntry.getTargetName().getTableName().toString();
        PipelineDDLGenerator generator = new PipelineDDLGenerator();
        return generator.generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
    }
@@ -218,7 +218,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi
    }

    private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
-        String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
        IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
        DataRecord result = new DataRecord(type, tableName, position, columnCount);
        result.setActualTableName(rowsEvent.getTableName());
@@ -118,7 +118,7 @@ private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final Pipeli
    }

    private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) {
-        String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
        DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
        result.setActualTableName(rowsEvent.getTableName());
        result.setCsn(rowsEvent.getCsn());
@@ -127,7 +127,7 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Stri
        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(), dataNode.getTableName()));
        List<String> columnNames = tableMetaData.getColumnNames();
        List<PipelineColumnMetaData> uniqueKeys = PipelineTableMetaDataUtils.getUniqueKeyColumns(
-                sourceTable.getSchemaName().getOriginal(), sourceTable.getTableName().getOriginal(), metaDataLoader);
+                sourceTable.getSchemaName().toString(), sourceTable.getTableName().toString(), metaDataLoader);
        TableInventoryCheckParameter param = new TableInventoryCheckParameter(
                jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext);
        TableInventoryChecker tableInventoryChecker = tableChecker.buildTableInventoryChecker(param);
@@ -194,7 +194,7 @@ private String getOrderTableNameWithSchema(final DialectDatabaseMetaData dialect

    private void assertDataMatched(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final SchemaTableName schemaTableName) {
        StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getOriginal(), schemaTableName.getTableName().getOriginal());
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().toString(), schemaTableName.getTableName().toString());
        List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
        ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0, sourceDataSource.getDatabaseType().getType());
        TableInventoryCheckParameter param = new TableInventoryCheckParameter("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
@@ -19,7 +19,7 @@

import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import org.apache.shardingsphere.data.pipeline.common.metadata.node.DataPipelineNodePath;
+import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
@@ -70,8 +70,8 @@ static void beforeClass() {
    }

    private static void watch() {
-        governanceRepositoryAPI.watch(DataPipelineNodePath.DATA_PIPELINE_ROOT, event -> {
-            if ((DataPipelineNodePath.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) {
+        governanceRepositoryAPI.watch(PipelineNodePath.DATA_PIPELINE_ROOT, event -> {
+            if ((PipelineNodePath.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) {
                EVENT_ATOMIC_REFERENCE.set(event);
                COUNT_DOWN_LATCH.countDown();
            }
@@ -114,23 +114,23 @@ void assertPersistJobCheckResult() {

    @Test
    void assertDeleteJob() {
-        governanceRepositoryAPI.persist(DataPipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
+        governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
        governanceRepositoryAPI.deleteJob("1");
        Optional<String> actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
        assertFalse(actual.isPresent());
    }

    @Test
    void assertGetChildrenKeys() {
-        governanceRepositoryAPI.persist(DataPipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
-        List<String> actual = governanceRepositoryAPI.getChildrenKeys(DataPipelineNodePath.DATA_PIPELINE_ROOT);
+        governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
+        List<String> actual = governanceRepositoryAPI.getChildrenKeys(PipelineNodePath.DATA_PIPELINE_ROOT);
        assertFalse(actual.isEmpty());
        assertTrue(actual.contains("1"));
    }

    @Test
    void assertWatch() throws InterruptedException {
-        String key = DataPipelineNodePath.DATA_PIPELINE_ROOT + "/1";
+        String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
        governanceRepositoryAPI.persist(key, "");
        boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
        assertTrue(awaitResult);
