From 8c8a5521b6f35012250fa87071819cbbe42314ae Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 2 Nov 2023 22:30:37 +0800 Subject: [PATCH] Refactor DumperConfiguration --- .../config/ingest/DumperConfiguration.java | 21 ++++++++----------- .../PipelineInventoryDumpSQLBuilder.java | 10 ++++----- .../pipeline/core/dumper/InventoryDumper.java | 3 ++- .../mysql/ingest/MySQLIncrementalDumper.java | 18 ++++++++-------- .../ingest/wal/WALEventConverter.java | 10 ++++----- 5 files changed, 30 insertions(+), 32 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java index 474ea9cc8be2d..45b77d4a2c187 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java @@ -28,9 +28,9 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; -import java.util.List; +import java.util.Collection; +import java.util.Collections; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -112,22 +112,19 @@ public String getSchemaName(final ActualTableName actualTableName) { * @param logicTableName logic table name * @return column names */ - public Optional> getColumnNames(final LogicTableName logicTableName) { - Set columnNames = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(logicTableName); - if (null == columnNames) { - return Optional.empty(); - } - return Optional.of(columnNames.stream().map(ColumnName::getOriginal).collect(Collectors.toList())); + public Collection getColumnNames(final LogicTableName logicTableName) { + Collection columnNames = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(logicTableName); + return null == columnNames ? Collections.singleton("*") : columnNames.stream().map(ColumnName::getOriginal).collect(Collectors.toList()); } /** - * Get column name set of table. + * Get column names. * * @param actualTableName actual table name - * @return column names of table + * @return column names */ - public Optional> getColumnNameSet(final String actualTableName) { + public Collection getColumnNames(final String actualTableName) { Set result = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(getLogicTableName(actualTableName)); - return Optional.ofNullable(result); + return null == result ? Collections.emptySet() : result; } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java index 81a9711856a16..395d98cacf232 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import java.util.List; +import java.util.Collection; import java.util.stream.Collectors; /** @@ -42,7 +42,7 @@ public PipelineInventoryDumpSQLBuilder(final DatabaseType databaseType) { * @param uniqueKey unique key * @return built SQL */ - public String buildDivisibleSQL(final String schemaName, final String tableName, final List columnNames, final String uniqueKey) { + public String buildDivisibleSQL(final String schemaName, final String tableName, final Collection columnNames, final String uniqueKey) { String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName); String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey); return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey, escapedUniqueKey); @@ -57,7 +57,7 @@ public String buildDivisibleSQL(final String schemaName, final String tableName, * @param uniqueKey unique key * @return built SQL */ - public String buildUnlimitedDivisibleSQL(final String schemaName, final String tableName, final List columnNames, final String uniqueKey) { + public String buildUnlimitedDivisibleSQL(final String schemaName, final String tableName, final Collection columnNames, final String uniqueKey) { String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName); String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey); return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey); @@ -72,13 +72,13 @@ public String buildUnlimitedDivisibleSQL(final String schemaName, final String t * @param uniqueKey unique key * @return built SQL */ - public String buildIndivisibleSQL(final String schemaName, final String tableName, final List columnNames, final String uniqueKey) { + public String buildIndivisibleSQL(final String schemaName, final String tableName, final Collection columnNames, final String uniqueKey) { String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName); String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey); return String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey); } - private String buildQueryColumns(final List columnNames) { + private String buildQueryColumns(final Collection columnNames) { return columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(",")); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java index 31e80d750d546..23462cdf1db01 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java @@ -57,6 +57,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -162,7 +163,7 @@ private String buildInventoryDumpSQL() { } PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) dumperConfig.getPosition(); PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0); - List columnNames = dumperConfig.getColumnNames(logicTableName).orElse(Collections.singletonList("*")); + Collection columnNames = dumperConfig.getColumnNames(logicTableName); if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) { if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) { return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName()); diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index 6c5cde7993a4c..c41f2ae9e3314 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -55,12 +55,12 @@ import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.Set; /** * MySQL incremental dumper. @@ -161,13 +161,13 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableN } private List handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) { - Set columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null); + Collection columnNames = dumperConfig.getColumnNames(event.getTableName()); List result = new LinkedList<>(); for (Serializable[] each : event.getAfterRows()) { DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length); for (int i = 0; i < each.length; i++) { PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1); - if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) { + if (isColumnUnneeded(columnNames, columnMetaData.getName())) { continue; } dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey())); @@ -177,12 +177,12 @@ private List handleWriteRowsEvent(final WriteRowsEvent event, final return result; } - private boolean isColumnUnneeded(final Set columnNameSet, final String columnName) { - return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName)); + private boolean isColumnUnneeded(final Collection columnNames, final String columnName) { + return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName)); } private List handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) { - Set columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null); + Collection columnNames = dumperConfig.getColumnNames(event.getTableName()); List result = new LinkedList<>(); for (int i = 0; i < event.getBeforeRows().size(); i++) { Serializable[] beforeValues = event.getBeforeRows().get(i); @@ -193,7 +193,7 @@ private List handleUpdateRowsEvent(final UpdateRowsEvent event, fina Serializable newValue = afterValues[j]; boolean updated = !Objects.equals(newValue, oldValue); PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1); - if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) { + if (isColumnUnneeded(columnNames, columnMetaData.getName())) { continue; } dataRecord.addColumn(new Column(columnMetaData.getName(), @@ -206,13 +206,13 @@ private List handleUpdateRowsEvent(final UpdateRowsEvent event, fina } private List handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) { - Set columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null); + Collection columnNames = dumperConfig.getColumnNames(event.getTableName()); List result = new LinkedList<>(); for (Serializable[] each : event.getBeforeRows()) { DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length); for (int i = 0, length = each.length; i < length; i++) { PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1); - if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) { + if (isColumnUnneeded(columnNames, columnMetaData.getName())) { continue; } dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), null, true, columnMetaData.isUniqueKey())); diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index bd261596024ef..5703ed68ce018 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -35,8 +35,8 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent; import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException; +import java.util.Collection; import java.util.List; -import java.util.Set; /** * WAL event converter. @@ -128,10 +128,10 @@ private DataRecord createDataRecord(final String type, final AbstractRowEvent ro } private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List values) { - Set columnNameSet = dumperConfig.getColumnNameSet(actualTableName).orElse(null); + Collection columnNames = dumperConfig.getColumnNames(actualTableName); for (int i = 0, count = values.size(); i < count; i++) { PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1); - if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) { + if (isColumnUnneeded(columnNames, columnMetaData.getName())) { continue; } boolean isUniqueKey = columnMetaData.isUniqueKey(); @@ -141,7 +141,7 @@ private void putColumnsIntoDataRecord(final DataRecord dataRecord, final Pipelin } } - private boolean isColumnUnneeded(final Set columnNameSet, final String columnName) { - return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName)); + private boolean isColumnUnneeded(final Collection columnNames, final String columnName) { + return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName)); } }