Refactor DumperCommonContext

terrymanu committed Nov 5, 2023
1 parent 405379c, commit 3ba67fb
Showing 27 changed files with 139 additions and 114 deletions.
DumperCommonContext.java
@@ -31,7 +31,7 @@
 @Getter
 @Setter
 @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
-public abstract class DumperCommonContext {
+public final class DumperCommonContext {
 
     private String dataSourceName;
 
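This is the heart of the commit: DumperCommonContext changes from an abstract base class into a final class that the two dumper contexts embed. As a reading aid, here is a minimal sketch of the resulting class, reconstructed from the getters and setters exercised elsewhere in this diff; field types other than String and IngestPosition are assumptions based on call sites, not confirmed declarations:

    package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;

    import lombok.Getter;
    import lombok.Setter;
    import lombok.ToString;

    /**
     * Dumper common context: state shared by inventory and incremental dumpers.
     * Sketch only; reconstructed from accessors used in this commit.
     */
    @Getter
    @Setter
    @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
    public final class DumperCommonContext {
        
        private String dataSourceName;
        
        private PipelineDataSourceConfiguration dataSourceConfig;   // assumed declared type
        
        private TableNameMapper tableNameMapper;                    // assumed declared type
        
        private TableAndSchemaNameMapper tableAndSchemaNameMapper;
        
        private IngestPosition position;
    }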
IncrementalDumperContext.java
@@ -18,16 +18,20 @@
 package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 
 /**
  * Incremental dumper context.
  */
+@RequiredArgsConstructor
 @Getter
 @Setter
-@ToString(callSuper = true)
-public final class IncrementalDumperContext extends DumperCommonContext {
+@ToString
+public final class IncrementalDumperContext {
+
+    private final DumperCommonContext commonContext;
 
     private String jobId;
 
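Construction is now two-step: populate the common context, then hand it to the constructor that Lombok's @RequiredArgsConstructor generates for the single final field. A hedged usage sketch; the setter and getter names come from this diff, while the concrete values and the pre-built config/mapper variables are illustrative:

    DumperCommonContext commonContext = new DumperCommonContext();
    commonContext.setDataSourceName("ds_0");                              // illustrative value
    commonContext.setDataSourceConfig(dataSourceConfig);                  // assumed built elsewhere
    commonContext.setTableNameMapper(tableNameMapper);                    // assumed built elsewhere
    commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);  // assumed built elsewhere

    IncrementalDumperContext dumperContext = new IncrementalDumperContext(commonContext);
    dumperContext.setJobId("j0101");                                      // illustrative value

    // Shared state is now reached through the embedded common context:
    DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();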
InventoryDumperContext.java
@@ -30,8 +30,10 @@
  */
 @Getter
 @Setter
-@ToString(callSuper = true)
-public final class InventoryDumperContext extends DumperCommonContext {
+@ToString
+public final class InventoryDumperContext {
+
+    private final DumperCommonContext commonContext;
 
     private String actualTableName;
 
@@ -51,11 +53,12 @@ public final class InventoryDumperContext extends DumperCommonContext {
 
     private JobRateLimitAlgorithm rateLimitAlgorithm;
 
-    public InventoryDumperContext(final DumperCommonContext dumperContext) {
-        setDataSourceName(dumperContext.getDataSourceName());
-        setDataSourceConfig(dumperContext.getDataSourceConfig());
-        setTableNameMapper(dumperContext.getTableNameMapper());
-        setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
+    public InventoryDumperContext(final DumperCommonContext commonContext) {
+        this.commonContext = new DumperCommonContext();
+        this.commonContext.setDataSourceName(commonContext.getDataSourceName());
+        this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig());
+        this.commonContext.setTableNameMapper(commonContext.getTableNameMapper());
+        this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper());
     }
 
     /**
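Note the copy semantics: the constructor allocates a fresh DumperCommonContext and copies the four shared fields into it, rather than aliasing the argument. That matters for the task splitter below, which creates many contexts from one source and then gives each its own ingest position. A small sketch of the resulting independence ('source' is an illustrative variable; PlaceholderPosition is used the same way later in this commit):

    InventoryDumperContext first = new InventoryDumperContext(source.getCommonContext());
    InventoryDumperContext second = new InventoryDumperContext(source.getCommonContext());

    // Each context owns a private copy of the common state, so setting a
    // position on one shard does not leak into the others. Position itself
    // is not copied by the constructor, so it starts as null here.
    first.getCommonContext().setPosition(new PlaceholderPosition());
    assert null == second.getCommonContext().getPosition();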
InventoryDumper.java
@@ -88,21 +88,21 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin
         this.dumperContext = dumperContext;
         this.channel = channel;
         this.dataSource = dataSource;
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
         columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
         this.metaDataLoader = metaDataLoader;
     }
 
     @Override
     protected void runBlocking() {
-        IngestPosition position = dumperContext.getPosition();
+        IngestPosition position = dumperContext.getCommonContext().getPosition();
         if (position instanceof FinishedPosition) {
             log.info("Ignored because of already finished.");
             return;
         }
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
-                dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName());
+                dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName());
         try (Connection connection = dataSource.getConnection()) {
             dump(tableMetaData, connection);
         } catch (final SQLException ex) {
@@ -114,7 +114,7 @@ protected void runBlocking() {
     @SuppressWarnings("MagicConstant")
     private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
         int batchSize = dumperContext.getBatchSize();
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         if (null != dumperContext.getTransactionIsolation()) {
             connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
@@ -156,11 +156,11 @@ private String buildInventoryDumpSQL() {
         if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
             return dumperContext.getQuerySQL();
         }
-        String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         if (!dumperContext.hasUniqueKey()) {
             return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
         }
-        PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getPosition();
+        PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
         Collection<String> columnNames = Collections.singleton("*");
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
@@ -179,7 +179,7 @@ private void setParameters(final PreparedStatement preparedStatement) throws SQL
             return;
         }
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
-        PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getPosition();
+        PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
             preparedStatement.setObject(1, position.getBeginValue());
             preparedStatement.setObject(2, position.getEndValue());
@@ -212,7 +212,8 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta
 
     private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
         return dumperContext.hasUniqueKey()
-                ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getPosition()).getEndValue())
+                ? PrimaryKeyPositionFactory.newInstance(
+                        resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
                 : new PlaceholderPosition();
     }
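The remaining files follow one mechanical recipe, visible throughout this dumper: every accessor the contexts used to inherit from DumperCommonContext is now reached through getCommonContext(). In sketch form:

    // Before (inheritance): IngestPosition position = dumperContext.getPosition();
    // After (composition): the same state lives on the embedded common context.
    IngestPosition position = dumperContext.getCommonContext().getPosition();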
InventoryRecordsCountCalculator.java
@@ -51,7 +51,7 @@ public final class InventoryRecordsCountCalculator {
      * @throws SplitPipelineJobByUniqueKeyException if there's exception from database
      */
     public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
-        String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         String actualTableName = dumperContext.getActualTableName();
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType());
         Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
(file name not shown)
@@ -83,7 +83,7 @@ public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemC
         long startTimeMillis = System.currentTimeMillis();
         InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext();
         for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
-            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
+            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
             Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
             Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext);
@@ -110,12 +110,12 @@ public Collection<InventoryDumperContext> splitInventoryDumperContext(final Inve
 
     private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
         Collection<InventoryDumperContext> result = new LinkedList<>();
-        dumperContext.getTableNameMapper().getTableNameMap().forEach((key, value) -> {
-            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext);
+        dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
+            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
             // use original table name, for metadata loader, since some database table name case-sensitive
             inventoryDumperContext.setActualTableName(key.getOriginal());
             inventoryDumperContext.setLogicTableName(value.getOriginal());
-            inventoryDumperContext.setPosition(new PlaceholderPosition());
+            inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition());
             inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
             inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
             result.add(inventoryDumperContext);
@@ -126,7 +126,7 @@ private Collection<InventoryDumperContext> splitByTable(final InventoryDumperCon
     private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext,
                                                                  final PipelineDataSourceWrapper dataSource) {
         if (null == dumperContext.getUniqueKeyColumns()) {
-            String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+            String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
             String actualTableName = dumperContext.getActualTableName();
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
             dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
@@ -139,8 +139,8 @@ private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDump
         Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
         int i = 0;
         for (IngestPosition each : inventoryPositions) {
-            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext);
-            splitDumperContext.setPosition(each);
+            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
+            splitDumperContext.getCommonContext().setPosition(each);
             splitDumperContext.setShardingItem(i++);
             splitDumperContext.setActualTableName(dumperContext.getActualTableName());
             splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
@@ -204,7 +204,7 @@ private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon
         String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
         String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
-                dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey);
+                dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement();
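This splitter drives most uses of the new copy constructor: one context per actual table, then one per unique-key range, each with its own position. A hedged, condensed sketch of how splitInventoryDumperContext plausibly composes the two helpers shown above (its body is elided from this view; sourceDataSource is an assumed field name):

    // Split per table, then per primary-key range; every resulting context
    // carries its own copied common context and its own ingest position.
    Collection<InventoryDumperContext> result = new LinkedList<>();
    for (InventoryDumperContext tableContext : splitByTable(dumperContext)) {
        result.addAll(splitByPrimaryKey(tableContext, jobItemContext, sourceDataSource));
    }
    return result;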
(file name not shown)
@@ -131,8 +131,8 @@ public static IngestPosition getIncrementalPosition(final JobItemIncrementalTask
                 return position.get();
             }
         }
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
-        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
+        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
         return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
PipelineTaskUtils.java
@@ -43,7 +43,7 @@ public final class PipelineTaskUtils {
      * @return inventory task id
      */
     public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
-        String result = String.format("%s.%s", inventoryDumperContext.getDataSourceName(), inventoryDumperContext.getActualTableName());
+        String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName());
         return result + "#" + inventoryDumperContext.getShardingItem();
     }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl

public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
Preconditions.checkArgument(dumperContext.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
Preconditions.checkArgument(dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration,
"MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.dumperContext = dumperContext;
this.binlogPosition = (BinlogPosition) binlogPosition;
this.channel = channel;
this.metaDataLoader = metaDataLoader;
YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).getJdbcConfig();
YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig();
ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL"));
ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null);
ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
Expand Down Expand Up @@ -132,7 +133,7 @@ private List<? extends Record> handleEvent(final AbstractBinlogEvent event) {
return Collections.singletonList(createPlaceholderRecord(event));
}
AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getTableNameMapper().containsTable(rowsEvent.getTableName())) {
if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getCommonContext().getTableNameMapper().containsTable(rowsEvent.getTableName())) {
return Collections.singletonList(createPlaceholderRecord(event));
}
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName());
Expand All @@ -155,8 +156,8 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even
}

private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
}

private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
Expand Down Expand Up @@ -217,7 +218,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi
}

private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
DataRecord result = new DataRecord(type, tableName, position, columnCount);
result.setActualTableName(rowsEvent.getTableName());
Expand Down