Remove useless DumperCommonContext.targetTableColumnsMap #28935

Merged · 1 commit · Nov 4, 2023
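In short: DumperCommonContext carried a targetTableColumnsMap that was apparently never populated, so its two getColumnNames(...) accessors could only ever return their fallbacks. This PR deletes the map and inlines those fallbacks at the call sites: the inventory dumper now always selects every column via Collections.singleton("*"), and the MySQL binlog and PostgreSQL WAL dumpers drop their isColumnUnneeded(...) filters, which never excluded anything. A minimal, self-contained sketch of that equivalence, assuming an always-empty map and using plain String stand-ins for the project's LogicTableName/ColumnName types (illustrative only, not code from this PR):

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class EmptyMapEquivalence {
    
    public static void main(final String[] args) {
        // The field was initialized to an empty HashMap and never written to.
        Map<String, Collection<String>> targetTableColumnsMap = new HashMap<>();
        // Inventory path: containsKey(...) is always false, so the old accessor
        // always fell through to the "select every column" marker ...
        Collection<String> inventoryColumns = targetTableColumnsMap.containsKey("t_order")
                ? targetTableColumnsMap.get("t_order")
                : Collections.singleton("*");
        System.out.println(inventoryColumns);   // prints: [*]
        // ... which is what the PR now hard-codes in buildInventoryDumpSQL().
        // Incremental path: getOrDefault(...) always returned the empty set, and the
        // old filter (!names.isEmpty() && !names.contains(column)) is always false
        // for an empty set, so no column was ever skipped.
        Collection<String> incrementalColumns = targetTableColumnsMap.getOrDefault("t_order", Collections.emptySet());
        System.out.println(incrementalColumns.isEmpty());   // prints: true
    }
}

Because both accessors reduce to constants under an empty map, the removal is behavior-preserving by construction.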
----------------------------------------------------------------------
@@ -24,14 +24,9 @@
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;

-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;

 /**
  * Base dumper context.
@@ -49,9 +44,6 @@ public abstract class DumperCommonContext {

     private TableNameSchemaNameMapping tableNameSchemaNameMapping;

-    // LinkedHashSet is required
-    private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap = new HashMap<>();
-
     private IngestPosition position;

     /**
@@ -97,26 +89,4 @@ public String getSchemaName(final LogicTableName logicTableName) {
     public String getSchemaName(final ActualTableName actualTableName) {
         return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
     }
-
-    /**
-     * Get column names.
-     *
-     * @param logicTableName logic table name
-     * @return column names
-     */
-    public Collection<String> getColumnNames(final LogicTableName logicTableName) {
-        return targetTableColumnsMap.containsKey(logicTableName)
-                ? targetTableColumnsMap.get(logicTableName).stream().map(ColumnName::getOriginal).collect(Collectors.toList())
-                : Collections.singleton("*");
-    }
-
-    /**
-     * Get column names.
-     *
-     * @param actualTableName actual table name
-     * @return column names
-     */
-    public Collection<ColumnName> getColumnNames(final String actualTableName) {
-        return targetTableColumnsMap.getOrDefault(getLogicTableName(actualTableName), Collections.emptySet());
-    }
 }
----------------------------------------------------------------------
@@ -56,7 +56,6 @@ public InventoryDumperContext(final DumperCommonContext dumperContext) {
         setDataSourceConfig(dumperContext.getDataSourceConfig());
         setTableNameMap(dumperContext.getTableNameMap());
         setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
-        setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap());
     }

     /**
----------------------------------------------------------------------
@@ -163,7 +163,7 @@ private String buildInventoryDumpSQL() {
         }
         PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getPosition();
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
-        Collection<String> columnNames = dumperContext.getColumnNames(logicTableName);
+        Collection<String> columnNames = Collections.singleton("*");
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
             if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
                 return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
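One consequence of the hunk above: buildDivisibleSQL(...) now always receives the one-element collection [*]. A hedged sketch of the resulting projection, assuming the builder comma-joins the column collection (the actual InventoryDumpSQLBuilder formatting is outside this diff, and the table/key names below are hypothetical, taken from the tests):

import java.util.Collection;
import java.util.Collections;

public final class ProjectionSketch {
    
    public static void main(final String[] args) {
        Collection<String> columnNames = Collections.singleton("*");
        // With a single "*" element, any comma-join degenerates to "*".
        String projection = String.join(",", columnNames);
        // Hypothetical shape of the divisible range query; the real builder may differ.
        System.out.printf("SELECT %s FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC%n", projection);
    }
}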
----------------------------------------------------------------------
@@ -31,7 +31,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -55,7 +54,6 @@

 import java.io.Serializable;
 import java.nio.charset.Charset;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -161,28 +159,19 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
     }

     private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getAfterRows()) {
             DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
             for (int i = 0; i < each.length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
-                    continue;
-                }
                 dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
             }
             result.add(dataRecord);
         }
         return result;
     }

-    private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
-        return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
-    }
-
     private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -193,9 +182,6 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
                 Serializable newValue = afterValues[j];
                 boolean updated = !Objects.equals(newValue, oldValue);
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
-                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
-                    continue;
-                }
                 dataRecord.addColumn(new Column(columnMetaData.getName(),
                         handleValue(columnMetaData, oldValue),
                         handleValue(columnMetaData, newValue), updated, columnMetaData.isUniqueKey()));
@@ -206,15 +192,11 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
     }

     private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getBeforeRows()) {
             DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);
             for (int i = 0, length = each.length; i < length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
-                    continue;
-                }
                 dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), null, true, columnMetaData.isUniqueKey()));
             }
             result.add(dataRecord);
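Read back together, the retained lines above leave the MySQL write-rows handler as a plain copy loop. The excerpt below is reassembled from this diff for readability; handleValue(...) and createDataRecord(...) are defined elsewhere in the same class:

    private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
        List<DataRecord> result = new LinkedList<>();
        for (Serializable[] each : event.getAfterRows()) {
            DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
            for (int i = 0; i < each.length; i++) {
                PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
                // No isColumnUnneeded(...) check any more: every column of the row is copied.
                dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
            }
            result.add(dataRecord);
        }
        return result;
    }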
----------------------------------------------------------------------
@@ -75,15 +75,13 @@
 @SuppressWarnings("unchecked")
 class MySQLIncrementalDumperTest {

-    private IncrementalDumperContext dumperContext;
-
     private MySQLIncrementalDumper incrementalDumper;

     private PipelineTableMetaData pipelineTableMetaData;

     @BeforeEach
     void setUp() {
-        dumperContext = mockDumperContext();
+        IncrementalDumperContext dumperContext = mockDumperContext();
         initTableData(dumperContext);
         dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"));
         PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class);
@@ -98,7 +96,6 @@ private IncrementalDumperContext mockDumperContext() {
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
         result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
-        result.setTargetTableColumnsMap(Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id"))));
         return result;
     }

@@ -129,16 +126,10 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {

     @Test
     void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
-        assertWriteRowsEvent0(Collections.emptyMap(), 3);
+        assertWriteRowsEvent0(3);
     }

-    @Test
-    void assertWriteRowsEventWithCustomColumns() throws ReflectiveOperationException {
-        assertWriteRowsEvent0(mockTargetTableColumnsMap(), 1);
-    }
-
-    private void assertWriteRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
-        dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
+    private void assertWriteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
         WriteRowsEvent rowsEvent = new WriteRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
@@ -157,16 +148,10 @@ private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap() {

     @Test
     void assertUpdateRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
-        assertUpdateRowsEvent0(Collections.emptyMap(), 3);
-    }
-
-    @Test
-    void assertUpdateRowsEventWithCustomColumns() throws ReflectiveOperationException {
-        assertUpdateRowsEvent0(mockTargetTableColumnsMap(), 1);
+        assertUpdateRowsEvent0(3);
     }

-    private void assertUpdateRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
-        dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
+    private void assertUpdateRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
         UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
         rowsEvent.setDatabaseName("test");
         rowsEvent.setTableName("t_order");
@@ -182,16 +167,10 @@ private void assertUpdateRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {

     @Test
     void assertDeleteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
-        assertDeleteRowsEvent0(Collections.emptyMap(), 3);
-    }
-
-    @Test
-    void assertDeleteRowsEventWithCustomColumns() throws ReflectiveOperationException {
-        assertDeleteRowsEvent0(mockTargetTableColumnsMap(), 1);
+        assertDeleteRowsEvent0(3);
     }

-    private void assertDeleteRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
-        dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
+    private void assertDeleteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
         DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
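The *WithCustomColumns test variants and the mockTargetTableColumnsMap() helper are deleted because no per-table column selection is left to exercise. Each surviving test now expects every column of the mocked t_order table, which mockOrderColumnsMetaDataList() presumably defines with three columns, hence the 3. Reassembled from the hunks above:

    @Test
    void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
        // Without column filtering, all three mocked t_order columns must land in the DataRecord.
        assertWriteRowsEvent0(3);
    }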
----------------------------------------------------------------------
@@ -23,7 +23,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -35,7 +34,6 @@
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;

-import java.util.Collection;
 import java.util.List;

 /**
@@ -97,14 +95,13 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {

     private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
         DataRecord result = createDataRecord(IngestDataChangeType.INSERT, writeRowEvent, writeRowEvent.getAfterRow().size());
-        putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getTableName(), writeRowEvent.getAfterRow());
+        putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getAfterRow());
         return result;
     }

     private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
         DataRecord result = createDataRecord(IngestDataChangeType.UPDATE, updateRowEvent, updateRowEvent.getAfterRow().size());
-        String actualTableName = updateRowEvent.getTableName();
-        putColumnsIntoDataRecord(result, tableMetaData, actualTableName, updateRowEvent.getAfterRow());
+        putColumnsIntoDataRecord(result, tableMetaData, updateRowEvent.getAfterRow());
         return result;
     }

@@ -127,21 +124,13 @@ private DataRecord createDataRecord(final String type, final AbstractRowEvent rowEvent, final int columnCount) {
         return result;
     }

-    private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List<Object> values) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(actualTableName);
+    private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
         for (int i = 0, count = values.size(); i < count; i++) {
             PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-            if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
-                continue;
-            }
             boolean isUniqueKey = columnMetaData.isUniqueKey();
             Object uniqueKeyOldValue = isUniqueKey && IngestDataChangeType.UPDATE.equals(dataRecord.getType()) ? values.get(i) : null;
             Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
             dataRecord.addColumn(column);
         }
     }
-
-    private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
-        return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
-    }
 }
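With this hunk the PostgreSQL converter mirrors the MySQL dumper: putColumnsIntoDataRecord(...) loses its actualTableName parameter and copies every column unconditionally. The full method after the change, reassembled from the retained lines above:

    private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
        for (int i = 0, count = values.size(); i < count; i++) {
            PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
            boolean isUniqueKey = columnMetaData.isUniqueKey();
            // values holds the after-row (see handleUpdateRowEvent above); on UPDATE
            // records that value also serves as the unique key's old value.
            Object uniqueKeyOldValue = isUniqueKey && IngestDataChangeType.UPDATE.equals(dataRecord.getType()) ? values.get(i) : null;
            Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
            dataRecord.addColumn(column);
        }
    }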
----------------------------------------------------------------------
@@ -25,7 +25,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -54,7 +53,6 @@
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -71,8 +69,6 @@

 class WALEventConverterTest {

-    private IncrementalDumperContext dumperContext;
-
     private WALEventConverter walEventConverter;

     private final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
@@ -81,7 +77,7 @@ class WALEventConverterTest {

     @BeforeEach
     void setUp() {
-        dumperContext = mockDumperContext();
+        IncrementalDumperContext dumperContext = mockDumperContext();
         PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
         walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
         initTableData(dumperContext);
@@ -123,16 +119,10 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {

     @Test
     void assertWriteRowEventWithoutCustomColumns() throws ReflectiveOperationException {
-        assertWriteRowEvent0(Collections.emptyMap(), 3);
-    }
-
-    @Test
-    void assertWriteRowEventWithCustomColumns() throws ReflectiveOperationException {
-        assertWriteRowEvent0(mockTargetTableColumnsMap(), 1);
+        assertWriteRowEvent0(3);
     }

-    private void assertWriteRowEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
-        dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
+    private void assertWriteRowEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
         WriteRowEvent rowsEvent = new WriteRowEvent();
         rowsEvent.setSchemaName("");
         rowsEvent.setTableName("t_order");
@@ -143,10 +133,6 @@ private void assertWriteRowEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
         assertThat(actual.getColumnCount(), is(expectedColumnCount));
     }

-    private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap() {
-        return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
-    }
-
     @Test
     void assertConvertBeginTXEvent() {
         BeginTXEvent beginTXEvent = new BeginTXEvent(100);