From 63d7f318fce9fe54000861c55cdfc524fbfb4f40 Mon Sep 17 00:00:00 2001
From: zhangliang
Date: Sat, 4 Nov 2023 11:48:43 +0800
Subject: [PATCH] Remove useless DumperCommonContext.targetTableColumnsMap

---
 .../context/ingest/DumperCommonContext.java   | 30 ----------------
 .../ingest/InventoryDumperContext.java        |  1 -
 .../pipeline/core/dumper/InventoryDumper.java |  2 +-
 .../mysql/ingest/MySQLIncrementalDumper.java  | 18 ----------
 .../ingest/MySQLIncrementalDumperTest.java    | 35 ++++----------------
 .../ingest/wal/WALEventConverter.java         | 17 ++-------
 .../ingest/wal/WALEventConverterTest.java     | 20 ++---------
 7 files changed, 14 insertions(+), 109 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/DumperCommonContext.java
index 31d5f33abbed8..0881f8f787f75 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/DumperCommonContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/DumperCommonContext.java
@@ -24,14 +24,9 @@
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Base dumper context.
@@ -49,9 +44,6 @@ public abstract class DumperCommonContext {
     private TableNameSchemaNameMapping tableNameSchemaNameMapping;
 
-    // LinkedHashSet is required
-    private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap = new HashMap<>();
-
     private IngestPosition position;
 
     /**
@@ -97,26 +89,4 @@ public String getSchemaName(final LogicTableName logicTableName) {
     public String getSchemaName(final ActualTableName actualTableName) {
         return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
     }
-
-    /**
-     * Get column names.
-     *
-     * @param logicTableName logic table name
-     * @return column names
-     */
-    public Collection<String> getColumnNames(final LogicTableName logicTableName) {
-        return targetTableColumnsMap.containsKey(logicTableName)
-                ? targetTableColumnsMap.get(logicTableName).stream().map(ColumnName::getOriginal).collect(Collectors.toList())
-                : Collections.singleton("*");
-    }
-
-    /**
-     * Get column names.
-     *
-     * @param actualTableName actual table name
-     * @return column names
-     */
-    public Collection<ColumnName> getColumnNames(final String actualTableName) {
-        return targetTableColumnsMap.getOrDefault(getLogicTableName(actualTableName), Collections.emptySet());
-    }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java
index 5b4b723b4487c..362537d97c13b 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java
@@ -56,7 +56,6 @@ public InventoryDumperContext(final DumperCommonContext dumperContext) {
         setDataSourceConfig(dumperContext.getDataSourceConfig());
         setTableNameMap(dumperContext.getTableNameMap());
         setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
-        setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap());
     }
 
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index d1ca7a44e2e35..0777dbc2e4633 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -163,7 +163,7 @@ private String buildInventoryDumpSQL() {
         }
         PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) dumperContext.getPosition();
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
-        Collection<String> columnNames = dumperContext.getColumnNames(logicTableName);
+        Collection<String> columnNames = Collections.singleton("*");
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
             if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
                 return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index d1fb5ca8ea489..4446075a3c162 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -31,7 +31,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -55,7 +54,6 @@
 
 import java.io.Serializable;
 import java.nio.charset.Charset;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -161,15 +159,11 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableN
     }
 
     private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getAfterRows()) {
             DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
             for (int i = 0; i < each.length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
-                    continue;
-                }
                 dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
             }
             result.add(dataRecord);
@@ -177,12 +171,7 @@ private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final
         return result;
     }
 
-    private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
-        return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
-    }
-
     private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -193,9 +182,6 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
                 Serializable newValue = afterValues[j];
                 boolean updated = !Objects.equals(newValue, oldValue);
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
-                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
-                    continue;
-                }
                 dataRecord.addColumn(new Column(columnMetaData.getName(),
                         handleValue(columnMetaData, oldValue),
                         handleValue(columnMetaData, newValue), updated, columnMetaData.isUniqueKey()));
@@ -206,15 +192,11 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
     }
 
     private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getBeforeRows()) {
             DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);
             for (int i = 0, length = each.length; i < length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
-                    continue;
-                }
                 dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), null, true, columnMetaData.isUniqueKey()));
             }
             result.add(dataRecord);
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index a2df83744e196..f4aea914c980f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -75,15 +75,13 @@
 @SuppressWarnings("unchecked")
 class MySQLIncrementalDumperTest {
 
-    private IncrementalDumperContext dumperContext;
-
     private MySQLIncrementalDumper incrementalDumper;
 
     private PipelineTableMetaData pipelineTableMetaData;
 
     @BeforeEach
     void setUp() {
-        dumperContext = mockDumperContext();
+        IncrementalDumperContext dumperContext = mockDumperContext();
         initTableData(dumperContext);
         dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"));
         PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class);
@@ -98,7 +96,6 @@ private IncrementalDumperContext mockDumperContext() {
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
         result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
-        result.setTargetTableColumnsMap(Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id"))));
         return result;
     }
 
@@ -129,16 +126,10 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
 
     @Test
     void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
-        assertWriteRowsEvent0(Collections.emptyMap(), 3);
+        assertWriteRowsEvent0(3);
     }
 
-    @Test
-    void assertWriteRowsEventWithCustomColumns() throws ReflectiveOperationException {
-        assertWriteRowsEvent0(mockTargetTableColumnsMap(), 1);
-    }
-
-    private void assertWriteRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
-        dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
+    private void assertWriteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
         WriteRowsEvent rowsEvent = new WriteRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
@@ -157,16 +148,10 @@ private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap()
 
     @Test
     void assertUpdateRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
-        assertUpdateRowsEvent0(Collections.emptyMap(), 3);
-    }
-
-    @Test
-    void assertUpdateRowsEventWithCustomColumns() throws ReflectiveOperationException {
-        assertUpdateRowsEvent0(mockTargetTableColumnsMap(), 1);
+        assertUpdateRowsEvent0(3);
     }
 
