diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index bc862963405f2..b4fbbb2d41498 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1974,7 +1974,6 @@ def _temp_path(): generate_run_end_encoded_case() .skip_tester('C#') - .skip_tester('Java') .skip_tester('JS') # TODO(https://github.com/apache/arrow-nanoarrow/issues/618) .skip_tester('nanoarrow') diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java index 150c11e41edff..2661c12cda3af 100644 --- a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java +++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java @@ -187,7 +187,7 @@ public List visit(ArrowType.Union type) { @Override public List visit(ArrowType.RunEndEncoded type) { - throw new UnsupportedOperationException("Importing buffers for type: " + type); + return List.of(); } @Override diff --git a/java/c/src/main/java/org/apache/arrow/c/Format.java b/java/c/src/main/java/org/apache/arrow/c/Format.java index f77a555d18481..7ce99614d2a7a 100644 --- a/java/c/src/main/java/org/apache/arrow/c/Format.java +++ b/java/c/src/main/java/org/apache/arrow/c/Format.java @@ -233,6 +233,8 @@ static String asString(ArrowType arrowType) { return "+vl"; case LargeListView: return "+vL"; + case RunEndEncoded: + return "+r"; case NONE: throw new IllegalArgumentException("Arrow type ID is NONE"); default: @@ -321,6 +323,8 @@ static ArrowType asType(String format, long flags) return new ArrowType.ListView(); case "+vL": return new ArrowType.LargeListView(); + case "+r": + return new ArrowType.RunEndEncoded(); default: String[] parts = format.split(":", 2); if (parts.length == 2) { diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java index d8286465e475f..67ab282de5a32 100644 --- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java +++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java @@ -88,6 +88,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.complex.impl.UnionMapWriter; @@ -770,6 +771,22 @@ public void testStructVector() { } } + @Test + public void testRunEndEncodedVector() { + try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) { + setVector(vector, List.of(1, 3), List.of(1, 2)); + assertTrue(roundtrip(vector, RunEndEncodedVector.class)); + } + } + + @Test + public void testEmptyRunEndEncodedVector() { + try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) { + setVector(vector, List.of(), List.of()); + assertTrue(roundtrip(vector, RunEndEncodedVector.class)); + } + } + @Test public void testExtensionTypeVector() { ExtensionTypeRegistry.register(new UuidType()); diff --git a/java/c/src/test/python/integration_tests.py b/java/c/src/test/python/integration_tests.py index b0a86e9c66e59..3e14be11c4644 100644 --- a/java/c/src/test/python/integration_tests.py +++ b/java/c/src/test/python/integration_tests.py @@ -399,6 +399,20 @@ def recreate_batch(): return reader.read_next_batch() self.round_trip_record_batch(recreate_batch) + + def test_runendencoded_array(self): + # empty vector + self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([], [], pa.run_end_encoded(pa.int64(), pa.int64()))) + + # constant null vector + self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([10], [None])) + # constant int vector + self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([10], [10])) + + # run end int vector + self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([3, 5, 10, 12, 19], [1, 2, 1, None, 3])) + # run end string vector + self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([3, 5, 10, 12, 19], ["1", "2", "1", None, "3"])) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index e8de86f6e9549..1bb9a3d6c05f3 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -30,8 +30,11 @@ import org.apache.arrow.memory.util.hash.ArrowBufHasher; import org.apache.arrow.vector.BaseIntVector; import org.apache.arrow.vector.BaseValueVector; +import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BufferBacked; import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.ZeroVector; import org.apache.arrow.vector.compare.VectorVisitor; @@ -50,6 +53,7 @@ * values vector of any type. There are no buffers associated with the parent vector. */ public class RunEndEncodedVector extends BaseValueVector implements FieldVector { + public static final FieldVector DEFAULT_VALUE_VECTOR = ZeroVector.INSTANCE; public static final FieldVector DEFAULT_RUN_END_VECTOR = ZeroVector.INSTANCE; @@ -203,6 +207,7 @@ public void clear() { for (FieldVector v : getChildrenFromFields()) { v.clear(); } + this.valueCount = 0; } /** @@ -234,19 +239,6 @@ public MinorType getMinorType() { return MinorType.RUNENDENCODED; } - /** - * To transfer quota responsibility. - * - * @param allocator the target allocator - * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. - */ - @Override - public TransferPair getTransferPair(BufferAllocator allocator) { - throw new UnsupportedOperationException( - "RunEndEncodedVector does not support getTransferPair(BufferAllocator)"); - } - /** * To transfer quota responsibility. * @@ -284,8 +276,7 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) { */ @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { - throw new UnsupportedOperationException( - "RunEndEncodedVector does not support getTransferPair(String, BufferAllocator, CallBack)"); + return new TransferImpl(ref, allocator, callBack); } /** @@ -299,8 +290,7 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallB */ @Override public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) { - throw new UnsupportedOperationException( - "RunEndEncodedVector does not support getTransferPair(Field, BufferAllocator, CallBack)"); + return new TransferImpl(field, allocator, callBack); } /** @@ -312,8 +302,156 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator, Call */ @Override public TransferPair makeTransferPair(ValueVector target) { - throw new UnsupportedOperationException( - "RunEndEncodedVector does not support makeTransferPair(ValueVector)"); + return new TransferImpl((RunEndEncodedVector) target); + } + + private class TransferImpl implements TransferPair { + + RunEndEncodedVector to; + TransferPair dataTransferPair; + TransferPair reeTransferPair; + + public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) { + this(new RunEndEncodedVector(name, allocator, field.getFieldType(), callBack)); + } + + public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) { + this(new RunEndEncodedVector(field, allocator, callBack)); + } + + public TransferImpl(RunEndEncodedVector to) { + this.to = to; + if (to.getRunEndsVector() instanceof ZeroVector) { + to.initializeChildrenFromFields(field.getChildren()); + } + reeTransferPair = getRunEndsVector().makeTransferPair(to.getRunEndsVector()); + dataTransferPair = getValuesVector().makeTransferPair(to.getValuesVector()); + } + + /** + * Transfer the vector data to another vector. The memory associated with this vector is + * transferred to the allocator of target vector for accounting and management purposes. + */ + @Override + public void transfer() { + to.clear(); + dataTransferPair.transfer(); + reeTransferPair.transfer(); + if (valueCount > 0) { + to.setValueCount(valueCount); + } + clear(); + } + + /** + * Slice this vector at the desired index and length, then transfer the corresponding data to + * the target vector. + * + * @param startIndex start position of the split in source vector. + * @param length length of the split. + */ + @Override + public void splitAndTransfer(int startIndex, int length) { + to.clear(); + if (length <= 0) { + return; + } + + int physicalStartIndex = getPhysicalIndex(startIndex); + int physicalEndIndex = getPhysicalIndex(startIndex + length - 1); + int physicalLength = physicalEndIndex - physicalStartIndex + 1; + dataTransferPair.splitAndTransfer(physicalStartIndex, physicalLength); + FieldVector toRunEndsVector = to.runEndsVector; + if (startIndex == 0) { + if (((BaseIntVector) runEndsVector).getValueAsLong(physicalEndIndex) == length) { + reeTransferPair.splitAndTransfer(physicalStartIndex, physicalLength); + } else { + reeTransferPair.splitAndTransfer(physicalStartIndex, physicalLength - 1); + toRunEndsVector.setValueCount(physicalLength); + if (toRunEndsVector instanceof SmallIntVector) { + ((SmallIntVector) toRunEndsVector).set(physicalEndIndex, length); + } else if (toRunEndsVector instanceof IntVector) { + ((IntVector) toRunEndsVector).set(physicalEndIndex, length); + } else if (toRunEndsVector instanceof BigIntVector) { + ((BigIntVector) toRunEndsVector).set(physicalEndIndex, length); + } else { + throw new IllegalArgumentException( + "Run-end vector and must be of type int with size 16, 32, or 64 bits."); + } + } + } else { + shiftRunEndsVector( + toRunEndsVector, + startIndex, + length, + physicalStartIndex, + physicalEndIndex, + physicalLength); + } + getTo().setValueCount(length); + } + + private void shiftRunEndsVector( + ValueVector toRunEndVector, + int startIndex, + int length, + int physicalStartIndex, + int physicalEndIndex, + int physicalLength) { + toRunEndVector.setValueCount(physicalLength); + toRunEndVector.getValidityBuffer().setOne(0, toRunEndVector.getValidityBuffer().capacity()); + ArrowBuf fromRunEndBuffer = runEndsVector.getDataBuffer(); + ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer(); + int physicalLastIndex = physicalLength - 1; + if (toRunEndVector instanceof SmallIntVector) { + byte typeWidth = SmallIntVector.TYPE_WIDTH; + for (int i = 0; i < physicalLastIndex; i++) { + toRunEndBuffer.setShort( + (long) i * typeWidth, + fromRunEndBuffer.getShort((long) (i + physicalStartIndex) * typeWidth) - startIndex); + } + int lastEnd = + Math.min( + fromRunEndBuffer.getShort((long) physicalEndIndex * typeWidth) - startIndex, + length); + toRunEndBuffer.setShort((long) physicalLastIndex * typeWidth, lastEnd); + } else if (toRunEndVector instanceof IntVector) { + byte typeWidth = IntVector.TYPE_WIDTH; + for (int i = 0; i < physicalLastIndex; i++) { + toRunEndBuffer.setInt( + (long) i * typeWidth, + fromRunEndBuffer.getInt((long) (i + physicalStartIndex) * typeWidth) - startIndex); + } + int lastEnd = + Math.min( + fromRunEndBuffer.getInt((long) physicalEndIndex * typeWidth) - startIndex, length); + toRunEndBuffer.setInt((long) physicalLastIndex * typeWidth, lastEnd); + } else if (toRunEndVector instanceof BigIntVector) { + byte typeWidth = BigIntVector.TYPE_WIDTH; + for (int i = 0; i < physicalLastIndex; i++) { + toRunEndBuffer.setLong( + (long) i * typeWidth, + fromRunEndBuffer.getLong((long) (i + physicalStartIndex) * typeWidth) - startIndex); + } + long lastEnd = + Math.min( + fromRunEndBuffer.getLong((long) physicalEndIndex * typeWidth) - startIndex, length); + toRunEndBuffer.setLong((long) physicalLastIndex * typeWidth, lastEnd); + } else { + throw new IllegalArgumentException( + "Run-end vector and must be of type int with size 16, 32, or 64 bits."); + } + } + + @Override + public ValueVector getTo() { + return to; + } + + @Override + public void copyValueSafe(int from, int to) { + this.to.copyFrom(from, to, RunEndEncodedVector.this); + } } /** @@ -568,6 +706,7 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers throw new UnsupportedOperationException( "Run-end encoded vectors do not have any associated buffers."); } + this.valueCount = fieldNode.getLength(); } /** diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java index 5668325a87eeb..fe0803d2984cb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java @@ -909,10 +909,12 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) throws IOEx variadicBufferIndices)); } - int nullCount = 0; - if (type instanceof ArrowType.Null) { + int nullCount; + if (type instanceof ArrowType.RunEndEncoded || type instanceof Union) { + nullCount = 0; + } else if (type instanceof ArrowType.Null) { nullCount = valueCount; - } else if (!(type instanceof Union)) { + } else { nullCount = BitVectorHelper.getNullCount(vectorBuffers.get(0), valueCount); } final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount, nullCount); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java index ef31b4f837344..5c7215437f8ec 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java @@ -305,7 +305,7 @@ public Void visit(RunEndEncodedVector vector, Void value) { if (runCount == 0) { validateOrThrow(valueCount == 0, "Run end vector does not contain enough elements"); } else if (runCount > 0) { - double lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1); + long lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1); validateOrThrow( valueCount == lastEnd, "Vector logic length not equal to the last end in run ends vector. Logical length %s, last end %s", diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java index 3f4be2e52ce56..adf51c07301f3 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.util.TransferPair; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -106,25 +107,28 @@ public void testBasicRunEndEncodedVector() { setBasicVector(reeVector, runCount, i -> i % 2 == 0 ? null : i + 1, i -> i + 1); assertEquals(15, reeVector.getValueCount()); - int index = 0; - for (int run = 0; run < runCount; run++) { - long expectedRunValue = (long) run + 1; - for (int j = 0; j <= run; j++) { - if (run % 2 == 0) { - assertNull(reeVector.getObject(index)); - } else { - assertEquals(expectedRunValue, reeVector.getObject(index)); - } - index++; - } - } - + checkBasic(runCount, reeVector); // test index out of bound assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(-1)); assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(logicalValueCount)); } } + private static void checkBasic(int runCount, RunEndEncodedVector reeVector) { + int index = 0; + for (int run = 0; run < runCount; run++) { + long expectedRunValue = (long) run + 1; + for (int j = 0; j <= run; j++) { + if (run % 2 == 0) { + assertNull(reeVector.getObject(index)); + } else { + assertEquals(expectedRunValue, reeVector.getObject(index)); + } + index++; + } + } + } + @Test public void testRangeCompare() { // test compare same constant vector @@ -228,4 +232,102 @@ private static int setBasicVector( reeVector.setValueCount(logicalValueCount); return logicalValueCount; } + + @Test + public void testTransfer() { + // constant vector + try (RunEndEncodedVector reeVector = + new RunEndEncodedVector(createBigIntRunEndEncodedField("constant"), allocator, null)) { + Long value = 65536L; + int logicalValueCount = 100; + setConstantVector(reeVector, value, logicalValueCount); + assertEquals(logicalValueCount, reeVector.getValueCount()); + for (int i = 0; i < logicalValueCount; i++) { + assertEquals(value, reeVector.getObject(i)); + } + + TransferPair transferPair = reeVector.getTransferPair(allocator); + transferPair.transfer(); + assertEquals(0, reeVector.getValueCount()); + assertEquals(0, reeVector.getValuesVector().getValueCount()); + assertEquals(0, reeVector.getRunEndsVector().getValueCount()); + try (RunEndEncodedVector toVector = (RunEndEncodedVector) transferPair.getTo()) { + assertEquals(logicalValueCount, toVector.getValueCount()); + for (int i = 0; i < logicalValueCount; i++) { + assertEquals(value, toVector.getObject(i)); + } + } + } + + // basic run end encoded vector + try (RunEndEncodedVector reeVector = + new RunEndEncodedVector(createBigIntRunEndEncodedField("basic"), allocator, null)) { + // Create REE vector representing: + // [null, 2, 2, null, null, null, 4, 4, 4, 4, null, null, null, null, null]. + int runCount = 5; + final int logicalValueCount = + setBasicVector(reeVector, runCount, i -> i % 2 == 0 ? null : i + 1, i -> i + 1); + + assertEquals(15, reeVector.getValueCount()); + checkBasic(runCount, reeVector); + + TransferPair transferPair = reeVector.getTransferPair(allocator); + transferPair.transfer(); + assertEquals(0, reeVector.getValueCount()); + assertEquals(0, reeVector.getValuesVector().getValueCount()); + assertEquals(0, reeVector.getRunEndsVector().getValueCount()); + try (RunEndEncodedVector toVector = (RunEndEncodedVector) transferPair.getTo()) { + assertEquals(logicalValueCount, toVector.getValueCount()); + checkBasic(runCount, toVector); + } + } + } + + @Test + public void testSplitAndTransfer() { + // test compare same constant vector + try (RunEndEncodedVector constantVector = + new RunEndEncodedVector(createBigIntRunEndEncodedField("constant"), allocator, null)) { + int logicalValueCount = 15; + + setConstantVector(constantVector, 1L, logicalValueCount); + + try (RunEndEncodedVector toVector = RunEndEncodedVector.empty("constant", allocator)) { + TransferPair transferPair = constantVector.makeTransferPair(toVector); + int startIndex = 1; + int transferLength = 10; + transferPair.splitAndTransfer(startIndex, transferLength); + + toVector.validate(); + assertEquals(transferLength, toVector.getValueCount()); + assertTrue( + constantVector.accept( + new RangeEqualsVisitor(constantVector, toVector), new Range(1, 0, transferLength))); + } + } + + try (RunEndEncodedVector reeVector = + new RunEndEncodedVector(createBigIntRunEndEncodedField("ree"), allocator, null)) { + + setBasicVector(reeVector, 5, i -> i + 1, i -> i + 1); + + int[][] transferConfigs = {{0, 0}, {0, 1}, {0, 9}, {1, 0}, {1, 10}, {1, 14}}; + + try (RunEndEncodedVector toVector = RunEndEncodedVector.empty("ree", allocator)) { + TransferPair transferPair = reeVector.makeTransferPair(toVector); + for (final int[] transferConfig : transferConfigs) { + int startIndex = transferConfig[0]; + int transferLength = transferConfig[1]; + transferPair.splitAndTransfer(startIndex, transferLength); + + toVector.validate(); + assertEquals(transferLength, toVector.getValueCount()); + assertTrue( + reeVector.accept( + new RangeEqualsVisitor(reeVector, toVector), + new Range(startIndex, 0, transferLength))); + } + } + } + } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java b/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java index afbc30f019ef6..f599dfa539421 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java @@ -68,10 +68,12 @@ import org.apache.arrow.vector.complex.LargeListViewVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.holders.IntervalDayHolder; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; /** Utility for populating {@link org.apache.arrow.vector.ValueVector}. */ @@ -794,4 +796,41 @@ public static void setVector(LargeListViewVector vector, List... values dataVector.setValueCount(curPos); vector.setValueCount(values.length); } + + public static void setVector( + RunEndEncodedVector vector, List runEnds, List values) { + int runCount = runEnds.size(); + assert runCount == values.size(); + final FieldType valueType = FieldType.notNullable(MinorType.INT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + vector.initializeChildrenFromFields(List.of(runEndField, valueField)); + + IntVector runEndsVector = (IntVector) vector.getRunEndsVector(); + runEndsVector.setValueCount(runCount); + for (int i = 0; i < runCount; i++) { + if (runEnds.get(i) == null) { + runEndsVector.setNull(i); + } else { + runEndsVector.set(i, runEnds.get(i)); + } + } + + IntVector valuesVector = (IntVector) vector.getValuesVector(); + valuesVector.setValueCount(runCount); + for (int i = 0; i < runCount; i++) { + if (runEnds.get(i) == null) { + valuesVector.setNull(i); + } else { + valuesVector.set(i, values.get(i)); + } + } + + if (runCount > 0) { + vector.setValueCount(runEnds.get(runCount - 1)); + } else { + vector.setValueCount(0); + } + } }