Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename DumperCommonContext #28930

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import java.util.stream.Collectors;

/**
* Base dumper context.
* Dumper common context.
*/
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
public abstract class BaseDumperContext {
public final class DumperCommonContext {

private String dataSourceName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/**
* Incremental dumper context.
*/
@RequiredArgsConstructor
@Getter
@Setter
@ToString(callSuper = true)
public class IncrementalDumperContext extends BaseDumperContext {
@ToString
public class IncrementalDumperContext {

private final DumperCommonContext commonContext;

private String jobId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
*/
@Getter
@Setter
@ToString(callSuper = true)
public final class InventoryDumperContext extends BaseDumperContext {
@ToString
public final class InventoryDumperContext {

private final DumperCommonContext commonContext;

private String actualTableName;

Expand All @@ -51,12 +53,9 @@ public final class InventoryDumperContext extends BaseDumperContext {

private JobRateLimitAlgorithm rateLimitAlgorithm;

public InventoryDumperContext(final BaseDumperContext dumperContext) {
setDataSourceName(dumperContext.getDataSourceName());
setDataSourceConfig(dumperContext.getDataSourceConfig());
setTableNameMap(dumperContext.getTableNameMap());
setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap());
public InventoryDumperContext(final DumperCommonContext commonContext) {
this.commonContext = commonContext;
this.commonContext.setPosition(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,21 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin
this.dumperContext = dumperContext;
this.channel = channel;
this.dataSource = dataSource;
DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
this.metaDataLoader = metaDataLoader;
}

@Override
protected void runBlocking() {
IngestPosition position = dumperContext.getPosition();
IngestPosition position = dumperContext.getCommonContext().getPosition();
if (position instanceof FinishedPosition) {
log.info("Ignored because of already finished.");
return;
}
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
try (Connection connection = dataSource.getConnection()) {
dump(tableMetaData, connection);
} catch (final SQLException ex) {
Expand All @@ -114,7 +115,7 @@ protected void runBlocking() {
@SuppressWarnings("MagicConstant")
private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
int batchSize = dumperContext.getBatchSize();
DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
if (null != dumperContext.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
Expand Down Expand Up @@ -157,13 +158,13 @@ private String buildInventoryDumpSQL() {
return dumperContext.getQuerySQL();
}
LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName());
String schemaName = dumperContext.getSchemaName(logicTableName);
String schemaName = dumperContext.getCommonContext().getSchemaName(logicTableName);
if (!dumperContext.hasUniqueKey()) {
return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
}
PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getPosition();
PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
Collection<String> columnNames = dumperContext.getColumnNames(logicTableName);
Collection<String> columnNames = dumperContext.getCommonContext().getColumnNames(logicTableName);
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
Expand All @@ -180,7 +181,7 @@ private void setParameters(final PreparedStatement preparedStatement) throws SQL
return;
}
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getPosition();
PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
preparedStatement.setObject(1, position.getBeginValue());
preparedStatement.setObject(2, position.getEndValue());
Expand Down Expand Up @@ -213,7 +214,8 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta

private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
return dumperContext.hasUniqueKey()
? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getPosition()).getEndValue())
? PrimaryKeyPositionFactory.newInstance(
resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
: new PlaceholderPosition();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public final class InventoryRecordsCountCalculator {
* @throws SplitPipelineJobByUniqueKeyException if there's exception from database
*/
public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
String schemaName = dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
String actualTableName = dumperContext.getActualTableName();
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType());
Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemC
long startTimeMillis = System.currentTimeMillis();
InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext();
for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext);
Expand All @@ -111,12 +111,12 @@ public Collection<InventoryDumperContext> splitInventoryDumperContext(final Inve

private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
Collection<InventoryDumperContext> result = new LinkedList<>();
dumperContext.getTableNameMap().forEach((key, value) -> {
InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext);
dumperContext.getCommonContext().getTableNameMap().forEach((key, value) -> {
InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition());
// use original table name, for metadata loader, since some database table name case-sensitive
inventoryDumperContext.setActualTableName(key.getOriginal());
inventoryDumperContext.setLogicTableName(value.getOriginal());
inventoryDumperContext.setPosition(new PlaceholderPosition());
inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
result.add(inventoryDumperContext);
Expand All @@ -127,7 +127,7 @@ private Collection<InventoryDumperContext> splitByTable(final InventoryDumperCon
private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext,
final PipelineDataSourceWrapper dataSource) {
if (null == dumperContext.getUniqueKeyColumns()) {
String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
String schemaName = dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
String actualTableName = dumperContext.getActualTableName();
List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
Expand All @@ -140,8 +140,8 @@ private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDump
Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
int i = 0;
for (IngestPosition each : inventoryPositions) {
InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext);
splitDumperContext.setPosition(each);
InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
splitDumperContext.getCommonContext().setPosition(each);
splitDumperContext.setShardingItem(i++);
splitDumperContext.setActualTableName(dumperContext.getActualTableName());
splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
Expand Down Expand Up @@ -205,7 +205,7 @@ private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon
String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey);
dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public static IngestPosition getIncrementalPosition(final JobItemIncrementalTask
return position.get();
}
}
DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class PipelineTaskUtils {
* @return inventory task id
*/
public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
String result = String.format("%s.%s", inventoryDumperContext.getDataSourceName(), inventoryDumperContext.getActualTableName());
String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName());
return result + "#" + inventoryDumperContext.getShardingItem();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,13 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl

public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
Preconditions.checkArgument(dumperContext.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
Preconditions.checkArgument(
dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.dumperContext = dumperContext;
this.binlogPosition = (BinlogPosition) binlogPosition;
this.channel = channel;
this.metaDataLoader = metaDataLoader;
YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).getJdbcConfig();
YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig();
ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL"));
ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null);
ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
Expand Down Expand Up @@ -134,7 +135,7 @@ private List<? extends Record> handleEvent(final AbstractBinlogEvent event) {
return Collections.singletonList(createPlaceholderRecord(event));
}
AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.containsTable(rowsEvent.getTableName())) {
if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getCommonContext().containsTable(rowsEvent.getTableName())) {
return Collections.singletonList(createPlaceholderRecord(event));
}
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName());
Expand All @@ -157,11 +158,11 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even
}

private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getSchemaName(new ActualTableName(actualTableName)), actualTableName);
}

private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
Collection<ColumnName> columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getAfterRows()) {
DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
Expand All @@ -182,7 +183,7 @@ private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final
}

private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
Collection<ColumnName> columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
Expand All @@ -206,7 +207,7 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
}

private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
Collection<ColumnName> columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getBeforeRows()) {
DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);
Expand Down Expand Up @@ -234,7 +235,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi
}

private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal();
String tableName = dumperContext.getCommonContext().getLogicTableName(rowsEvent.getTableName()).getOriginal();
IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
DataRecord result = new DataRecord(type, tableName, position, columnCount);
result.setActualTableName(rowsEvent.getTableName());
Expand Down
Loading
Loading