diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryPositionCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryPositionCalculator.java new file mode 100644 index 0000000000000..194ac60a26e62 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryPositionCalculator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.preparer.inventory; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.Range; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition; +import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator; + +import java.math.BigInteger; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; + +/** + * Inventory position calculator. + */ +@NoArgsConstructor(access = AccessLevel.NONE) +public final class InventoryPositionCalculator { + + /** + * Get position by integer unique key range. + * + * @param tableRecordsCount table records count + * @param uniqueKeyValuesRange unique key values range + * @param shardingSize sharding size + * @return position collection + */ + public static Collection getPositionByIntegerUniqueKeyRange(final long tableRecordsCount, final Range uniqueKeyValuesRange, final long shardingSize) { + if (0 == tableRecordsCount) { + return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0, 0)); + } + Collection result = new LinkedList<>(); + long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0); + long interval = BigInteger.valueOf(uniqueKeyValuesRange.getMaximum()).subtract(BigInteger.valueOf(uniqueKeyValuesRange.getMinimum())).divide(BigInteger.valueOf(splitCount)).longValue(); + IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval); + while (rangeIterator.hasNext()) { + Range range = rangeIterator.next(); + result.add(new IntegerPrimaryKeyIngestPosition(range.getMinimum(), range.getMaximum())); + } + return result; + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java index 0560697edc2af..d75f30ad0bdb8 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java @@ -25,17 +25,16 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition; import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils; import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; +import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryPositionCalculator; import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator; import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder; -import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator; import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; @@ -122,7 +121,9 @@ private Collection getInventoryPositions(final InventoryDumperCo if (1 == uniqueKeyColumns.size()) { int firstColumnDataType = uniqueKeyColumns.get(0).getDataType(); if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) { - return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext); + Range uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dumperContext); + int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize(); + return InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(tableRecordsCount, uniqueKeyValuesRange, shardingSize); } if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) { // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases. @@ -132,23 +133,6 @@ private Collection getInventoryPositions(final InventoryDumperCo return Collections.singleton(new UnsupportedKeyIngestPosition()); } - private Collection getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount, final TransmissionJobItemContext jobItemContext) { - if (0L == tableRecordsCount) { - return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0L, 0L)); - } - Collection result = new LinkedList<>(); - Range uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dumperContext); - int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize(); - long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0L ? 1 : 0); - long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount; - IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval); - while (rangeIterator.hasNext()) { - Range range = rangeIterator.next(); - result.add(new IntegerPrimaryKeyIngestPosition(range.getMinimum(), range.getMaximum())); - } - return result; - } - private Range getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final InventoryDumperContext dumperContext) { String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName(); PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType()); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java index ab8bbbfb7fa09..0718e4b3e9cfd 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/IntervalToRangeIterator.java @@ -19,6 +19,7 @@ import org.apache.commons.lang3.Range; +import java.math.BigInteger; import java.util.Iterator; import java.util.NoSuchElementException; @@ -30,11 +31,11 @@ */ public final class IntervalToRangeIterator implements Iterator> { - private final long maximum; + private final BigInteger maximum; - private final long interval; + private final BigInteger interval; - private long current; + private BigInteger current; public IntervalToRangeIterator(final long minimum, final long maximum, final long interval) { if (minimum > maximum) { @@ -43,14 +44,14 @@ public IntervalToRangeIterator(final long minimum, final long maximum, final lon if (interval < 0L) { throw new IllegalArgumentException("interval is less than zero"); } - this.maximum = maximum; - this.interval = interval; - current = minimum; + this.maximum = BigInteger.valueOf(maximum); + this.interval = BigInteger.valueOf(interval); + this.current = BigInteger.valueOf(minimum); } @Override public boolean hasNext() { - return current <= maximum; + return current.compareTo(maximum) <= 0; } @Override @@ -58,9 +59,13 @@ public Range next() { if (!hasNext()) { throw new NoSuchElementException(""); } - long upperLimit = Math.min(maximum, current + interval); - Range result = Range.between(current, upperLimit); - current = upperLimit + 1L; + BigInteger upperLimit = min(maximum, current.add(interval)); + Range result = Range.between(current.longValue(), upperLimit.longValue()); + current = upperLimit.add(BigInteger.ONE); return result; } + + private BigInteger min(final BigInteger one, final BigInteger another) { + return one.compareTo(another) < 0 ? one : another; + } } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryPositionCalculatorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryPositionCalculatorTest.java new file mode 100644 index 0000000000000..19c5e3a927cf9 --- /dev/null +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryPositionCalculatorTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.preparer.inventory; + +import org.apache.commons.lang3.Range; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class InventoryPositionCalculatorTest { + + @Test + void assertGetPositionByIntegerUniqueKeyRange() { + List actualPositions = (List) InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(200L, Range.between(1L, 600L), 100L); + assertThat(actualPositions.size(), is(2)); + for (IngestPosition each : actualPositions) { + assertThat(each, instanceOf(IntegerPrimaryKeyIngestPosition.class)); + } + assertPosition(new IntegerPrimaryKeyIngestPosition(1L, 300L), (IntegerPrimaryKeyIngestPosition) actualPositions.get(0)); + assertPosition(new IntegerPrimaryKeyIngestPosition(301L, 600L), (IntegerPrimaryKeyIngestPosition) actualPositions.get(1)); + } + + private void assertPosition(final IntegerPrimaryKeyIngestPosition expected, final IntegerPrimaryKeyIngestPosition actual) { + assertThat(actual.getBeginValue(), is(expected.getBeginValue())); + assertThat(actual.getEndValue(), is(expected.getEndValue())); + } + + @Test + void assertGetPositionByIntegerUniqueKeyRangeWithZeroTotalRecordsCount() { + List actualPositions = (List) InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(0L, Range.between(0L, 0L), 1L); + assertThat(actualPositions.size(), is(1)); + assertThat(actualPositions.get(0), instanceOf(IntegerPrimaryKeyIngestPosition.class)); + assertPosition(new IntegerPrimaryKeyIngestPosition(0L, 0L), (IntegerPrimaryKeyIngestPosition) actualPositions.get(0)); + } + + @Test + void assertGetPositionByIntegerUniqueKeyRangeWithTheSameMinMax() { + List actualPositions = (List) InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(200L, Range.between(5L, 5L), 100L); + assertThat(actualPositions.size(), is(1)); + assertThat(actualPositions.get(0), instanceOf(IntegerPrimaryKeyIngestPosition.class)); + assertPosition(new IntegerPrimaryKeyIngestPosition(5L, 5L), (IntegerPrimaryKeyIngestPosition) actualPositions.get(0)); + } + + @Test + void assertGetPositionByIntegerUniqueKeyRangeOverflow() { + long tableRecordsCount = Long.MAX_VALUE - 1L; + long shardingSize = tableRecordsCount / 2L; + long minimum = Long.MIN_VALUE + 1L; + long maximum = Long.MAX_VALUE; + List actualPositions = (List) InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange( + tableRecordsCount, Range.between(minimum, maximum), shardingSize); + assertThat(actualPositions.size(), is(2)); + for (IngestPosition each : actualPositions) { + assertThat(each, instanceOf(IntegerPrimaryKeyIngestPosition.class)); + } + assertPosition(new IntegerPrimaryKeyIngestPosition(minimum, 0L), (IntegerPrimaryKeyIngestPosition) actualPositions.get(0)); + assertPosition(new IntegerPrimaryKeyIngestPosition(1L, maximum), (IntegerPrimaryKeyIngestPosition) actualPositions.get(1)); + } +}