
Commit

Refactor DumperConfiguration
terrymanu committed Nov 2, 2023
1 parent 59c1bd0 commit 8c8a552
Showing 5 changed files with 30 additions and 32 deletions.
@@ -28,9 +28,9 @@
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;

import java.util.List;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@@ -112,22 +112,19 @@ public String getSchemaName(final ActualTableName actualTableName) {
* @param logicTableName logic table name
* @return column names
*/
public Optional<List<String>> getColumnNames(final LogicTableName logicTableName) {
Set<ColumnName> columnNames = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(logicTableName);
if (null == columnNames) {
return Optional.empty();
}
return Optional.of(columnNames.stream().map(ColumnName::getOriginal).collect(Collectors.toList()));
public Collection<String> getColumnNames(final LogicTableName logicTableName) {
Collection<ColumnName> columnNames = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(logicTableName);
return null == columnNames ? Collections.singleton("*") : columnNames.stream().map(ColumnName::getOriginal).collect(Collectors.toList());
}

/**
* Get column name set of table.
* Get column names.
*
* @param actualTableName actual table name
* @return column names of table
* @return column names
*/
public Optional<Set<ColumnName>> getColumnNameSet(final String actualTableName) {
public Collection<ColumnName> getColumnNames(final String actualTableName) {
Set<ColumnName> result = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(getLogicTableName(actualTableName));
return Optional.ofNullable(result);
return null == result ? Collections.emptySet() : result;
}
}
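
The net effect of this refactor is that DumperConfiguration no longer hands back Optional wrappers: a logic table with no configured columns now resolves to a singleton "*" collection (which the inventory dumper turns into a SELECT * style query), and an actual table with no configured columns resolves to an empty set. Below is a minimal standalone sketch of that contract, not part of this commit, with plain strings standing in for LogicTableName and ColumnName:

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Standalone sketch of the new lookup contract; the real class keys
// targetTableColumnsMap by LogicTableName and stores ColumnName values.
final class ColumnNameLookupSketch {

    private final Map<String, Set<String>> targetTableColumnsMap;

    ColumnNameLookupSketch(final Map<String, Set<String>> targetTableColumnsMap) {
        this.targetTableColumnsMap = targetTableColumnsMap;
    }

    // Mirrors getColumnNames(LogicTableName): "*" stands for "dump every column".
    Collection<String> getColumnNames(final String logicTableName) {
        Set<String> columnNames = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(logicTableName);
        return null == columnNames ? Collections.singleton("*") : columnNames;
    }

    // Mirrors getColumnNames(String actualTableName): an empty set means "no column filter configured".
    Collection<String> getColumnNamesOfActualTable(final String actualTableName) {
        Set<String> result = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(actualTableName);
        return null == result ? Collections.emptySet() : result;
    }
}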
@@ -19,7 +19,7 @@

import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import java.util.List;
import java.util.Collection;
import java.util.stream.Collectors;

/**
@@ -42,7 +42,7 @@ public PipelineInventoryDumpSQLBuilder(final DatabaseType databaseType) {
* @param uniqueKey unique key
* @return built SQL
*/
public String buildDivisibleSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
public String buildDivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey, escapedUniqueKey);
@@ -57,7 +57,7 @@ public String buildDivisibleSQL(final String schemaName, final String tableName,
* @param uniqueKey unique key
* @return built SQL
*/
public String buildUnlimitedDivisibleSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
public String buildUnlimitedDivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey);
@@ -72,13 +72,13 @@ public String buildUnlimitedDivisibleSQL(final String schemaName, final String t
* @param uniqueKey unique key
* @return built SQL
*/
public String buildIndivisibleSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
public String buildIndivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
}

private String buildQueryColumns(final List<String> columnNames) {
private String buildQueryColumns(final Collection<String> columnNames) {
return columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
}
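
Since buildQueryColumns simply joins whatever collection it is given, the generated statements keep the same shape as before; only the parameter type widens from List to Collection. A simplified standalone sketch of buildDivisibleSQL follows; identifier escaping and schema qualification, which the real builder delegates to sqlSegmentBuilder per database type, are omitted here:

import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;

// Simplified stand-in for buildDivisibleSQL: no escaping, no schema qualification.
final class DivisibleSqlSketch {

    static String buildDivisibleSQL(final String tableName, final Collection<String> columnNames, final String uniqueKey) {
        String queryColumns = columnNames.stream().collect(Collectors.joining(","));
        return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", queryColumns, tableName, uniqueKey, uniqueKey, uniqueKey);
    }

    public static void main(final String[] args) {
        // Prints: SELECT order_id,status FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC
        System.out.println(buildDivisibleSQL("t_order", Arrays.asList("order_id", "status"), "order_id"));
    }
}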

@@ -57,6 +57,7 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -162,7 +163,7 @@ private String buildInventoryDumpSQL() {
}
PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
List<String> columnNames = dumperConfig.getColumnNames(logicTableName).orElse(Collections.singletonList("*"));
Collection<String> columnNames = dumperConfig.getColumnNames(logicTableName);
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
@@ -55,12 +55,12 @@

import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/**
* MySQL incremental dumper.
@@ -161,13 +161,13 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableN
}

private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getAfterRows()) {
DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
for (int i = 0; i < each.length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
@@ -177,12 +177,12 @@ private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final
return result;
}

private boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
}
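
The write, update, and delete handlers here (and the PostgreSQL WAL event converter below) now share the same filtering rule: an empty collection means no column filter is configured and every column is kept, while a non-empty collection drops anything not listed in it. A small standalone sketch of that rule, with plain strings standing in for ColumnName:

import java.util.Collection;
import java.util.Collections;

// Standalone sketch of the filter rule used by the incremental dumpers.
final class ColumnFilterSketch {

    static boolean isColumnUnneeded(final Collection<String> columnNames, final String columnName) {
        return !columnNames.isEmpty() && !columnNames.contains(columnName);
    }

    public static void main(final String[] args) {
        Collection<String> noFilter = Collections.emptySet();
        Collection<String> onlyOrderId = Collections.singleton("order_id");
        System.out.println(isColumnUnneeded(noFilter, "status"));      // false: no filter, column is kept
        System.out.println(isColumnUnneeded(onlyOrderId, "status"));   // true: not configured, column is skipped
        System.out.println(isColumnUnneeded(onlyOrderId, "order_id")); // false: configured, column is kept
    }
}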

private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -193,7 +193,7 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
Serializable newValue = afterValues[j];
boolean updated = !Objects.equals(newValue, oldValue);
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(),
@@ -206,13 +206,13 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
}

private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getBeforeRows()) {
DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);
for (int i = 0, length = each.length; i < length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), null, true, columnMetaData.isUniqueKey()));
@@ -35,8 +35,8 @@
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;

import java.util.Collection;
import java.util.List;
import java.util.Set;

/**
* WAL event converter.
@@ -128,10 +128,10 @@ private DataRecord createDataRecord(final String type, final AbstractRowEvent ro
}

private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List<Object> values) {
Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(actualTableName).orElse(null);
Collection<ColumnName> columnNames = dumperConfig.getColumnNames(actualTableName);
for (int i = 0, count = values.size(); i < count; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
boolean isUniqueKey = columnMetaData.isUniqueKey();
@@ -141,7 +141,7 @@ private void putColumnsIntoDataRecord(final DataRecord dataRecord, final Pipelin
}
}

private boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
}
}
