Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper (#28939)
* Refactor ImporterConfiguration.findSchemaName()

* Add more constructor on TableNameSchemaNameMapping

* Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper
terrymanu authored Nov 4, 2023
1 parent b7169ef commit dbf81f2
Showing 16 changed files with 75 additions and 76 deletions.
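
At the API level, the rename also tightens the schema-lookup contract. A schematic before-and-after sketch (signatures taken from the diffs below; importerConfig and the map contents are illustrative, not from the commit):

    // Before: nullable result, keyed by LogicTableName.
    TableNameSchemaNameMapping mapping = new TableNameSchemaNameMapping(Collections.singletonMap("t_order", "public"));
    String schemaName = importerConfig.getSchemaName(new LogicTableName("t_order")); // null if the dialect has no schemas

    // After: Optional result, keyed by plain String, plus a new Collection-based constructor.
    TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public"));
    Optional<String> found = importerConfig.findSchemaName("t_order"); // Optional.empty() if the dialect has no schemas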
[TableNameSchemaNameMapping.java → TableAndSchemaNameMapper.java]

@@ -20,23 +20,30 @@
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
- * Table name and schema name mapping.
+ * Table and schema name mapper.
  */
 @ToString
-public final class TableNameSchemaNameMapping {
+public final class TableAndSchemaNameMapper {
 
     private final Map<LogicTableName, String> mapping;
 
-    public TableNameSchemaNameMapping(final Map<String, String> tableSchemaMap) {
+    public TableAndSchemaNameMapper(final Map<String, String> tableSchemaMap) {
         mapping = null == tableSchemaMap ? Collections.emptyMap() : getLogicTableNameMap(tableSchemaMap);
     }
 
+    public TableAndSchemaNameMapper(final Collection<String> tableNames) {
+        Map<String, String> tableNameSchemaMap = tableNames.stream().map(each -> each.split("\\.")).filter(split -> split.length > 1).collect(Collectors.toMap(split -> split[1], split -> split[0]));
+        mapping = getLogicTableNameMap(tableNameSchemaMap);
+    }
+
     private Map<LogicTableName, String> getLogicTableNameMap(final Map<String, String> tableSchemaMap) {
         Map<LogicTableName, String> result = new HashMap<>(tableSchemaMap.size(), 1F);
         for (Entry<String, String> entry : tableSchemaMap.entrySet()) {
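The new Collection-based constructor accepts schema-qualified names, splits each on the dot, and keeps only entries that actually carry a schema. A minimal usage sketch, mirroring the new unit test added later in this commit (the table names are illustrative):

    // Explicit table-to-schema pairs.
    TableAndSchemaNameMapper byMap = new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public"));

    // Qualified names: "public.t_order" splits into schema "public" and table "t_order";
    // the unqualified "t_order_item" fails the length > 1 filter, so it gets no schema entry.
    TableAndSchemaNameMapper byNames = new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item"));
    assert "public".equals(byNames.getSchemaName("t_order"));
    assert null == byNames.getSchemaName("t_order_item");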
[DumperCommonContext.java]

@@ -20,7 +20,7 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -33,7 +33,7 @@
  */
 @Getter
 @Setter
-@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
+@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
 public abstract class DumperCommonContext {
 
     private String dataSourceName;
@@ -42,7 +42,7 @@ public abstract class DumperCommonContext {
 
     private Map<ActualTableName, LogicTableName> tableNameMap;
 
-    private TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    private TableAndSchemaNameMapper tableAndSchemaNameMapper;
 
     private IngestPosition position;
 
@@ -77,7 +77,7 @@ public boolean containsTable(final String actualTableName) {
      * @return schema name. nullable
      */
    public String getSchemaName(final LogicTableName logicTableName) {
-        return tableNameSchemaNameMapping.getSchemaName(logicTableName);
+        return tableAndSchemaNameMapper.getSchemaName(logicTableName);
     }
 
     /**
@@ -87,6 +87,6 @@ public String getSchemaName(final LogicTableName logicTableName) {
      * @return schema name, can be nullable
      */
     public String getSchemaName(final ActualTableName actualTableName) {
-        return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
+        return tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName));
     }
 }
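Both getSchemaName overloads delegate to the renamed mapper; the ActualTableName variant first resolves the physical table to its logic table through tableNameMap. A hedged sketch of that lookup chain, assembled the same way as the test fixtures later in this commit (table and schema names are illustrative):

    IncrementalDumperContext context = new IncrementalDumperContext();
    context.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
    context.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public")));
    // Actual table t_order_0 resolves to logic table t_order, whose schema is public.
    assert "public".equals(context.getSchemaName(new ActualTableName("t_order_0")));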
[InventoryDumperContext.java]

@@ -55,7 +55,7 @@ public InventoryDumperContext(final DumperCommonContext dumperContext) {
         setDataSourceName(dumperContext.getDataSourceName());
         setDataSourceConfig(dumperContext.getDataSourceConfig());
         setTableNameMap(dumperContext.getTableNameMap());
-        setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
+        setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
     }
 
     /**
[TableNameSchemaNameMappingTest.java]

@@ -19,7 +19,9 @@
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,16 +32,21 @@ class TableNameSchemaNameMappingTest {
 
     @Test
     void assertConstructFromNull() {
-        assertDoesNotThrow(() -> new TableNameSchemaNameMapping(null));
+        assertDoesNotThrow(() -> new TableAndSchemaNameMapper((Map<String, String>) null));
     }
 
     @Test
     void assertConstructFromValueNullMap() {
-        assertNull(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
+        assertNull(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
     }
 
     @Test
     void assertConstructFromMap() {
-        assertThat(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
+        assertThat(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
     }
 
+    @Test
+    void assertConstructFromCollection() {
+        assertThat(new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item")).getSchemaName("t_order"), is("public"));
+    }
 }
[ImporterConfiguration.java]

@@ -20,7 +20,7 @@
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -30,6 +30,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -38,15 +39,15 @@
  */
 @RequiredArgsConstructor
 @Getter
-@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
+@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
 public final class ImporterConfiguration {
 
     private final PipelineDataSourceConfiguration dataSourceConfig;
 
     // TODO columnName case-insensitive?
     private final Map<LogicTableName, Set<String>> shardingColumnsMap;
 
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
 
     private final int batchSize;
 
@@ -76,13 +77,13 @@ public Set<String> getShardingColumns(final String logicTableName) {
     }
 
     /**
-     * Get schema name.
+     * Find schema name.
      *
      * @param logicTableName logic table name
-     * @return schema name. nullable
+     * @return schema name
      */
-    public String getSchemaName(final LogicTableName logicTableName) {
+    public Optional<String> findSchemaName(final String logicTableName) {
         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-        return dialectDatabaseMetaData.isSchemaAvailable() ? tableNameSchemaNameMapping.getSchemaName(logicTableName) : null;
+        return dialectDatabaseMetaData.isSchemaAvailable() ? Optional.of(tableAndSchemaNameMapper.getSchemaName(logicTableName)) : Optional.empty();
     }
 }
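findSchemaName now takes a plain String and returns Optional.empty() when the dialect has no schema concept, so callers pick their own fallback instead of propagating null. A sketch of the call-site pattern (importerConfig is assumed to be an already-built ImporterConfiguration). One hedged caveat: Optional.of(...) throws if the mapper yields null for a schema-capable dialect, so Optional.ofNullable(...) would be the more defensive variant:

    Optional<String> schemaName = importerConfig.findSchemaName("t_order");
    // The importer call sites below unwrap to a nullable schema for the SQL builders.
    String schemaOrNull = schemaName.orElse(null);
    // Alternatively, qualify the table name only when a schema is present.
    String qualifiedTableName = schemaName.map(schema -> schema + ".t_order").orElse("t_order");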
[data source importer]

@@ -26,7 +26,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
@@ -169,7 +168,7 @@ private void doFlush(final DataSource dataSource, final List<DataRecord> buffer)
 
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
+        String insertSql = importSQLBuilder.buildInsertSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
         try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
             batchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
@@ -185,10 +184,6 @@ private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         }
     }
 
-    private String getSchemaName(final String logicTableName) {
-        return getImporterConfig().getSchemaName(new LogicTableName(logicTableName));
-    }
-
     private void executeUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         for (DataRecord each : dataRecords) {
             executeUpdate(connection, each);
@@ -199,7 +194,7 @@ private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
         Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
         List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
         List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
-        String updateSql = importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
+        String updateSql = importSQLBuilder.buildUpdateSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
             updateStatement.set(preparedStatement);
             for (int i = 0; i < setColumns.size(); i++) {
@@ -226,7 +221,7 @@ private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
 
     private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String deleteSQL = importSQLBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord,
+        String deleteSQL = importSQLBuilder.buildDeleteSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
             batchDeleteStatement.set(preparedStatement);
[job preparation utilities]

@@ -166,7 +166,7 @@ public static void checkTargetDataSource(final DatabaseType databaseType, final
         }
         DataSourceCheckEngine dataSourceCheckEngine = new DataSourceCheckEngine(databaseType);
         dataSourceCheckEngine.checkConnection(targetDataSources);
-        dataSourceCheckEngine.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
+        dataSourceCheckEngine.checkTargetTable(targetDataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
     }
 
     /**
[DataSourceCheckEngine.java]

@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
@@ -66,17 +66,17 @@ public void checkConnection(final Collection<? extends DataSource> dataSources)
      * Check table is empty.
      *
      * @param dataSources data sources
-     * @param tableNameSchemaNameMapping mapping
+     * @param tableAndSchemaNameMapper mapping
      * @param logicTableNames logic table names
      * @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
      */
     // TODO rename to common usage name
     // TODO Merge schemaName and tableNames
-    public void checkTargetTable(final Collection<? extends DataSource> dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final Collection<String> logicTableNames) {
+    public void checkTargetTable(final Collection<? extends DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
         try {
             for (DataSource each : dataSources) {
                 for (String tableName : logicTableNames) {
-                    if (!checkEmpty(each, tableNameSchemaNameMapping.getSchemaName(tableName), tableName)) {
+                    if (!checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName)) {
                         throw new PrepareJobWithTargetTableNotEmptyException(tableName);
                     }
                 }
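checkTargetTable resolves each logic table's schema through the mapper and then verifies the target table is empty before a job starts. A hedged usage sketch (databaseType and dataSource are assumed to exist in the caller; the table names are illustrative):

    DataSourceCheckEngine engine = new DataSourceCheckEngine(databaseType);
    TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public"));
    // Throws PrepareJobWithTargetTableNotEmptyException if public.t_order already holds rows.
    engine.checkTargetTable(Collections.singletonList(dataSource), mapper, Collections.singletonList("t_order"));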
[DataSourceCheckEngineTest.java]

@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -85,7 +85,7 @@ void assertCheckTargetTable() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        dataSourceCheckEngine.checkTargetTable(dataSources, new TableNameSchemaNameMapping(Collections.emptyMap()), Collections.singletonList("t_order"));
+        dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order"));
     }
 
     @Test
@@ -95,6 +95,6 @@ void assertCheckTargetTableFailed() throws SQLException {
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
         assertThrows(PrepareJobWithTargetTableNotEmptyException.class,
-                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new TableNameSchemaNameMapping(Collections.emptyMap()), Collections.singletonList("t_order")));
+                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order")));
     }
 }
[MySQL ingest test]

@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -92,7 +92,7 @@ private IncrementalDumperContext createDumperContext() {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
 
[PostgreSQL ingest test]

@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -108,7 +108,7 @@ private IncrementalDumperContext createDumperContext(final String jdbcUrl, final
         result.setJobId("0101123456");
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
 
[PostgreSQL WAL ingest test]

@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -87,7 +87,7 @@ private IncrementalDumperContext mockDumperContext() {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
        return result;
     }
 
(The remaining changed files were not loaded in this capture.)