From e6b29d7d3c191e2692811675aa89f81fa43a7bcc Mon Sep 17 00:00:00 2001 From: Hongsheng Zhong Date: Wed, 21 Aug 2024 10:26:04 +0800 Subject: [PATCH] Refactor inventory dumper and data consistency streaming query to page query (#32607) * Refactor inventory dumper and data consistency streaming query to page query * Fix spotless --- .../DataConsistencyCheckUtils.java | 55 ++++- ...dSingleTableInventoryCalculatedResult.java | 5 +- .../table/TableInventoryCheckParameter.java | 6 +- .../table/calculator/CalculationContext.java | 16 ++ .../RecordSingleTableInventoryCalculator.java | 135 +++++++++-- ...ingleTableInventoryCalculateParameter.java | 112 ++++++++- .../dumper/inventory/InventoryDumper.java | 218 ++++++++++++++---- .../inventory/InventoryDumperContext.java | 2 + .../inventory/InventoryQueryParameter.java | 56 +++++ .../ingest/dumper/inventory/QueryRange.java | 35 +++ .../ingest/dumper/inventory/QueryType.java | 26 +++ .../dialect/DialectPipelineSQLBuilder.java | 10 + ...ineDataConsistencyCalculateSQLBuilder.java | 78 ++++++- .../sql/PipelineImportSQLBuilder.java | 2 +- .../sql/PipelineInventoryDumpSQLBuilder.java | 43 +++- .../sql/PipelinePrepareSQLBuilder.java | 2 +- .../pipeline/core/util/DatabaseTypeUtils.java | 50 ++++ ...ataConsistencyCalculateSQLBuilderTest.java | 75 +++++- .../PipelineInventoryDumpSQLBuilderTest.java | 23 +- ...ordSingleTableInventoryCalculatorTest.java | 117 ++++++++-- 20 files changed, 939 insertions(+), 127 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryQueryParameter.java create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/QueryRange.java create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/QueryType.java create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtils.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java index f78eb54f97216..64cd41c3d87b8 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java @@ -23,7 +23,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.builder.EqualsBuilder; +import javax.annotation.Nullable; import java.math.BigDecimal; +import java.math.BigInteger; import java.math.RoundingMode; import java.sql.Array; import java.sql.SQLException; @@ -76,30 +78,49 @@ public static boolean recordsEquals(final Map thisRecord, final @SneakyThrows(SQLException.class) public static boolean isMatched(final EqualsBuilder equalsBuilder, final Object thisColumnValue, final Object thatColumnValue) { equalsBuilder.reset(); - if (isInteger(thisColumnValue) && isInteger(thatColumnValue)) { - return isIntegerEquals((Number) thisColumnValue, (Number) thatColumnValue); + if (thisColumnValue instanceof Number && thatColumnValue instanceof Number) { + return isNumberEquals((Number) thisColumnValue, (Number) thatColumnValue); } if (thisColumnValue instanceof SQLXML && 
thatColumnValue instanceof SQLXML) { return ((SQLXML) thisColumnValue).getString().equals(((SQLXML) thatColumnValue).getString()); } - if (thisColumnValue instanceof BigDecimal && thatColumnValue instanceof BigDecimal) { - return isBigDecimalEquals((BigDecimal) thisColumnValue, (BigDecimal) thatColumnValue); - } if (thisColumnValue instanceof Array && thatColumnValue instanceof Array) { return Objects.deepEquals(((Array) thisColumnValue).getArray(), ((Array) thatColumnValue).getArray()); } return equalsBuilder.append(thisColumnValue, thatColumnValue).isEquals(); } - private static boolean isInteger(final Object value) { - if (!(value instanceof Number)) { - return false; + private static boolean isNumberEquals(final Number one, final Number another) { + if (isInteger(one) && isInteger(another)) { + return one.longValue() == another.longValue(); } + return isBigDecimalEquals(convertToBigDecimal(one), convertToBigDecimal(another)); + } + + private static boolean isInteger(final Number value) { return value instanceof Long || value instanceof Integer || value instanceof Short || value instanceof Byte; } - private static boolean isIntegerEquals(final Number one, final Number another) { - return one.longValue() == another.longValue(); + /** + * Convert number to BigDecimal. + * + * @param value number + * @return BigDecimal + */ + public static BigDecimal convertToBigDecimal(final Number value) { + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } + if (isInteger(value)) { + return BigDecimal.valueOf(value.longValue()); + } + if (value instanceof Float || value instanceof Double) { + return BigDecimal.valueOf(value.doubleValue()); + } + if (value instanceof BigInteger) { + return new BigDecimal((BigInteger) value); + } + return new BigDecimal(value.toString()); } /** @@ -128,4 +149,18 @@ public static boolean isBigDecimalEquals(final BigDecimal one, final BigDecimal } return 0 == decimalOne.compareTo(decimalTwo); } + + /** + * Get first unique key value. 
+ * + * @param rawRecord raw record + * @param uniqueKey unique key + * @return first unique key value + */ + public static Object getFirstUniqueKeyValue(final Map rawRecord, final @Nullable String uniqueKey) { + if (rawRecord.isEmpty() || null == uniqueKey) { + return null; + } + return rawRecord.get(uniqueKey); + } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java index 2e3aa64ac7193..9a92206ef9efa 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java @@ -67,8 +67,9 @@ public boolean equals(final Object o) { final RecordSingleTableInventoryCalculatedResult that = (RecordSingleTableInventoryCalculatedResult) o; EqualsBuilder equalsBuilder = new EqualsBuilder(); if (recordsCount != that.recordsCount || !DataConsistencyCheckUtils.isMatched(equalsBuilder, maxUniqueKeyValue, that.maxUniqueKeyValue)) { - log.warn("Record count or max unique key value not match, recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={}.", - recordsCount, that.recordsCount, maxUniqueKeyValue, that.maxUniqueKeyValue); + log.warn("Record count or max unique key value not match, recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={}, value1.class={}, value2.class={}.", + recordsCount, that.recordsCount, maxUniqueKeyValue, that.maxUniqueKeyValue, + null == maxUniqueKeyValue ? "" : maxUniqueKeyValue.getClass().getName(), null == that.maxUniqueKeyValue ? 
"" : that.maxUniqueKeyValue.getClass().getName()); return false; } Iterator> thisRecordsIterator = records.iterator(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java index c12a4baba0792..76300d155889a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java @@ -19,11 +19,11 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; -import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm; +import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; import java.util.List; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java index 30b02a7446e4e..f0297044bf8f3 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java @@ -22,6 +22,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -35,6 +36,8 @@ public final class CalculationContext implements AutoCloseable { private final AtomicReference resultSet = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); + /** * Get connection. * @@ -80,10 +83,23 @@ public void setResultSet(final ResultSet resultSet) { this.resultSet.set(resultSet); } + /** + * Is closed. 
+ * + * @return closed or not + */ + public boolean isClosed() { + return closed.get(); + } + @Override public void close() { + closed.set(true); QuietlyCloser.close(resultSet.get()); QuietlyCloser.close(preparedStatement.get()); QuietlyCloser.close(connection.get()); + resultSet.set(null); + preparedStatement.set(null); + connection.set(null); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java index 493d4295c6d1a..79e9afec6e31a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java @@ -18,10 +18,13 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator; import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException; import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine; import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder; @@ -42,6 +45,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; /** @@ -55,10 +59,37 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin @Override public Optional calculateChunk(final SingleTableInventoryCalculateParameter param) { - CalculationContext calculationContext = getOrCreateCalculationContext(param); - try { - List> records = new LinkedList<>(); - Object maxUniqueKeyValue = null; + List> records = calculateChunk0(param, QueryType.RANGE_QUERY == param.getQueryType()); + if (records.isEmpty()) { + return Optional.empty(); + } + String firstUniqueKey = param.getFirstUniqueKey().getName(); + if (QueryType.POINT_QUERY == param.getQueryType()) { + return convertRecordsToResult(records, firstUniqueKey); + } + if (records.size() == chunkSize) { + Object minUniqueKeyValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(0), firstUniqueKey); + removeLastRecords(records, param); + if (!records.isEmpty()) { + updateQueryRangeLower(param, records, firstUniqueKey); + return convertRecordsToResult(records, firstUniqueKey); + } + SingleTableInventoryCalculateParameter newParam = buildNewCalculateParameter(param, minUniqueKeyValue); + records = 
calculateChunk0(newParam, false); + if (!records.isEmpty()) { + updateQueryRangeLower(param, records, firstUniqueKey); + return convertRecordsToResult(records, firstUniqueKey); + } + return Optional.empty(); + } else { + updateQueryRangeLower(param, records, firstUniqueKey); + return convertRecordsToResult(records, firstUniqueKey); + } + } + + private List> calculateChunk0(final SingleTableInventoryCalculateParameter param, final boolean isRangeQuery) { + try (CalculationContext calculationContext = getOrCreateCalculationContext(param)) { + List> result = new LinkedList<>(); InventoryColumnValueReaderEngine columnValueReaderEngine = new InventoryColumnValueReaderEngine(param.getDatabaseType()); ResultSet resultSet = calculationContext.getResultSet(); ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); @@ -69,30 +100,24 @@ public Optional calculateChunk(final Singl for (int columnIndex = 1, columnCount = resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) { columnRecord.put(resultSetMetaData.getColumnLabel(columnIndex), columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex)); } - records.add(columnRecord); - maxUniqueKeyValue = columnValueReaderEngine.read(resultSet, resultSetMetaData, param.getFirstUniqueKey().getOrdinalPosition()); - if (records.size() == chunkSize) { + result.add(columnRecord); + if (isRangeQuery && result.size() == chunkSize) { break; } } - if (records.isEmpty()) { - calculationContext.close(); - } - return records.isEmpty() ? Optional.empty() : Optional.of(new RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records)); + return result; } catch (final PipelineSQLException | PipelineJobCancelingException ex) { - calculationContext.close(); throw ex; // CHECKSTYLE:OFF } catch (final SQLException | RuntimeException ex) { // CHECKSTYLE:ON - calculationContext.close(); throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), ex); } } private CalculationContext getOrCreateCalculationContext(final SingleTableInventoryCalculateParameter param) { CalculationContext result = (CalculationContext) param.getCalculationContext(); - if (null != result) { + if (null != result && !result.isClosed()) { return result; } try { @@ -134,14 +159,86 @@ private String getQuerySQL(final SingleTableInventoryCalculateParameter param) { } PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType()); Collection columnNames = param.getColumnNames().isEmpty() ? 
Collections.singleton("*") : param.getColumnNames(); - boolean firstQuery = null == param.getTableCheckPosition(); - return pipelineSQLBuilder.buildQueryAllOrderingSQL(param.getSchemaName(), param.getLogicTableName(), columnNames, param.getFirstUniqueKey().getName(), firstQuery); + switch (param.getQueryType()) { + case RANGE_QUERY: + return pipelineSQLBuilder.buildQueryRangeOrderingSQL(param.getSchemaName(), param.getLogicTableName(), + columnNames, param.getUniqueKeysNames(), param.getQueryRange(), param.getShardingColumnsNames()); + case POINT_QUERY: + return pipelineSQLBuilder.buildPointQuerySQL(param.getSchemaName(), param.getLogicTableName(), columnNames, param.getUniqueKeysNames(), param.getShardingColumnsNames()); + default: + throw new UnsupportedOperationException("Query type: " + param.getQueryType()); + } } private void setParameters(final PreparedStatement preparedStatement, final SingleTableInventoryCalculateParameter param) throws SQLException { - Object tableCheckPosition = param.getTableCheckPosition(); - if (null != tableCheckPosition) { - preparedStatement.setObject(1, tableCheckPosition); + QueryType queryType = param.getQueryType(); + if (queryType == QueryType.RANGE_QUERY) { + QueryRange queryRange = param.getQueryRange(); + ShardingSpherePreconditions.checkNotNull(queryRange, () -> new PipelineTableDataConsistencyCheckLoadingFailedException( + param.getSchemaName(), param.getLogicTableName(), new RuntimeException("Unique keys values range is null"))); + int parameterIndex = 1; + if (null != queryRange.getLower()) { + preparedStatement.setObject(parameterIndex++, queryRange.getLower()); + } + if (null != queryRange.getUpper()) { + preparedStatement.setObject(parameterIndex++, queryRange.getUpper()); + } + preparedStatement.setObject(parameterIndex, chunkSize); + } else if (queryType == QueryType.POINT_QUERY) { + Collection uniqueKeysValues = param.getUniqueKeysValues(); + ShardingSpherePreconditions.checkNotNull(uniqueKeysValues, () -> new PipelineTableDataConsistencyCheckLoadingFailedException( + param.getSchemaName(), param.getLogicTableName(), new RuntimeException("Unique keys values is null"))); + int parameterIndex = 1; + for (Object each : uniqueKeysValues) { + preparedStatement.setObject(parameterIndex++, each); + } + if (null != param.getShardingColumnsNames() && !param.getShardingColumnsNames().isEmpty()) { + List shardingColumnsValues = param.getShardingColumnsValues(); + ShardingSpherePreconditions.checkNotNull(shardingColumnsValues, () -> new PipelineTableDataConsistencyCheckLoadingFailedException( + param.getSchemaName(), param.getLogicTableName(), new RuntimeException("Sharding columns values is null when names not empty"))); + for (Object each : shardingColumnsValues) { + preparedStatement.setObject(parameterIndex++, each); + } + } + } else { + throw new UnsupportedOperationException("Query type: " + queryType); + } + } + + private void removeLastRecords(final List> records, final SingleTableInventoryCalculateParameter param) { + Object minUniqueKeyValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(0), param.getFirstUniqueKey().getName()); + Object maxUniqueKeyValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() - 1), param.getFirstUniqueKey().getName()); + if (Objects.equals(minUniqueKeyValue, maxUniqueKeyValue)) { + records.clear(); + return; + } + records.remove(records.size() - 1); + for (int i = records.size() - 1; i >= 0; i--) { + if (Objects.deepEquals(maxUniqueKeyValue, 
DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(i), param.getFirstUniqueKey().getName()))) { + records.remove(i); + } else { + break; + } } } + + private Optional convertRecordsToResult(final List> records, final String firstUniqueKey) { + Object maxUniqueKeyValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() - 1), firstUniqueKey); + return Optional.of(new RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records)); + } + + private SingleTableInventoryCalculateParameter buildNewCalculateParameter(final SingleTableInventoryCalculateParameter param, final Object maxUniqueKeyValue) { + SingleTableInventoryCalculateParameter result = new SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(), param.getColumnNames(), + Collections.singletonList(param.getFirstUniqueKey()), QueryType.POINT_QUERY); + result.setUniqueKeysValues(Collections.singletonList(maxUniqueKeyValue)); + result.setQueryRange(param.getQueryRange()); + result.setShardingColumnsNames(param.getShardingColumnsNames()); + result.setShardingColumnsValues(param.getShardingColumnsValues()); + return result; + } + + private void updateQueryRangeLower(final SingleTableInventoryCalculateParameter param, final List> records, final String firstUniqueKey) { + Object maxUniqueKeyValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() - 1), firstUniqueKey); + param.setQueryRange(new QueryRange(maxUniqueKeyValue, false, param.getQueryRange().getUpper())); + } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java index b319faaad1ff9..9614b971b16da 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java @@ -19,13 +19,18 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; -import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType; +import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; +import javax.annotation.Nullable; +import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * Single table inventory calculate parameter. 
@@ -50,10 +55,28 @@ public final class SingleTableInventoryCalculateParameter { */ private final List uniqueKeys; - private final Object tableCheckPosition; - private final AtomicReference calculationContext = new AtomicReference<>(); + private final AtomicReference> uniqueKeysValues = new AtomicReference<>(); + + private final AtomicReference uniqueKeysValuesRange = new AtomicReference<>(); + + private final AtomicReference> shardingColumnsNames = new AtomicReference<>(); + + private final AtomicReference> shardingColumnsValues = new AtomicReference<>(); + + private final QueryType queryType; + + public SingleTableInventoryCalculateParameter(final PipelineDataSourceWrapper dataSource, final CaseInsensitiveQualifiedTable table, final List columnNames, + final List uniqueKeys, final Object tableCheckPosition) { + this.dataSource = dataSource; + this.table = table; + this.columnNames = columnNames; + this.uniqueKeys = uniqueKeys; + queryType = QueryType.RANGE_QUERY; + setQueryRange(new QueryRange(tableCheckPosition, false, null)); + } + /** * Get database type. * @@ -107,4 +130,85 @@ public AutoCloseable getCalculationContext() { public void setCalculationContext(final AutoCloseable calculationContext) { this.calculationContext.set(calculationContext); } + + /** + * Get unique keys names. + * + * @return unique keys names + */ + public List getUniqueKeysNames() { + return uniqueKeys.stream().map(PipelineColumnMetaData::getName).collect(Collectors.toList()); + } + + /** + * Get unique keys values. + * + * @return unique keys values + */ + public Collection getUniqueKeysValues() { + return uniqueKeysValues.get(); + } + + /** + * Set unique keys values. + * + * @param uniqueKeysValues unique keys values + */ + public void setUniqueKeysValues(final Collection uniqueKeysValues) { + this.uniqueKeysValues.set(uniqueKeysValues); + } + + /** + * Get query range. + * + * @return query range + */ + public QueryRange getQueryRange() { + return uniqueKeysValuesRange.get(); + } + + /** + * Set query range. + * + * @param queryRange query range + */ + public void setQueryRange(final QueryRange queryRange) { + this.uniqueKeysValuesRange.set(queryRange); + } + + /** + * Get sharding columns names. + * + * @return sharding columns names + */ + public @Nullable List getShardingColumnsNames() { + return shardingColumnsNames.get(); + } + + /** + * Set sharding columns names. + * + * @param shardingColumnsNames sharding columns names + */ + public void setShardingColumnsNames(final List shardingColumnsNames) { + this.shardingColumnsNames.set(shardingColumnsNames); + } + + /** + * Get sharding columns values. + * + * @return sharding columns values + */ + public @Nullable List getShardingColumnsValues() { + return shardingColumnsValues.get(); + } + + /** + * Set sharding columns values. 
+ * + * @param shardingColumnsValues sharding columns values + */ + public void setShardingColumnsValues(final List shardingColumnsValues) { + this.shardingColumnsValues.set(shardingColumnsValues); + } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java index cfb82e7dcfc17..7b62d5eadd872 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java @@ -21,11 +21,12 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType; import org.apache.shardingsphere.data.pipeline.core.exception.IngestException; +import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable; -import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine; import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; @@ -43,11 +44,12 @@ import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder; import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder; +import org.apache.shardingsphere.data.pipeline.core.util.DatabaseTypeUtils; import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils; import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; +import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; import javax.sql.DataSource; import java.sql.Connection; @@ -56,19 +58,21 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; -import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * Inventory dumper. 
*/ @HighFrequencyInvocation @Slf4j -public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper { +public class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper { @Getter(AccessLevel.PROTECTED) private final InventoryDumperContext dumperContext; @@ -85,6 +89,11 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp private final AtomicReference runningStatement = new AtomicReference<>(); + private PipelineTableMetaData tableMetaData; + + // TODO now Remove + private List uniqueKeysNames = Collections.emptyList(); + public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) { this.dumperContext = dumperContext; this.channel = channel; @@ -102,29 +111,58 @@ protected void runBlocking() { log.info("Ignored because of already finished."); return; } - PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData( - dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName()); + init(); try (Connection connection = dataSource.getConnection()) { - dump(tableMetaData, connection); - } catch (final SQLException ex) { - log.error("Inventory dump, ex caught, msg={}.", ex.getMessage()); + if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL()) || !dumperContext.hasUniqueKey() + || position instanceof PrimaryKeyIngestPosition && null == ((PrimaryKeyIngestPosition) position).getBeginValue() + && null == ((PrimaryKeyIngestPosition) position).getEndValue()) { + dumpWithStreamingQuery(connection); + } else { + dumpPageByPage(connection); + } + // CHECKSTYLE:OFF + } catch (final SQLException | RuntimeException ex) { + // CHECKSTYLE:ON + log.error("Inventory dump failed on {}", dumperContext.getActualTableName(), ex); throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex); } } + /** + * Initialize. 
+ */ + public void init() { + if (null == uniqueKeysNames) { + uniqueKeysNames = getUniqueKeysNames(); + } + if (null == tableMetaData) { + tableMetaData = metaDataLoader.getTableMetaData( + dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName()); + } + } + + private List getUniqueKeysNames() { + if (dumperContext.hasUniqueKey()) { + return dumperContext.getUniqueKeyColumns().stream().map(each -> new CaseInsensitiveIdentifier(each.getName())).collect(Collectors.toList()); + } + return Collections.emptyList(); + } + @SuppressWarnings("MagicConstant") - private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException { + private void dumpWithStreamingQuery(final Connection connection) throws SQLException { int batchSize = dumperContext.getBatchSize(); DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); if (null != dumperContext.getTransactionIsolation()) { connection.setTransactionIsolation(dumperContext.getTransactionIsolation()); } - try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpSQL())) { + try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpSQLWithStreamingQuery())) { runningStatement.set(preparedStatement); - if (!(databaseType instanceof MySQLDatabaseType)) { + if (!DatabaseTypeUtils.isMySQL(databaseType)) { preparedStatement.setFetchSize(batchSize); } - setParameters(preparedStatement); + PrimaryKeyIngestPosition primaryPosition = (PrimaryKeyIngestPosition) dumperContext.getCommonContext().getPosition(); + InventoryQueryParameter queryParam = InventoryQueryParameter.buildForRangeQuery(new QueryRange(primaryPosition.getBeginValue(), true, primaryPosition.getEndValue())); + setParameters(preparedStatement, queryParam, true); try (ResultSet resultSet = preparedStatement.executeQuery()) { int rowCount = 0; JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm(); @@ -135,7 +173,7 @@ private void dump(final PipelineTableMetaData tableMetaData, final Connection co channel.push(dataRecords); dataRecords = new LinkedList<>(); } - dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData)); + dataRecords.add(loadDataRecord(resultSet, resultSetMetaData)); ++rowCount; if (!isRunning()) { log.info("Broke because of inventory dump is not running."); @@ -147,72 +185,166 @@ private void dump(final PipelineTableMetaData tableMetaData, final Connection co } dataRecords.add(new FinishedRecord(new IngestFinishedPosition())); channel.push(dataRecords); - log.info("Inventory dump done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName()); + log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}", + rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName()); } finally { runningStatement.set(null); } } } - private String buildInventoryDumpSQL() { + private String buildInventoryDumpSQLWithStreamingQuery() { if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) { return dumperContext.getQuerySQL(); } String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); - if (!dumperContext.hasUniqueKey()) { - return 
inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName()); + List columnNames = getQueryColumnNames(); + return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames); + } + + @SuppressWarnings("MagicConstant") + private void dumpPageByPage(final Connection connection) throws SQLException { + if (null != dumperContext.getTransactionIsolation()) { + connection.setTransactionIsolation(dumperContext.getTransactionIsolation()); } - PrimaryKeyIngestPosition primaryKeyPosition = (PrimaryKeyIngestPosition) dumperContext.getCommonContext().getPosition(); - PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0); - Collection columnNames = Collections.singleton("*"); - if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) { - if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) { - return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName()); + boolean firstQuery = true; + AtomicLong rowCount = new AtomicLong(); + IngestPosition position = dumperContext.getCommonContext().getPosition(); + while (true) { + PrimaryKeyIngestPosition primaryPosition = (PrimaryKeyIngestPosition) position; + InventoryQueryParameter queryParam = InventoryQueryParameter.buildForRangeQuery(new QueryRange(primaryPosition.getBeginValue(), firstQuery, primaryPosition.getEndValue())); + List dataRecords = dumpPageByPage0(connection, queryParam, rowCount); + if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) { + queryParam = InventoryQueryParameter.buildForPointQuery(getFirstUniqueKeyValue(dataRecords, 0)); + dataRecords = dumpPageByPage0(connection, queryParam, rowCount); + } + firstQuery = false; + if (dataRecords.isEmpty()) { + position = new IngestFinishedPosition(); + dataRecords.add(new FinishedRecord(position)); + log.info("Inventory dump done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName()); + } else { + position = PrimaryKeyIngestPositionFactory.newInstance(getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1), primaryPosition.getEndValue()); } - if (null != primaryKeyPosition.getBeginValue() && null == primaryKeyPosition.getEndValue()) { - return inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName()); + channel.push(dataRecords); + dumperContext.getCommonContext().setPosition(position); + if (position instanceof IngestFinishedPosition) { + break; } } - return inventoryDumpSQLBuilder.buildIndivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName()); } - private void setParameters(final PreparedStatement preparedStatement) throws SQLException { - if (!dumperContext.hasUniqueKey()) { - return; + private Object getFirstUniqueKeyValue(final List dataRecords, final int index) { + return ((DataRecord) dataRecords.get(index)).getUniqueKeyValue().iterator().next(); + } + + private List dumpPageByPage0(final Connection connection, final InventoryQueryParameter queryParam, + final AtomicLong rowCount) throws SQLException { + DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); + int 
batchSize = dumperContext.getBatchSize(); + try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpPageByPageSQL(queryParam))) { + runningStatement.set(preparedStatement); + if (!DatabaseTypeUtils.isMySQL(databaseType)) { + preparedStatement.setFetchSize(batchSize); + } + setParameters(preparedStatement, queryParam, false); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm(); + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + List result = new LinkedList<>(); + while (resultSet.next()) { + if (result.size() >= batchSize) { + if (!dumperContext.hasUniqueKey()) { + channel.push(result); + } + result = new LinkedList<>(); + } + result.add(loadDataRecord(resultSet, resultSetMetaData)); + rowCount.incrementAndGet(); + if (!isRunning()) { + log.info("Broke because of inventory dump is not running."); + break; + } + if (null != rateLimitAlgorithm && 0 == rowCount.get() % batchSize) { + rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1); + } + } + return result; + } finally { + runningStatement.set(null); + } } + } + + private String buildInventoryDumpPageByPageSQL(final InventoryQueryParameter queryParam) { + String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0); - PrimaryKeyIngestPosition position = (PrimaryKeyIngestPosition) dumperContext.getCommonContext().getPosition(); - if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) { - preparedStatement.setObject(1, position.getBeginValue()); - preparedStatement.setObject(2, position.getEndValue()); + List columnNames = getQueryColumnNames(); + if (QueryType.POINT_QUERY == queryParam.getQueryType()) { + return inventoryDumpSQLBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName()); + } + QueryRange queryRange = queryParam.getUniqueKeyValueRange(); + boolean lowerInclusive = queryRange.isLowerInclusive(); + if (null != queryRange.getLower() && null != queryRange.getUpper()) { + return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive); + } + if (null != queryRange.getLower()) { + return inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive); + } + throw new PipelineInternalException("Primary key position is invalid."); + } + + private List getQueryColumnNames() { + return Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.singletonList("*")); + } + + private void setParameters(final PreparedStatement preparedStatement, final InventoryQueryParameter queryParam, final boolean streamingQuery) throws SQLException { + if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) { + for (int i = 0; i < dumperContext.getQueryParams().size(); i++) { + preparedStatement.setObject(i + 1, dumperContext.getQueryParams().get(i)); + } return; } - if (PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) { - if (null != position.getBeginValue()) { - preparedStatement.setObject(1, position.getBeginValue()); + if (!dumperContext.hasUniqueKey()) { + return; + } + int parameterIndex 
= 1; + if (QueryType.RANGE_QUERY == queryParam.getQueryType()) { + Object lower = queryParam.getUniqueKeyValueRange().getLower(); + if (null != lower) { + preparedStatement.setObject(parameterIndex++, lower); + } + Object upper = queryParam.getUniqueKeyValueRange().getUpper(); + if (null != upper) { + preparedStatement.setObject(parameterIndex++, upper); } - if (null != position.getEndValue()) { - preparedStatement.setObject(2, position.getEndValue()); + if (!streamingQuery) { + preparedStatement.setInt(parameterIndex, dumperContext.getBatchSize()); } + } else if (QueryType.POINT_QUERY == queryParam.getQueryType()) { + preparedStatement.setObject(parameterIndex, queryParam.getUniqueKeyValue()); + } else { + throw new UnsupportedOperationException("Query type: " + queryParam.getQueryType()); } } - private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException { + private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException { int columnCount = resultSetMetaData.getColumnCount(); - DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, dumperContext.getLogicTableName(), newPosition(resultSet), columnCount); + DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, dumperContext.getLogicTableName(), newDataRecordPosition(resultSet), columnCount); List insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList()); ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(), - () -> new PipelineInvalidParameterException("Insert colum names count not equals ResultSet column count")); + () -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count")); for (int i = 1; i <= columnCount; i++) { String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1); ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName), () -> new PipelineInvalidParameterException(String.format("Column name is %s", columnName))); boolean isUniqueKey = tableMetaData.getColumnMetaData(columnName).isUniqueKey(); result.addColumn(new Column(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, isUniqueKey)); } + result.setActualTableName(dumperContext.getActualTableName()); return result; } - private IngestPosition newPosition(final ResultSet resultSet) throws SQLException { + protected IngestPosition newDataRecordPosition(final ResultSet resultSet) throws SQLException { return dumperContext.hasUniqueKey() ? 
PrimaryKeyIngestPositionFactory.newInstance( resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyIngestPosition) dumperContext.getCommonContext().getPosition()).getEndValue()) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java index e5d3d33c83cc7..a3a0c580e6a48 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java @@ -46,6 +46,8 @@ public final class InventoryDumperContext { private String querySQL; + private List queryParams; + private Integer transactionIsolation; private int shardingItem; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryQueryParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryQueryParameter.java new file mode 100644 index 0000000000000..c43714f265db6 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryQueryParameter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Inventory query parameter. + */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@Getter +public final class InventoryQueryParameter { + + private final QueryType queryType; + + private final Object uniqueKeyValue; + + private final QueryRange uniqueKeyValueRange; + + /** + * Build for point query. + * + * @param uniqueKeyValue unique key value + * @return inventory query parameter + */ + public static InventoryQueryParameter buildForPointQuery(final Object uniqueKeyValue) { + return new InventoryQueryParameter(QueryType.POINT_QUERY, uniqueKeyValue, null); + } + + /** + * Build for range query. 
+ * + * @param uniqueKeyValueRange unique key value range + * @return inventory query parameter + */ + public static InventoryQueryParameter buildForRangeQuery(final QueryRange uniqueKeyValueRange) { + return new InventoryQueryParameter(QueryType.RANGE_QUERY, null, uniqueKeyValueRange); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/QueryRange.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/QueryRange.java new file mode 100644 index 0000000000000..f9ff1d1537e41 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/QueryRange.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Query range. + */ +@RequiredArgsConstructor +@Getter +public final class QueryRange { + + private final Object lower; + + private final boolean lowerInclusive; + + private final Object upper; +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/QueryType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/QueryType.java new file mode 100644 index 0000000000000..6fecae89a293f --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/QueryType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory; + +/** + * Query type. 
+ */ +public enum QueryType { + + RANGE_QUERY, POINT_QUERY +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java index db1c5418702ec..3c190ce57ba5b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java @@ -100,4 +100,14 @@ default Optional buildCRC32SQL(final String qualifiedTableName, final St default Optional buildQueryCurrentPositionSQL() { return Optional.empty(); } + + /** + * Wrap with page query. + * + * @param sql SQL + * @return wrapped SQL + */ + default String wrapWithPageQuery(String sql) { + return sql + " LIMIT ?"; + } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java index c3a6f313fca8c..4064502548d84 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java @@ -17,12 +17,16 @@ package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder; -import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; +import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -41,22 +45,78 @@ public PipelineDataConsistencyCalculateSQLBuilder(final DatabaseType databaseTyp } /** - * Build query all ordering SQL. + * Build query range ordering SQL. 
* * @param schemaName schema name * @param tableName table name * @param columnNames column names - * @param uniqueKey unique key, it may be primary key, not null - * @param firstQuery first query + * @param uniqueKeys unique keys, it may be primary key, not null + * @param queryRange query range + * @param shardingColumnsNames sharding columns names * @return built SQL */ - public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final Collection columnNames, final String uniqueKey, final boolean firstQuery) { + public String buildQueryRangeOrderingSQL(final String schemaName, final String tableName, final Collection columnNames, final List uniqueKeys, final QueryRange queryRange, + @Nullable final List shardingColumnsNames) { + return dialectSQLBuilder.wrapWithPageQuery(buildQueryRangeOrderingSQL0(schemaName, tableName, columnNames, uniqueKeys, queryRange, shardingColumnsNames)); + } + + private String buildQueryRangeOrderingSQL0(final String schemaName, final String tableName, final Collection columnNames, final List uniqueKeys, final QueryRange queryRange, + @Nullable final List shardingColumnsNames) { String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName); - String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey); String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(",")); - return firstQuery - ? String.format("SELECT %s FROM %s ORDER BY %s ASC", queryColumns, qualifiedTableName, escapedUniqueKey) - : String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s ASC", queryColumns, qualifiedTableName, escapedUniqueKey, escapedUniqueKey); + String firstUniqueKey = uniqueKeys.get(0); + String orderByColumns = joinColumns(uniqueKeys, shardingColumnsNames).stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each) + " ASC").collect(Collectors.joining(", ")); + if (null != queryRange.getLower() && null != queryRange.getUpper()) { + return String.format("SELECT %s FROM %s WHERE %s AND %s ORDER BY %s", queryColumns, qualifiedTableName, + buildLowerQueryRangeCondition(queryRange.isLowerInclusive(), firstUniqueKey), + buildUpperQueryRangeCondition(firstUniqueKey), orderByColumns); + } else if (null != queryRange.getLower()) { + return String.format("SELECT %s FROM %s WHERE %s ORDER BY %s", queryColumns, qualifiedTableName, + buildLowerQueryRangeCondition(queryRange.isLowerInclusive(), firstUniqueKey), orderByColumns); + } else if (null != queryRange.getUpper()) { + return String.format("SELECT %s FROM %s WHERE %s ORDER BY %s", queryColumns, qualifiedTableName, + buildUpperQueryRangeCondition(firstUniqueKey), orderByColumns); + } else { + return String.format("SELECT %s FROM %s ORDER BY %s", queryColumns, qualifiedTableName, orderByColumns); + } + } + + private String buildLowerQueryRangeCondition(final boolean inclusive, final String firstUniqueKey) { + String delimiter = inclusive ? ">=?" : ">?"; + return sqlSegmentBuilder.getEscapedIdentifier(firstUniqueKey) + delimiter; + } + + private String buildUpperQueryRangeCondition(final String firstUniqueKey) { + return sqlSegmentBuilder.getEscapedIdentifier(firstUniqueKey) + "<=?"; + } + + /** + * Build point query SQL. 
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param columnNames column names
+     * @param uniqueKeys unique keys, may be the primary key, not null
+     * @param shardingColumnsNames sharding column names, nullable
+     * @return built SQL
+     */
+    public String buildPointQuerySQL(final String schemaName, final String tableName, final Collection<String> columnNames, final List<String> uniqueKeys,
+                                     @Nullable final List<String> shardingColumnsNames) {
+        String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
+        String equalsConditions = joinColumns(uniqueKeys, shardingColumnsNames).stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each) + "=?").collect(Collectors.joining(" AND "));
+        return String.format("SELECT %s FROM %s WHERE %s", queryColumns, qualifiedTableName, equalsConditions);
+    }
+
+    private List<String> joinColumns(final List<String> uniqueKeys, final @Nullable List<String> shardingColumnsNames) {
+        if (null == shardingColumnsNames || shardingColumnsNames.isEmpty()) {
+            return uniqueKeys;
+        }
+        // TODO Avoid new list creation
+        List<String> result = new ArrayList<>(uniqueKeys.size() + shardingColumnsNames.size());
+        result.addAll(uniqueKeys);
+        result.addAll(shardingColumnsNames);
+        return result;
     }
 
     /**
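To make the generated SQL concrete, the throwaway driver below (an illustration, not part of the patch) feeds the builder the same FIXTURE database type that the unit tests further down use, and prints the three query shapes it can emit:

// Illustrative driver only. Assumes the FIXTURE DatabaseType registered for
// the test modules is on the classpath.
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

public final class CalculateSQLBuilderDemo {
    
    public static void main(final String[] args) {
        PipelineDataConsistencyCalculateSQLBuilder builder = new PipelineDataConsistencyCalculateSQLBuilder(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
        List<String> columnNames = Arrays.asList("order_id", "user_id", "status");
        List<String> uniqueKeys = Collections.singletonList("order_id");
        // Closed range, inclusive lower bound:
        // SELECT ... WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC LIMIT ?
        System.out.println(builder.buildQueryRangeOrderingSQL(null, "t_order", columnNames, uniqueKeys, new QueryRange(1, true, 100), null));
        // Open-ended range, exclusive lower bound (the "next page" shape):
        // SELECT ... WHERE order_id>? ORDER BY order_id ASC LIMIT ?
        System.out.println(builder.buildQueryRangeOrderingSQL(null, "t_order", columnNames, uniqueKeys, new QueryRange(1, false, null), null));
        // Point query, one equality condition per unique key:
        // SELECT ... WHERE order_id=?
        System.out.println(builder.buildPointQuerySQL(null, "t_order", columnNames, uniqueKeys, null));
    }
}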
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
index d7c7e7861def7..f6474ad179ca7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineImportSQLBuilder.java
@@ -23,8 +23,8 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Collection;
 import java.util.Objects;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
index c53793fd9173a..2ba3a8ce85df2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
@@ -17,7 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
 
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Collection;
@@ -28,9 +30,12 @@
  */
 public final class PipelineInventoryDumpSQLBuilder {
 
+    private final DialectPipelineSQLBuilder dialectSQLBuilder;
+
     private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
 
     public PipelineInventoryDumpSQLBuilder(final DatabaseType databaseType) {
+        dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
         sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
     }
 
@@ -41,12 +46,15 @@ public PipelineInventoryDumpSQLBuilder(final DatabaseType databaseType) {
      * @param tableName table name
      * @param columnNames column names
      * @param uniqueKey unique key
+     * @param lowerInclusive lower inclusive or not
      * @return built SQL
      */
-    public String buildDivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
+    public String buildDivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey, final boolean lowerInclusive) {
         String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
         String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
-        return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey, escapedUniqueKey);
+        String result = String.format("SELECT %s FROM %s WHERE %s%s? AND %s<=? ORDER BY %s ASC",
+                buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, lowerInclusive ? ">=" : ">", escapedUniqueKey, escapedUniqueKey);
+        return dialectSQLBuilder.wrapWithPageQuery(result);
     }
 
     /**
@@ -56,16 +64,19 @@ public String buildDivisibleSQL(final String schemaName, final String tableName,
      * @param tableName table name
      * @param columnNames column names
      * @param uniqueKey unique key
+     * @param lowerInclusive lower inclusive or not
      * @return built SQL
      */
-    public String buildUnlimitedDivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
+    public String buildUnlimitedDivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey, final boolean lowerInclusive) {
         String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
         String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
-        return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey);
+        String result = String.format("SELECT %s FROM %s WHERE %s%s? ORDER BY %s ASC",
+                buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, lowerInclusive ? ">=" : ">", escapedUniqueKey);
+        return dialectSQLBuilder.wrapWithPageQuery(result);
     }
 
     /**
-     * Build indivisible inventory dump first SQL.
+     * Build indivisible inventory dump SQL.
      *
      * @param schemaName schema name
      * @param tableName table name
@@ -83,15 +94,33 @@ private String buildQueryColumns(final Collection<String> columnNames) {
         return columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
     }
 
+    /**
+     * Build point query SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param columnNames column names
+     * @param uniqueKey unique key
+     * @return built SQL
+     */
+    public String buildPointQuerySQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
+        String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
+        String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+        return String.format("SELECT %s FROM %s WHERE %s=?", queryColumns, qualifiedTableName, escapedUniqueKey);
+    }
+
     /**
      * Build fetch all inventory dump SQL.
      *
      * @param schemaName schema name
      * @param tableName tableName
+     * @param columnNames column names
      * @return built SQL
      */
-    public String buildFetchAllSQL(final String schemaName, final String tableName) {
+    public String buildFetchAllSQL(final String schemaName, final String tableName, final Collection<String> columnNames) {
         String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
-        return String.format("SELECT * FROM %s", qualifiedTableName);
+        String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
+        return String.format("SELECT %s FROM %s", queryColumns, qualifiedTableName);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
index 6218a482e51ae..e42387dd7b4db 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
@@ -19,8 +19,8 @@
 
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Optional;
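How the refactored dumper consumes these paged statements is easiest to see as a loop. The sketch below is a schematic re-creation under stated assumptions; the real logic lives in InventoryDumper, which is changed earlier in this patch, and the method and variable names here are illustrative. Page one starts at the split's lower bound inclusively, every later page restarts exclusively from the last key seen, and the upper bound plus the page size are re-bound each round.

// Schematic paging loop, illustration only (not the actual InventoryDumper code).
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;

import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;

public final class PageQueryLoopSketch {
    
    public static void dumpRange(final Connection connection, final PipelineInventoryDumpSQLBuilder sqlBuilder,
                                 final long lower, final long upper, final int pageSize) throws SQLException {
        Collection<String> columnNames = Arrays.asList("order_id", "user_id", "status");
        long position = lower;
        boolean firstPage = true;
        while (true) {
            // First page: key >= lower; later pages: key > last fetched key.
            String sql = sqlBuilder.buildDivisibleSQL(null, "t_order", columnNames, "order_id", firstPage);
            int fetchedCount = 0;
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setLong(1, position);
                statement.setLong(2, upper);
                statement.setInt(3, pageSize);
                try (ResultSet resultSet = statement.executeQuery()) {
                    while (resultSet.next()) {
                        fetchedCount++;
                        position = resultSet.getLong("order_id");
                        // ... hand the row to the channel ...
                    }
                }
            }
            // A short page means the range is exhausted.
            if (fetchedCount < pageSize) {
                return;
            }
            firstPage = false;
        }
    }
}

The exclusive restart is safe precisely because the divisible SQL orders by a unique key, so no row can be skipped or repeated between pages.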
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtils.java
new file mode 100644
index 0000000000000..f5be779e7defd
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
+
+/**
+ * Database type utility class.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DatabaseTypeUtils {
+
+    /**
+     * Check whether database type is MySQL, including trunk database type.
+     *
+     * @param databaseType database type
+     * @return is MySQL or not
+     */
+    public static boolean isMySQL(final DatabaseType databaseType) {
+        return getTrunkDatabaseType(databaseType) instanceof MySQLDatabaseType;
+    }
+
+    /**
+     * Get trunk database type.
+     *
+     * @param databaseType database type
+     * @return trunk database type
+     */
+    public static DatabaseType getTrunkDatabaseType(final DatabaseType databaseType) {
+        return databaseType.getTrunkDatabaseType().orElse(databaseType);
+    }
+}
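The utility's value shows up with branch databases: a branch type is resolved to its trunk before the instanceof check, so MySQL-compatible branches take the MySQL code paths. A minimal usage sketch, assuming the MySQL dialect module is on the classpath; whether a given branch type (MariaDB, for example) maps to the MySQL trunk depends on the registered DatabaseType SPI metadata, not on this class.

// Usage sketch, illustration only.
import org.apache.shardingsphere.data.pipeline.core.util.DatabaseTypeUtils;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

public final class DatabaseTypeUtilsDemo {
    
    public static void main(final String[] args) {
        DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
        // true: MySQL is its own trunk type.
        System.out.println(DatabaseTypeUtils.isMySQL(databaseType));
        // For a branch type, getTrunkDatabaseType would return the trunk
        // type instead of the branch itself.
        System.out.println(DatabaseTypeUtils.getTrunkDatabaseType(databaseType).getType());
    }
}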
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
index 6a28e99797016..83ad0f9ac0f44 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineDataConsistencyCalculateSQLBuilderTest.java
@@ -17,34 +17,89 @@
 
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 class PipelineDataConsistencyCalculateSQLBuilderTest {
 
+    private static final Collection<String> COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status");
+
+    private static final List<String> UNIQUE_KEYS = Arrays.asList("order_id", "status");
+
+    private static final List<String> SHARDING_COLUMNS_NAMES = Collections.singletonList("user_id");
+
     private final PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new PipelineDataConsistencyCalculateSQLBuilder(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
 
     @Test
-    void assertBuildQueryAllOrderingSQLFirstQuery() {
-        String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", true);
-        assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
-        actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true);
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC"));
+    void assertBuildQueryRangeOrderingSQLWithoutQueryCondition() {
+        String actual = pipelineSQLBuilder.buildQueryRangeOrderingSQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS,
+                new QueryRange(1, true, 5), SHARDING_COLUMNS_NAMES);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?"));
+        actual = pipelineSQLBuilder.buildQueryRangeOrderingSQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS,
+                new QueryRange(1, false, 5), SHARDING_COLUMNS_NAMES);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?"));
+        actual = pipelineSQLBuilder.buildQueryRangeOrderingSQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS,
+                new QueryRange(1, false, null), SHARDING_COLUMNS_NAMES);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?"));
+        actual = pipelineSQLBuilder.buildQueryRangeOrderingSQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS,
+                new QueryRange(null, false, 5), SHARDING_COLUMNS_NAMES);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id<=? ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?"));
+        actual = pipelineSQLBuilder.buildQueryRangeOrderingSQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS,
+                new QueryRange(null, false, null), SHARDING_COLUMNS_NAMES);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?"));
+    }
+
+ + " ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?")); + actual = pipelineSQLBuilder.buildQueryRangeOrderingSQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS, + new QueryRange(null, false, null), SHARDING_COLUMNS_NAMES); + assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?")); + } + + @Test + void assertBuildPointQuerySQLWithoutQueryCondition() { + String actual = pipelineSQLBuilder.buildPointQuerySQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS, null); + assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id=? AND status=?")); + actual = pipelineSQLBuilder.buildPointQuerySQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS, Collections.emptyList()); + assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id=? AND status=?")); + actual = pipelineSQLBuilder.buildPointQuerySQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS, Collections.singletonList("user_id")); + assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id=? AND status=? AND user_id=?")); } @Test - void assertBuildQueryAllOrderingSQLNonFirstQuery() { - String actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Collections.singletonList("*"), "order_id", false); - assertThat(actual, is("SELECT * FROM t_order WHERE order_id>? ORDER BY order_id ASC")); - actual = pipelineSQLBuilder.buildQueryAllOrderingSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false); - assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? ORDER BY order_id ASC")); + void assertBuildPointQuerySQLWithQueryCondition() { + String actual = pipelineSQLBuilder.buildPointQuerySQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS, null); + assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id=? AND status=?")); + actual = pipelineSQLBuilder.buildPointQuerySQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS, Collections.emptyList()); + assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id=? AND status=?")); + actual = pipelineSQLBuilder.buildPointQuerySQL(null, "t_order", COLUMN_NAMES, UNIQUE_KEYS, Collections.singletonList("user_id")); + assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id=? AND status=? AND user_id=?")); } } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java index f765003f5fc1a..9638a7cbae104 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java @@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -33,14 +34,18 @@ class PipelineInventoryDumpSQLBuilderTest { @Test void assertBuildDivisibleSQL() { - String actual = inventoryDumpSQLBuilder.buildDivisibleSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id"); - assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? AND order_id<=? 
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
index f765003f5fc1a..9638a7cbae104 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineInventoryDumpSQLBuilderTest.java
@@ -23,6 +23,7 @@
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -33,14 +34,18 @@ class PipelineInventoryDumpSQLBuilderTest {
 
     @Test
     void assertBuildDivisibleSQL() {
-        String actual = inventoryDumpSQLBuilder.buildDivisibleSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC"));
+        String actual = inventoryDumpSQLBuilder.buildDivisibleSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC LIMIT ?"));
+        actual = inventoryDumpSQLBuilder.buildDivisibleSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? AND order_id<=? ORDER BY order_id ASC LIMIT ?"));
     }
 
     @Test
     void assertBuildUnlimitedDivisibleSQL() {
-        String actual = inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? ORDER BY order_id ASC"));
+        String actual = inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? ORDER BY order_id ASC LIMIT ?"));
+        actual = inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false);
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? ORDER BY order_id ASC LIMIT ?"));
     }
 
     @Test
@@ -49,9 +54,17 @@ void assertBuildIndivisibleSQL() {
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order ORDER BY order_id ASC"));
     }
 
+    @Test
+    void assertBuildPointQuerySQL() {
+        String actual = inventoryDumpSQLBuilder.buildPointQuerySQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id");
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id=?"));
+    }
+
     @Test
     void assertBuildFetchAllSQL() {
-        String actual = inventoryDumpSQLBuilder.buildFetchAllSQL(null, "t_order");
+        String actual = inventoryDumpSQLBuilder.buildFetchAllSQL(null, "t_order", Arrays.asList("order_id", "user_id", "status"));
+        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order"));
+        actual = inventoryDumpSQLBuilder.buildFetchAllSQL(null, "t_order", Collections.singletonList("*"));
         assertThat(actual, is("SELECT * FROM t_order"));
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index b57af6a326b9c..01ed9e9932fe9 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -19,13 +19,16 @@
 
 import com.zaxxer.hikari.HikariDataSource;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -35,8 +38,11 @@
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -72,18 +78,29 @@ private static HikariDataSource createHikariDataSource(final String databaseName
 
     private static void createTableAndInitData(final PipelineDataSourceWrapper dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            String sql = "CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT NOT NULL, status VARCHAR(12))";
+            String sql = "CREATE TABLE t_order (user_id INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id, order_id))";
             connection.createStatement().execute(sql);
-            PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO t_order (order_id, user_id, status) VALUES (?, ?, ?)");
-            for (int i = 0; i < 10; i++) {
-                preparedStatement.setInt(1, i + 1);
-                preparedStatement.setInt(2, i + 1);
-                preparedStatement.setString(3, "test");
-                preparedStatement.execute();
-            }
+            PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO t_order (user_id, order_id, status) VALUES (?, ?, ?)");
+            insertRecord(preparedStatement, 1, 1);
+            insertRecord(preparedStatement, 2, 2);
+            insertRecord(preparedStatement, 3, 3);
+            insertRecord(preparedStatement, 3, 4);
+            insertRecord(preparedStatement, 3, 5);
+            insertRecord(preparedStatement, 3, 6);
+            insertRecord(preparedStatement, 3, 7);
+            insertRecord(preparedStatement, 4, 8);
+            insertRecord(preparedStatement, 5, 9);
+            insertRecord(preparedStatement, 6, 10);
         }
     }
 
+    private static void insertRecord(final PreparedStatement preparedStatement, final int userId, final int orderId) throws SQLException {
+        preparedStatement.setInt(1, userId);
+        preparedStatement.setInt(2, orderId);
+        preparedStatement.setString(3, "OK");
+        preparedStatement.executeUpdate();
+    }
+
     @Test
     void assertCalculateOfAllQueryFromBegin() {
         RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(5);
@@ -92,7 +109,7 @@ void assertCalculateOfAllQueryFromBegin() {
         assertTrue(calculateResult.isPresent());
         SingleTableInventoryCalculatedResult actual = calculateResult.get();
         assertTrue(actual.getMaxUniqueKeyValue().isPresent());
-        assertThat(actual.getMaxUniqueKeyValue().get(), is(5));
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(4));
     }
 
     @Test
@@ -103,11 +120,85 @@ void assertCalculateOfAllQueryFromMiddle() {
         assertTrue(calculateResult.isPresent());
         SingleTableInventoryCalculatedResult actual = calculateResult.get();
         assertTrue(actual.getMaxUniqueKeyValue().isPresent());
-        assertThat(actual.getMaxUniqueKeyValue().get(), is(10));
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(9));
     }
 
     private SingleTableInventoryCalculateParameter generateParameter(final PipelineDataSourceWrapper dataSource, final Object dataCheckPosition) {
         List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "integer", false, true, true));
         return new SingleTableInventoryCalculateParameter(dataSource, new CaseInsensitiveQualifiedTable(null, "t_order"), Collections.emptyList(), uniqueKeys, dataCheckPosition);
     }
+
+    @Test
+    void assertCalculateOfRangeQuery() {
+        RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(1000);
+        SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new CaseInsensitiveQualifiedTable(null, "t_order"),
+                Collections.emptyList(), buildUniqueKeys(), QueryType.RANGE_QUERY);
+        param.setQueryRange(new QueryRange(3, true, 7));
+        Optional<SingleTableInventoryCalculatedResult> calculatedResult = calculator.calculateChunk(param);
+        assertTrue(calculatedResult.isPresent());
+        SingleTableInventoryCalculatedResult actual = calculatedResult.get();
+        assertThat(actual.getRecordsCount(), is(8));
+        assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(6));
+    }
+
+    @Test
+    void assertCalculateOfRangeQueryAll() {
+        RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(3);
+        SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new CaseInsensitiveQualifiedTable(null, "t_order"),
+                Collections.emptyList(), buildUniqueKeys(), QueryType.RANGE_QUERY);
+        param.setQueryRange(new QueryRange(null, false, null));
+        Iterator<SingleTableInventoryCalculatedResult> resultIterator = calculator.calculate(param).iterator();
+        RecordSingleTableInventoryCalculatedResult actual = (RecordSingleTableInventoryCalculatedResult) resultIterator.next();
+        assertThat(actual.getRecordsCount(), is(2));
+        assertRecord(actual.getRecords().get(0), 1, 1);
+        assertRecord(actual.getRecords().get(1), 2, 2);
+        assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(2));
+        actual = (RecordSingleTableInventoryCalculatedResult) resultIterator.next();
+        assertThat(actual.getRecordsCount(), is(5));
+        assertRecord(actual.getRecords().get(0), 3, 3);
+        assertRecord(actual.getRecords().get(1), 3, 4);
+        assertRecord(actual.getRecords().get(2), 3, 5);
+        assertRecord(actual.getRecords().get(3), 3, 6);
+        assertRecord(actual.getRecords().get(4), 3, 7);
+        assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(3));
+        actual = (RecordSingleTableInventoryCalculatedResult) resultIterator.next();
+        assertThat(actual.getRecordsCount(), is(2));
+        assertRecord(actual.getRecords().get(0), 4, 8);
+        assertRecord(actual.getRecords().get(1), 5, 9);
+        assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(5));
+        actual = (RecordSingleTableInventoryCalculatedResult) resultIterator.next();
+        assertThat(actual.getRecordsCount(), is(1));
+        assertRecord(actual.getRecords().get(0), 6, 10);
+        assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(6));
+    }
+
+    private void assertRecord(final Map<String, Object> record, final int userId, final int orderId) {
+        assertThat(record.get("user_id"), is(userId));
+        assertThat(record.get("order_id"), is(orderId));
+    }
+
+    @Test
+    void assertCalculateOfPointQuery() {
+        RecordSingleTableInventoryCalculator calculator = new RecordSingleTableInventoryCalculator(3);
+        SingleTableInventoryCalculateParameter param = new SingleTableInventoryCalculateParameter(dataSource, new CaseInsensitiveQualifiedTable(null, "t_order"),
+                Collections.emptyList(), buildUniqueKeys(), QueryType.POINT_QUERY);
+        param.setUniqueKeysValues(Arrays.asList(3, 3));
+        Optional<SingleTableInventoryCalculatedResult> calculatedResult = calculator.calculateChunk(param);
+        assertTrue(calculatedResult.isPresent());
+        SingleTableInventoryCalculatedResult actual = calculatedResult.get();
+        assertThat(actual.getRecordsCount(), is(1));
+        assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(3));
+    }
+
+    private List<PipelineColumnMetaData> buildUniqueKeys() {
+        return Arrays.asList(
+                new PipelineColumnMetaData(1, "user_id", Types.INTEGER, "integer", false, true, true),
+                new PipelineColumnMetaData(2, "order_id", Types.INTEGER, "integer", false, true, true));
+    }
 }
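Taken together, QueryType, QueryRange, and the two builder methods form a small dispatch surface that the tests above exercise end to end. The sketch below is schematic only: the real dispatch lives inside RecordSingleTableInventoryCalculator (changed earlier in this patch), and parameters are passed in explicitly here rather than assuming that class's accessors.

// Schematic dispatch, illustration only (not the calculator's actual code).
import java.util.List;

import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;

public final class QueryTypeDispatchSketch {
    
    public static String buildSQL(final PipelineDataConsistencyCalculateSQLBuilder sqlBuilder, final QueryType queryType, final QueryRange queryRange,
                                  final String schemaName, final String tableName, final List<String> columnNames, final List<String> uniqueKeys) {
        if (QueryType.POINT_QUERY == queryType) {
            // Point query: one bound value per unique key, no paging parameter.
            return sqlBuilder.buildPointQuerySQL(schemaName, tableName, columnNames, uniqueKeys, null);
        }
        // Range query: wrapped with a page-size placeholder; the caller re-issues
        // it with the range's lower bound advanced to the last key fetched.
        return sqlBuilder.buildQueryRangeOrderingSQL(schemaName, tableName, columnNames, uniqueKeys, queryRange, null);
    }
}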