-    private void assertUpdateRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
-        dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
+    private void assertUpdateRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
         UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
         rowsEvent.setDatabaseName("test");
         rowsEvent.setTableName("t_order");
@@ -182,16 +167,10 @@ private void assertUpdateRowsEvent0(final Map<LogicTableName, Collection<ColumnN
 
     @Test
     void assertDeleteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
-        assertDeleteRowsEvent0(Collections.emptyMap(), 3);
-    }
-
-    @Test
-    void assertDeleteRowsEventWithCustomColumns() throws ReflectiveOperationException {
-        assertDeleteRowsEvent0(mockTargetTableColumnsMap(), 1);
+        assertDeleteRowsEvent0(3);
     }
 
-    private void assertDeleteRowsEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
-        dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
+    private void assertDeleteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
         DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
         rowsEvent.setDatabaseName("");
         rowsEvent.setTableName("t_order");
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index aa96452a9b6b8..1e14fd11575d4 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -23,7 +23,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -35,7 +34,6 @@
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
 
-import java.util.Collection;
 import java.util.List;
 
 /**
@@ -97,14 +95,13 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableN
 
     private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
         DataRecord result = createDataRecord(IngestDataChangeType.INSERT, writeRowEvent, writeRowEvent.getAfterRow().size());
-        putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getTableName(), writeRowEvent.getAfterRow());
+        putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getAfterRow());
         return result;
     }
 
     private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
         DataRecord result = createDataRecord(IngestDataChangeType.UPDATE, updateRowEvent, updateRowEvent.getAfterRow().size());
-        String actualTableName = updateRowEvent.getTableName();
-        putColumnsIntoDataRecord(result, tableMetaData, actualTableName, updateRowEvent.getAfterRow());
+        putColumnsIntoDataRecord(result, tableMetaData, updateRowEvent.getAfterRow());
         return result;
     }
 
@@ -127,21 +124,13 @@ private DataRecord createDataRecord(final String type, final AbstractRowEvent ro
         return result;
     }
 
-    private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List<Object> values) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(actualTableName);
+    private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
         for (int i = 0, count = values.size(); i < count; i++) {
             PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-            if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
-                continue;
-            }
             boolean isUniqueKey = columnMetaData.isUniqueKey();
             Object uniqueKeyOldValue = isUniqueKey && IngestDataChangeType.UPDATE.equals(dataRecord.getType()) ? values.get(i) : null;
             Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
             dataRecord.addColumn(column);
         }
     }
-
-    private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
-        return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
-    }
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 389b3f724bdca..1197dfefd8279 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -25,7 +25,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -54,7 +53,6 @@
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -71,8 +69,6 @@
 class WALEventConverterTest {
 
-    private IncrementalDumperContext dumperContext;
-
     private WALEventConverter walEventConverter;
 
     private final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
 
@@ -81,7 +77,7 @@ class WALEventConverterTest {
 
     @BeforeEach
     void setUp() {
-        dumperContext = mockDumperContext();
+        IncrementalDumperContext dumperContext = mockDumperContext();
         PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
         walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
         initTableData(dumperContext);
@@ -123,16 +119,10 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
 
     @Test
     void assertWriteRowEventWithoutCustomColumns() throws ReflectiveOperationException {
-        assertWriteRowEvent0(Collections.emptyMap(), 3);
-    }
-
-    @Test
-    void assertWriteRowEventWithCustomColumns() throws ReflectiveOperationException {
-        assertWriteRowEvent0(mockTargetTableColumnsMap(), 1);
+        assertWriteRowEvent0(3);
     }
 
-    private void assertWriteRowEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
-        dumperContext.setTargetTableColumnsMap(targetTableColumnsMap);
+    private void assertWriteRowEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
         WriteRowEvent rowsEvent = new WriteRowEvent();
         rowsEvent.setSchemaName("");
         rowsEvent.setTableName("t_order");
@@ -143,10 +133,6 @@ private void assertWriteRowEvent0(final Map<LogicTableName, Collection<ColumnNam
     }
 
-    private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap() {
-        return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
-    }
-
     @Test
     void assertConvertBeginTXEvent() {
         BeginTXEvent beginTXEvent = new BeginTXEvent(100);