Skip to content

Commit

Permalink
Refactor inventory dumper and data consistency streaming query to page query (#32607)
Browse files Browse the repository at this point in the history

* Refactor inventory dumper and data consistency streaming query to page query

* Fix spotless
  • Loading branch information
sandynz authored Aug 21, 2024
1 parent 52c6e1f commit e6b29d7
Show file tree
Hide file tree
Showing 20 changed files with 939 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.EqualsBuilder;

import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.sql.Array;
import java.sql.SQLException;
Expand Down Expand Up @@ -76,30 +78,49 @@ public static boolean recordsEquals(final Map<String, Object> thisRecord, final
@SneakyThrows(SQLException.class)
public static boolean isMatched(final EqualsBuilder equalsBuilder, final Object thisColumnValue, final Object thatColumnValue) {
equalsBuilder.reset();
if (isInteger(thisColumnValue) && isInteger(thatColumnValue)) {
return isIntegerEquals((Number) thisColumnValue, (Number) thatColumnValue);
if (thisColumnValue instanceof Number && thatColumnValue instanceof Number) {
return isNumberEquals((Number) thisColumnValue, (Number) thatColumnValue);
}
if (thisColumnValue instanceof SQLXML && thatColumnValue instanceof SQLXML) {
return ((SQLXML) thisColumnValue).getString().equals(((SQLXML) thatColumnValue).getString());
}
if (thisColumnValue instanceof BigDecimal && thatColumnValue instanceof BigDecimal) {
return isBigDecimalEquals((BigDecimal) thisColumnValue, (BigDecimal) thatColumnValue);
}
if (thisColumnValue instanceof Array && thatColumnValue instanceof Array) {
return Objects.deepEquals(((Array) thisColumnValue).getArray(), ((Array) thatColumnValue).getArray());
}
return equalsBuilder.append(thisColumnValue, thatColumnValue).isEquals();
}

private static boolean isInteger(final Object value) {
if (!(value instanceof Number)) {
return false;
/**
 * Compare two numbers for equality: fixed-size integral wrappers are compared by
 * long value, everything else is normalized to BigDecimal first.
 */
private static boolean isNumberEquals(final Number one, final Number another) {
    boolean bothIntegral = isInteger(one) && isInteger(another);
    return bothIntegral
            ? one.longValue() == another.longValue()
            : isBigDecimalEquals(convertToBigDecimal(one), convertToBigDecimal(another));
}

/**
 * Check whether the number is one of the fixed-size integral wrapper types
 * (Long, Integer, Short or Byte).
 *
 * @param value number to check
 * @return true when the value is an integral wrapper
 */
private static boolean isInteger(final Number value) {
    if (value instanceof Long || value instanceof Integer) {
        return true;
    }
    return value instanceof Short || value instanceof Byte;
}

private static boolean isIntegerEquals(final Number one, final Number another) {
return one.longValue() == another.longValue();
/**
 * Convert number to BigDecimal.
 *
 * @param value number
 * @return BigDecimal
 */
public static BigDecimal convertToBigDecimal(final Number value) {
    if (value instanceof BigDecimal) {
        return (BigDecimal) value;
    }
    // Fixed-size integral wrappers convert exactly via their long value.
    if (value instanceof Long || value instanceof Integer || value instanceof Short || value instanceof Byte) {
        return BigDecimal.valueOf(value.longValue());
    }
    if (value instanceof Double || value instanceof Float) {
        return BigDecimal.valueOf(value.doubleValue());
    }
    if (value instanceof BigInteger) {
        return new BigDecimal((BigInteger) value);
    }
    // Fallback for other Number subclasses: parse the decimal string form.
    return new BigDecimal(value.toString());
}

/**
Expand Down Expand Up @@ -128,4 +149,18 @@ public static boolean isBigDecimalEquals(final BigDecimal one, final BigDecimal
}
return 0 == decimalOne.compareTo(decimalTwo);
}

/**
 * Get first unique key value.
 *
 * @param rawRecord raw record
 * @param uniqueKey unique key
 * @return first unique key value, or null when the record is empty or no key is given
 */
public static Object getFirstUniqueKeyValue(final Map<String, Object> rawRecord, final @Nullable String uniqueKey) {
    boolean absent = null == uniqueKey || rawRecord.isEmpty();
    return absent ? null : rawRecord.get(uniqueKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public boolean equals(final Object o) {
final RecordSingleTableInventoryCalculatedResult that = (RecordSingleTableInventoryCalculatedResult) o;
EqualsBuilder equalsBuilder = new EqualsBuilder();
if (recordsCount != that.recordsCount || !DataConsistencyCheckUtils.isMatched(equalsBuilder, maxUniqueKeyValue, that.maxUniqueKeyValue)) {
log.warn("Record count or max unique key value not match, recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={}.",
recordsCount, that.recordsCount, maxUniqueKeyValue, that.maxUniqueKeyValue);
log.warn("Record count or max unique key value not match, recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={}, value1.class={}, value2.class={}.",
recordsCount, that.recordsCount, maxUniqueKeyValue, that.maxUniqueKeyValue,
null == maxUniqueKeyValue ? "" : maxUniqueKeyValue.getClass().getName(), null == that.maxUniqueKeyValue ? "" : that.maxUniqueKeyValue.getClass().getName());
return false;
}
Iterator<Map<String, Object>> thisRecordsIterator = records.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -35,6 +36,8 @@ public final class CalculationContext implements AutoCloseable {

private final AtomicReference<ResultSet> resultSet = new AtomicReference<>();

private final AtomicBoolean closed = new AtomicBoolean(false);

/**
* Get connection.
*
Expand Down Expand Up @@ -80,10 +83,23 @@ public void setResultSet(final ResultSet resultSet) {
this.resultSet.set(resultSet);
}

/**
 * Whether this calculation context has been closed.
 *
 * <p>Once closed, the cached connection, statement and result set have been
 * released and this context must not be reused.</p>
 *
 * @return closed or not
 */
public boolean isClosed() {
    return closed.get();
}

/**
 * Close the context, releasing the cached result set, statement and connection.
 *
 * <p>Idempotent: the compareAndSet guard ensures only the first caller releases
 * the resources, so a concurrent or repeated close cannot race between marking
 * the context closed and clearing the references.</p>
 */
@Override
public void close() {
    if (!closed.compareAndSet(false, true)) {
        return;
    }
    // Close in reverse acquisition order: result set -> statement -> connection.
    QuietlyCloser.close(resultSet.get());
    QuietlyCloser.close(preparedStatement.get());
    QuietlyCloser.close(connection.get());
    // Drop references so a closed context cannot hand out stale JDBC objects.
    resultSet.set(null);
    preparedStatement.set(null);
    connection.set(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryRange;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.QueryType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
Expand All @@ -42,6 +45,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
Expand All @@ -55,10 +59,37 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin

@Override
public Optional<SingleTableInventoryCalculatedResult> calculateChunk(final SingleTableInventoryCalculateParameter param) {
CalculationContext calculationContext = getOrCreateCalculationContext(param);
try {
List<Map<String, Object>> records = new LinkedList<>();
Object maxUniqueKeyValue = null;
List<Map<String, Object>> records = calculateChunk0(param, QueryType.RANGE_QUERY == param.getQueryType());
if (records.isEmpty()) {
return Optional.empty();
}
String firstUniqueKey = param.getFirstUniqueKey().getName();
if (QueryType.POINT_QUERY == param.getQueryType()) {
return convertRecordsToResult(records, firstUniqueKey);
}
if (records.size() == chunkSize) {
Object minUniqueKeyValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(0), firstUniqueKey);
removeLastRecords(records, param);
if (!records.isEmpty()) {
updateQueryRangeLower(param, records, firstUniqueKey);
return convertRecordsToResult(records, firstUniqueKey);
}
SingleTableInventoryCalculateParameter newParam = buildNewCalculateParameter(param, minUniqueKeyValue);
records = calculateChunk0(newParam, false);
if (!records.isEmpty()) {
updateQueryRangeLower(param, records, firstUniqueKey);
return convertRecordsToResult(records, firstUniqueKey);
}
return Optional.empty();
} else {
updateQueryRangeLower(param, records, firstUniqueKey);
return convertRecordsToResult(records, firstUniqueKey);
}
}

private List<Map<String, Object>> calculateChunk0(final SingleTableInventoryCalculateParameter param, final boolean isRangeQuery) {
try (CalculationContext calculationContext = getOrCreateCalculationContext(param)) {
List<Map<String, Object>> result = new LinkedList<>();
InventoryColumnValueReaderEngine columnValueReaderEngine = new InventoryColumnValueReaderEngine(param.getDatabaseType());
ResultSet resultSet = calculationContext.getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
Expand All @@ -69,30 +100,24 @@ public Optional<SingleTableInventoryCalculatedResult> calculateChunk(final Singl
for (int columnIndex = 1, columnCount = resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
columnRecord.put(resultSetMetaData.getColumnLabel(columnIndex), columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
}
records.add(columnRecord);
maxUniqueKeyValue = columnValueReaderEngine.read(resultSet, resultSetMetaData, param.getFirstUniqueKey().getOrdinalPosition());
if (records.size() == chunkSize) {
result.add(columnRecord);
if (isRangeQuery && result.size() == chunkSize) {
break;
}
}
if (records.isEmpty()) {
calculationContext.close();
}
return records.isEmpty() ? Optional.empty() : Optional.of(new RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records));
return result;
} catch (final PipelineSQLException | PipelineJobCancelingException ex) {
calculationContext.close();
throw ex;
// CHECKSTYLE:OFF
} catch (final SQLException | RuntimeException ex) {
// CHECKSTYLE:ON
calculationContext.close();
throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), ex);
}
}

private CalculationContext getOrCreateCalculationContext(final SingleTableInventoryCalculateParameter param) {
CalculationContext result = (CalculationContext) param.getCalculationContext();
if (null != result) {
if (null != result && !result.isClosed()) {
return result;
}
try {
Expand Down Expand Up @@ -134,14 +159,86 @@ private String getQuerySQL(final SingleTableInventoryCalculateParameter param) {
}
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
Collection<String> columnNames = param.getColumnNames().isEmpty() ? Collections.singleton("*") : param.getColumnNames();
boolean firstQuery = null == param.getTableCheckPosition();
return pipelineSQLBuilder.buildQueryAllOrderingSQL(param.getSchemaName(), param.getLogicTableName(), columnNames, param.getFirstUniqueKey().getName(), firstQuery);
switch (param.getQueryType()) {
case RANGE_QUERY:
return pipelineSQLBuilder.buildQueryRangeOrderingSQL(param.getSchemaName(), param.getLogicTableName(),
columnNames, param.getUniqueKeysNames(), param.getQueryRange(), param.getShardingColumnsNames());
case POINT_QUERY:
return pipelineSQLBuilder.buildPointQuerySQL(param.getSchemaName(), param.getLogicTableName(), columnNames, param.getUniqueKeysNames(), param.getShardingColumnsNames());
default:
throw new UnsupportedOperationException("Query type: " + param.getQueryType());
}
}

private void setParameters(final PreparedStatement preparedStatement, final SingleTableInventoryCalculateParameter param) throws SQLException {
Object tableCheckPosition = param.getTableCheckPosition();
if (null != tableCheckPosition) {
preparedStatement.setObject(1, tableCheckPosition);
QueryType queryType = param.getQueryType();
if (queryType == QueryType.RANGE_QUERY) {
QueryRange queryRange = param.getQueryRange();
ShardingSpherePreconditions.checkNotNull(queryRange, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(
param.getSchemaName(), param.getLogicTableName(), new RuntimeException("Unique keys values range is null")));
int parameterIndex = 1;
if (null != queryRange.getLower()) {
preparedStatement.setObject(parameterIndex++, queryRange.getLower());
}
if (null != queryRange.getUpper()) {
preparedStatement.setObject(parameterIndex++, queryRange.getUpper());
}
preparedStatement.setObject(parameterIndex, chunkSize);
} else if (queryType == QueryType.POINT_QUERY) {
Collection<Object> uniqueKeysValues = param.getUniqueKeysValues();
ShardingSpherePreconditions.checkNotNull(uniqueKeysValues, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(
param.getSchemaName(), param.getLogicTableName(), new RuntimeException("Unique keys values is null")));
int parameterIndex = 1;
for (Object each : uniqueKeysValues) {
preparedStatement.setObject(parameterIndex++, each);
}
if (null != param.getShardingColumnsNames() && !param.getShardingColumnsNames().isEmpty()) {
List<Object> shardingColumnsValues = param.getShardingColumnsValues();
ShardingSpherePreconditions.checkNotNull(shardingColumnsValues, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(
param.getSchemaName(), param.getLogicTableName(), new RuntimeException("Sharding columns values is null when names not empty")));
for (Object each : shardingColumnsValues) {
preparedStatement.setObject(parameterIndex++, each);
}
}
} else {
throw new UnsupportedOperationException("Query type: " + queryType);
}
}

/**
 * Drop the last record plus any preceding records sharing its unique key value;
 * when the first and last records already share that value, the whole page is cleared.
 */
private void removeLastRecords(final List<Map<String, Object>> records, final SingleTableInventoryCalculateParameter param) {
    String uniqueKey = param.getFirstUniqueKey().getName();
    Object firstValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(0), uniqueKey);
    Object lastValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() - 1), uniqueKey);
    if (Objects.equals(firstValue, lastValue)) {
        records.clear();
        return;
    }
    records.remove(records.size() - 1);
    while (!records.isEmpty() && Objects.deepEquals(lastValue, DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() - 1), uniqueKey))) {
        records.remove(records.size() - 1);
    }
}

/**
 * Wrap fetched records into a calculated result keyed by the last record's unique key value.
 */
private Optional<SingleTableInventoryCalculatedResult> convertRecordsToResult(final List<Map<String, Object>> records, final String firstUniqueKey) {
    Map<String, Object> lastRecord = records.get(records.size() - 1);
    Object maxUniqueKeyValue = DataConsistencyCheckUtils.getFirstUniqueKeyValue(lastRecord, firstUniqueKey);
    return Optional.of(new RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records));
}

/**
 * Build a POINT_QUERY calculate parameter that targets a single unique key value,
 * keeping the original data source, table, columns, query range and sharding
 * columns of the given parameter.
 *
 * @param param original range-query parameter to derive from
 * @param maxUniqueKeyValue the unique key value to point-query for
 * @return new point-query parameter restricted to the first unique key
 */
private SingleTableInventoryCalculateParameter buildNewCalculateParameter(final SingleTableInventoryCalculateParameter param, final Object maxUniqueKeyValue) {
    // Only the first unique key is used for the point query.
    SingleTableInventoryCalculateParameter result = new SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(), param.getColumnNames(),
            Collections.singletonList(param.getFirstUniqueKey()), QueryType.POINT_QUERY);
    result.setUniqueKeysValues(Collections.singletonList(maxUniqueKeyValue));
    result.setQueryRange(param.getQueryRange());
    result.setShardingColumnsNames(param.getShardingColumnsNames());
    result.setShardingColumnsValues(param.getShardingColumnsValues());
    return result;
}

/**
 * Advance the query range's lower bound to the last fetched record's unique key
 * value, keeping the existing upper bound, so the next page starts after this one.
 */
private void updateQueryRangeLower(final SingleTableInventoryCalculateParameter param, final List<Map<String, Object>> records, final String firstUniqueKey) {
    Map<String, Object> lastRecord = records.get(records.size() - 1);
    Object newLowerBound = DataConsistencyCheckUtils.getFirstUniqueKeyValue(lastRecord, firstUniqueKey);
    param.setQueryRange(new QueryRange(newLowerBound, false, param.getQueryRange().getUpper()));
}
}
Loading

0 comments on commit e6b29d7

Please sign in to comment.