Commit

Rename DumperConfiguration to DumperContext (#28925)
terrymanu authored Nov 3, 2023
1 parent 5f57f08 commit ef2647e
Showing 42 changed files with 323 additions and 329 deletions.
@@ -15,7 +15,7 @@
* limitations under the License.
*/

-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.context;

import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
@@ -15,12 +15,12 @@
* limitations under the License.
*/

-package org.apache.shardingsphere.data.pipeline.api.config.ingest;
+package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -34,12 +34,12 @@
import java.util.stream.Collectors;

/**
- * Base dumper configuration.
+ * Base dumper context.
*/
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
-public abstract class BaseDumperConfiguration {
+public abstract class BaseDumperContext {

private String dataSourceName;

@@ -15,19 +15,19 @@
* limitations under the License.
*/

-package org.apache.shardingsphere.data.pipeline.api.config.ingest;
+package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
- * Incremental dumper configuration.
+ * Incremental dumper context.
*/
@Getter
@Setter
@ToString(callSuper = true)
-public class IncrementalDumperConfiguration extends BaseDumperConfiguration {
+public class IncrementalDumperContext extends BaseDumperContext {

private String jobId;

@@ -15,7 +15,7 @@
* limitations under the License.
*/

-package org.apache.shardingsphere.data.pipeline.api.config.ingest;
+package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.Setter;
@@ -26,12 +26,12 @@
import java.util.List;

/**
- * Inventory dumper configuration.
+ * Inventory dumper context.
*/
@Getter
@Setter
@ToString(callSuper = true)
-public final class InventoryDumperConfiguration extends BaseDumperConfiguration {
+public final class InventoryDumperContext extends BaseDumperContext {

private String actualTableName;

@@ -51,12 +51,12 @@ public final class InventoryDumperConfiguration extends BaseDumperConfiguration

private JobRateLimitAlgorithm rateLimitAlgorithm;

-public InventoryDumperConfiguration(final BaseDumperConfiguration dumperConfig) {
-    setDataSourceName(dumperConfig.getDataSourceName());
-    setDataSourceConfig(dumperConfig.getDataSourceConfig());
-    setTableNameMap(dumperConfig.getTableNameMap());
-    setTableNameSchemaNameMapping(dumperConfig.getTableNameSchemaNameMapping());
-    setTargetTableColumnsMap(dumperConfig.getTargetTableColumnsMap());
+public InventoryDumperContext(final BaseDumperContext dumperContext) {
+    setDataSourceName(dumperContext.getDataSourceName());
+    setDataSourceConfig(dumperContext.getDataSourceConfig());
+    setTableNameMap(dumperContext.getTableNameMap());
+    setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
+    setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap());
}

/**
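
Note: the copy constructor above only carries over the fields it shares with BaseDumperContext; per-table details are still filled in by the caller. A minimal usage sketch against the renamed types (the helper method, the batch size value, and the variable names are assumptions, not part of this commit):

    // Hypothetical helper in a task-splitting class: derive a per-table inventory
    // dumper context from the job-level incremental dumper context. The setters are
    // inferred from the Lombok-generated accessors used elsewhere in this diff.
    private InventoryDumperContext createTableDumperContext(final IncrementalDumperContext incrementalDumperContext,
                                                            final String logicTableName, final String actualTableName) {
        InventoryDumperContext result = new InventoryDumperContext(incrementalDumperContext);
        result.setLogicTableName(logicTableName);
        result.setActualTableName(actualTableName);
        result.setBatchSize(1000);
        return result;
    }
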
@@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;

-import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -34,13 +34,13 @@ public interface IncrementalDumperCreator extends DatabaseTypedSPI {
/**
* Create incremental dumper.
*
- * @param config incremental dumper configuration
+ * @param context incremental dumper context
* @param position position
* @param channel channel
* @param metaDataLoader meta data loader
* @return incremental dumper
*/
-IncrementalDumper createIncrementalDumper(IncrementalDumperConfiguration config, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
+IncrementalDumper createIncrementalDumper(IncrementalDumperContext context, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);

/**
* Whether support incremental dump.
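
For context (not part of this commit): callers of this SPI now pass an IncrementalDumperContext instead of an IncrementalDumperConfiguration. A minimal sketch, assuming a DatabaseTypedSPILoader-style lookup is available; the loader call and the local variable names are assumptions:

    // Resolve the database-specific creator, then build the dumper from the renamed context type.
    IncrementalDumperCreator creator = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, databaseType);
    IncrementalDumper dumper = creator.createIncrementalDumper(dumperContext, position, channel, metaDataLoader);
    dumper.start();
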
@@ -15,12 +15,11 @@
* limitations under the License.
*/

-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.context;

import org.junit.jupiter.api.Test;

-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collections;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -36,17 +35,11 @@ void assertConstructFromNull() {

@Test
void assertConstructFromValueNullMap() {
-Map<String, String> map = new HashMap<>();
-map.put("t_order", null);
-TableNameSchemaNameMapping mapping = new TableNameSchemaNameMapping(map);
-assertNull(mapping.getSchemaName("t_order"));
+assertNull(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
}

@Test
void assertConstructFromMap() {
-Map<String, String> map = new HashMap<>();
-map.put("t_order", "public");
-TableNameSchemaNameMapping mapping = new TableNameSchemaNameMapping(map);
-assertThat(mapping.getSchemaName("t_order"), is("public"));
+assertThat(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
}
}
@@ -20,7 +20,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -17,19 +17,19 @@

package org.apache.shardingsphere.data.pipeline.common.config.ingest;

-import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;

/**
- * Incremental dumper configuration creator.
+ * Incremental dumper context creator.
*/
-public interface IncrementalDumperConfigurationCreator {
+public interface IncrementalDumperContextCreator {

/**
- * Create dumper configuration.
+ * Create incremental dumper context.
*
* @param jobDataNodeLine job data node line
- * @return dumper configuration
+ * @return created incremental dumper context
*/
-IncrementalDumperConfiguration createDumperConfiguration(JobDataNodeLine jobDataNodeLine);
+IncrementalDumperContext createDumperContext(JobDataNodeLine jobDataNodeLine);
}
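
A bare-bones implementation sketch of the renamed creator (the class name, job id, and data source name below are placeholders; a real implementation would derive the data source and table mappings from the job configuration and the JobDataNodeLine):

    import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
    import org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperContextCreator;
    import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;

    // Hypothetical implementation: fills the context with fixed placeholder values.
    public final class FixtureIncrementalDumperContextCreator implements IncrementalDumperContextCreator {

        @Override
        public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) {
            IncrementalDumperContext result = new IncrementalDumperContext();
            result.setJobId("fixture-job");
            result.setDataSourceName("ds_0");
            // The data source configuration and table name mappings would normally be
            // derived from jobDataNodeLine; that derivation is omitted here.
            return result;
        }
    }
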
@@ -21,7 +21,7 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
@@ -71,7 +71,7 @@
public final class InventoryDumper extends AbstractLifecycleExecutor implements Dumper {

@Getter(AccessLevel.PROTECTED)
-private final InventoryDumperConfiguration dumperConfig;
+private final InventoryDumperContext dumperContext;

private final PipelineChannel channel;

@@ -85,38 +85,38 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements

private final AtomicReference<Statement> dumpStatement = new AtomicReference<>();

-public InventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
-    this.dumperConfig = dumperConfig;
+public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+    this.dumperContext = dumperContext;
this.channel = channel;
this.dataSource = dataSource;
-DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
+DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
this.metaDataLoader = metaDataLoader;
}

@Override
protected void runBlocking() {
-IngestPosition position = dumperConfig.getPosition();
+IngestPosition position = dumperContext.getPosition();
if (position instanceof FinishedPosition) {
log.info("Ignored because of already finished.");
return;
}
-PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
+PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
try (Connection connection = dataSource.getConnection()) {
dump(tableMetaData, connection);
} catch (final SQLException ex) {
log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
throw new IngestException("Inventory dump failed on " + dumperConfig.getActualTableName(), ex);
throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex);
}
}

@SuppressWarnings("MagicConstant")
private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
-int batchSize = dumperConfig.getBatchSize();
-DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
-if (null != dumperConfig.getTransactionIsolation()) {
-    connection.setTransactionIsolation(dumperConfig.getTransactionIsolation());
+int batchSize = dumperContext.getBatchSize();
+DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+if (null != dumperContext.getTransactionIsolation()) {
+    connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpSQL())) {
dumpStatement.set(preparedStatement);
@@ -126,7 +126,7 @@ private void dump(final PipelineTableMetaData tableMetaData, final Connection co
setParameters(preparedStatement);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
int rowCount = 0;
-JobRateLimitAlgorithm rateLimitAlgorithm = dumperConfig.getRateLimitAlgorithm();
+JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
List<Record> dataRecords = new LinkedList<>();
while (resultSet.next()) {
@@ -153,34 +153,34 @@ private void dump(final PipelineTableMetaData tableMetaData, final Connection co
}

private String buildInventoryDumpSQL() {
-if (!Strings.isNullOrEmpty(dumperConfig.getQuerySQL())) {
-    return dumperConfig.getQuerySQL();
+if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
+    return dumperContext.getQuerySQL();
}
-LogicTableName logicTableName = new LogicTableName(dumperConfig.getLogicTableName());
-String schemaName = dumperConfig.getSchemaName(logicTableName);
-if (!dumperConfig.hasUniqueKey()) {
-    return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperConfig.getActualTableName());
+LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName());
+String schemaName = dumperContext.getSchemaName(logicTableName);
+if (!dumperContext.hasUniqueKey()) {
+    return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
}
-PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
-PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
-Collection<String> columnNames = dumperConfig.getColumnNames(logicTableName);
+PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getPosition();
+PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
+Collection<String> columnNames = dumperContext.getColumnNames(logicTableName);
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
-return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
}
if (null != primaryKeyPosition.getBeginValue() && null == primaryKeyPosition.getEndValue()) {
-return inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+return inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
}
}
-return inventoryDumpSQLBuilder.buildIndivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+return inventoryDumpSQLBuilder.buildIndivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
}

private void setParameters(final PreparedStatement preparedStatement) throws SQLException {
-if (!dumperConfig.hasUniqueKey()) {
+if (!dumperContext.hasUniqueKey()) {
return;
}
-PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
-PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
+PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
+PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getPosition();
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
preparedStatement.setObject(1, position.getBeginValue());
preparedStatement.setObject(2, position.getEndValue());
@@ -198,8 +198,8 @@ private void setParameters(final PreparedStatement preparedStatement) throws SQL

private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
int columnCount = resultSetMetaData.getColumnCount();
-DataRecord result = new DataRecord(IngestDataChangeType.INSERT, dumperConfig.getLogicTableName(), newPosition(resultSet), columnCount);
-List<String> insertColumnNames = Optional.ofNullable(dumperConfig.getInsertColumnNames()).orElse(Collections.emptyList());
+DataRecord result = new DataRecord(IngestDataChangeType.INSERT, dumperContext.getLogicTableName(), newPosition(resultSet), columnCount);
+List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
() -> new PipelineInvalidParameterException("Insert colum names count not equals ResultSet column count"));
for (int i = 1; i <= columnCount; i++) {
@@ -212,8 +212,8 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta
}

private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
-return dumperConfig.hasUniqueKey()
-    ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperConfig.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue())
+return dumperContext.hasUniqueKey()
+    ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getPosition()).getEndValue())
: new PlaceholderPosition();
}

