Skip to content

Commit

Permalink
apacheGH-40038: [Java] Export non empty offset buffer for variable-si…
Browse files Browse the repository at this point in the history
…ze layout through C Data Interface (apache#40043)

### Rationale for this change

We encountered an error when exchanging a string array from Java to Rust through the Arrow C Data Interface. On the Rust side, it complains that the buffer at position 1 (the offset buffer) is null. After some tracing and debugging, it looks like the issue is that the Java Arrow `BaseVariableWidthVector` class assigns an empty offset buffer if the array is empty (value count 0).

According to the Arrow [spec](https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-layout) for the variable-size binary layout:

> The offsets buffer contains length + 1 signed integers ...

So for an empty string array, its offset buffer should be a buffer with one element (generally it is `0`).

### What changes are included in this PR?

This patch replaces the empty offset buffer in the variable-size layout vector classes with a buffer holding a single offset when exporting arrays through the C Data Interface.

### Are these changes tested?

Added test cases.

### Are there any user-facing changes?

No

* Closes: apache#40038

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
viirya authored Apr 2, 2024
1 parent 1552293 commit 5ddef63
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 25 deletions.
10 changes: 1 addition & 9 deletions java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,7 @@ void export(ArrowArray array, FieldVector vector, DictionaryProvider dictionaryP
if (buffers != null) {
data.buffers = new ArrayList<>(buffers.size());
data.buffers_ptrs = allocator.buffer((long) buffers.size() * Long.BYTES);
for (ArrowBuf arrowBuf : buffers) {
if (arrowBuf != null) {
arrowBuf.getReferenceManager().retain();
data.buffers_ptrs.writeLong(arrowBuf.memoryAddress());
} else {
data.buffers_ptrs.writeLong(NULL);
}
data.buffers.add(arrowBuf);
}
vector.exportCDataBuffers(data.buffers, data.buffers_ptrs, NULL);
}

if (dictionaryEncoding != null) {
Expand Down
18 changes: 17 additions & 1 deletion java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.arrow.memory.ArrowBuf;
Expand Down Expand Up @@ -165,10 +166,25 @@ VectorSchemaRoot vectorSchemaRootRoundtrip(VectorSchemaRoot root) {
}

// Round-trips `vector` through vectorRoundtrip, asserts that the imported vector is an instance
// of `clazz`, and yields the result of VectorEqualsVisitor.vectorEquals(vector, imported).
// After the imported vector is closed it also asserts that every field buffer's ref count and
// the allocator's allocated memory are back to the values captured before the roundtrip.
boolean roundtrip(FieldVector vector, Class<?> clazz) {
// Snapshot the per-buffer ref counts and the allocator usage before exporting anything.
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
List<Integer> orgRefCnts = fieldBuffers.stream().map(buf -> buf.refCnt()).collect(Collectors.toList());
long orgMemorySize = allocator.getAllocatedMemory();

boolean result = false;
try (ValueVector imported = vectorRoundtrip(vector)) {
assertTrue(clazz.isInstance(imported), String.format("expected %s but was %s", clazz, imported.getClass()));
// NOTE(review): diff view — the `return` below looks like the single removed line of this hunk;
// the assignment that follows it is its replacement, so the post-roundtrip checks can run.
return VectorEqualsVisitor.vectorEquals(vector, imported);
result = VectorEqualsVisitor.vectorEquals(vector, imported);
}

// Check that the ref counts of the buffers are the same after the roundtrip
IntStream.range(0, orgRefCnts.size()).forEach(i -> {
ArrowBuf buf = fieldBuffers.get(i);
// NOTE(review): if this is JUnit's assertEquals(expected, actual), the arguments are swapped —
// the live ref count is passed first and the recorded one second. Harmless for pass/fail,
// but a failure message would label the two values the wrong way round.
assertEquals(buf.refCnt(), orgRefCnts.get(i));
});

// The allocator must hold exactly as much memory as it did before the roundtrip.
assertEquals(orgMemorySize, allocator.getAllocatedMemory());

return result;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,34 @@ public List<ArrowBuf> getFieldBuffers() {
return result;
}

/**
 * Exports this vector's validity, offset and value buffers, in that order, for the
 * C Data Interface. Each buffer is appended to {@code buffers} and its memory address is
 * written to {@code buffersPtr} via {@code exportBuffer}.
 *
 * @param buffers list that receives each exported buffer
 * @param buffersPtr buffer that receives the memory address of each exported buffer
 * @param nullValue value handed to {@code exportBuffer} for use when a buffer is null
 */
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
  // Bring the vector to a consistent state before handing out its buffers: the offsets of
  // some trailing values may not have been written yet, which could otherwise make data in
  // the value buffer unreachable (see TestValueVector#testUnloadVariableWidthVector).
  fillHoles(valueCount);

  exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);

  // An empty offset buffer is tolerated for historical reasons, but it cannot be exported
  // as-is: a freshly allocated buffer with room for one offset is exported in its place.
  // That fresh buffer is NOT retained again, because its initial ref count of 1 already
  // stands for the usage on the importing side.
  final boolean offsetsAreEmpty = offsetBuffer.capacity() == 0;
  final ArrowBuf offsetsToExport =
      offsetsAreEmpty ? allocateOffsetBuffer(OFFSET_WIDTH) : offsetBuffer;
  exportBuffer(offsetsToExport, buffers, buffersPtr, nullValue, !offsetsAreEmpty);

  exportBuffer(valueBuffer, buffers, buffersPtr, nullValue, true);
}

/**
* Set the reader and writer indexes for the inner buffers.
*/
Expand Down Expand Up @@ -456,10 +484,11 @@ private void allocateBytes(final long valueBufferSize, final int valueCount) {
}

/* allocate offset buffer */
// Allocates `size` bytes from `allocator` and resets the new buffer's reader index to 0.
// NOTE(review): this hunk interleaves the pre-change lines (void signature, assignment to the
// `offsetBuffer` field) with their replacements (ArrowBuf return type, local variable that is
// returned to the caller instead of being stored in the field).
private void allocateOffsetBuffer(final long size) {
offsetBuffer = allocator.buffer(size);
private ArrowBuf allocateOffsetBuffer(final long size) {
ArrowBuf offsetBuffer = allocator.buffer(size);
offsetBuffer.readerIndex(0);
// NOTE(review): initOffsetBuffer() takes no argument. Now that `offsetBuffer` above is a local
// that shadows the field, this call presumably still initializes the FIELD, leaving the newly
// allocated buffer returned below uninitialized — confirm against initOffsetBuffer().
initOffsetBuffer();
return offsetBuffer;
}

/* allocate validity buffer */
Expand Down Expand Up @@ -760,7 +789,7 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseLargeV
final long start = getStartOffset(startIndex);
final long end = getStartOffset(startIndex + length);
final long dataLength = end - start;
target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
target.offsetBuffer = target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
for (int i = 0; i < length + 1; i++) {
final long relativeSourceOffset = getStartOffset(startIndex + i) - start;
target.offsetBuffer.setLong((long) i * OFFSET_WIDTH, relativeSourceOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,34 @@ public List<ArrowBuf> getFieldBuffers() {
return result;
}

/**
 * Exports the validity, offset and value buffers of this vector (in that order) for the
 * C Data Interface: every buffer is added to {@code buffers} and its memory address is
 * written to {@code buffersPtr} through {@code exportBuffer}.
 *
 * @param buffers list that collects the exported buffers
 * @param buffersPtr buffer into which the exported buffers' memory addresses are written
 * @param nullValue value forwarded to {@code exportBuffer} for the null-buffer case
 */
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
  // The vector has to be consistent before its buffers leave: offsets belonging to trailing
  // values may not have been filled in yet, and exporting in that state could lose data held
  // in the value buffer. See TestValueVector#testUnloadVariableWidthVector for details.
  fillHoles(valueCount);

  exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);

  final ArrowBuf exportedOffsets;
  final boolean retainOffsets;
  if (offsetBuffer.capacity() != 0) {
    exportedOffsets = offsetBuffer;
    retainOffsets = true;
  } else {
    // An empty offset buffer is permitted for historical reasons; for the C Data Interface
    // a buffer holding one offset is allocated and exported instead. It is deliberately not
    // retained: the new buffer's ref count of 1 already accounts for the importing side.
    exportedOffsets = allocateOffsetBuffer(OFFSET_WIDTH);
    retainOffsets = false;
  }
  exportBuffer(exportedOffsets, buffers, buffersPtr, nullValue, retainOffsets);

  exportBuffer(valueBuffer, buffers, buffersPtr, nullValue, true);
}

/**
* Set the reader and writer indexes for the inner buffers.
*/
Expand Down Expand Up @@ -476,11 +504,12 @@ private void allocateBytes(final long valueBufferSize, final int valueCount) {
}

/* allocate offset buffer */
// Allocates an offset buffer from `allocator` and resets its reader index to 0.
// NOTE(review): this hunk interleaves the pre-change lines (void signature, assignment to the
// `offsetBuffer` field) with their replacements (ArrowBuf return type, local variable that is
// returned to the caller).
private void allocateOffsetBuffer(final long size) {
private ArrowBuf allocateOffsetBuffer(final long size) {
// The requested size is narrowed to int before allocating; a size above Integer.MAX_VALUE
// would be truncated by this cast.
final int curSize = (int) size;
offsetBuffer = allocator.buffer(curSize);
ArrowBuf offsetBuffer = allocator.buffer(curSize);
offsetBuffer.readerIndex(0);
// NOTE(review): initOffsetBuffer() takes no argument. With the local `offsetBuffer` above now
// shadowing the field, this call presumably still targets the FIELD, so the buffer returned
// below may be left uninitialized — confirm against initOffsetBuffer().
initOffsetBuffer();
return offsetBuffer;
}

/* allocate validity buffer */
Expand Down Expand Up @@ -805,7 +834,7 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseVariab
(1 + length) * ((long) OFFSET_WIDTH));
target.offsetBuffer = transferBuffer(slicedOffsetBuffer, target.allocator);
} else {
target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
target.offsetBuffer = target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
for (int i = 0; i < length + 1; i++) {
final int relativeSourceOffset = getStartOffset(startIndex + i) - start;
target.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeSourceOffset);
Expand Down
41 changes: 41 additions & 0 deletions java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,47 @@ public interface FieldVector extends ValueVector {
*/
List<ArrowBuf> getFieldBuffers();

/**
 * Exports a single buffer: writes its memory address to {@code buffersPtr} and appends the
 * buffer itself to {@code buffers}.
 *
 * <p>When {@code buffer} is null, {@code nullValue} is written in place of an address and
 * the null reference is still appended to {@code buffers}.
 *
 * @param buffer the buffer to export; may be null
 * @param buffers the list the buffer is appended to
 * @param buffersPtr the buffer that receives the exported address
 * @param nullValue the value written to {@code buffersPtr} when {@code buffer} is null
 * @param retain whether to retain a non-null buffer through its reference manager before
 *     its address is written
 */
default void exportBuffer(
    ArrowBuf buffer,
    List<ArrowBuf> buffers,
    ArrowBuf buffersPtr,
    long nullValue,
    boolean retain) {
  final long exportedAddress;
  if (buffer == null) {
    exportedAddress = nullValue;
  } else {
    if (retain) {
      buffer.getReferenceManager().retain();
    }
    exportedAddress = buffer.memoryAddress();
  }
  buffersPtr.writeLong(exportedAddress);
  buffers.add(buffer);
}

/**
 * Exports every buffer returned by {@code getFieldBuffers()} for the C Data Interface, in
 * list order. Each one is passed to {@code exportBuffer}, which appends it to
 * {@code buffers} and writes its memory address to {@code buffersPtr}.
 *
 * <p>This default implementation always asks {@code exportBuffer} to retain the buffer, so
 * each exported buffer's ref count is increased to account for its usage on the importing
 * side.
 *
 * @param buffers list that receives the exported buffers
 * @param buffersPtr buffer that receives the exported buffers' memory addresses
 * @param nullValue value forwarded to {@code exportBuffer} for null buffers
 */
default void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
  getFieldBuffers()
      .forEach(fieldBuffer -> exportBuffer(fieldBuffer, buffers, buffersPtr, nullValue, true));
}

/**
* Get the inner vectors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public String getName() {
public boolean allocateNewSafe() {
boolean dataAlloc = false;
try {
allocateOffsetBuffer(offsetAllocationSizeInBytes);
offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
dataAlloc = vector.allocateNewSafe();
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -97,12 +97,13 @@ public boolean allocateNewSafe() {
return dataAlloc;
}

// Allocates an offset buffer from `allocator`, resets its reader index to 0, records the
// allocated size in `offsetAllocationSizeInBytes` and zero-fills the whole buffer.
// NOTE(review): this hunk interleaves the pre-change lines (void signature, assignment to the
// `offsetBuffer` field) with their replacements (ArrowBuf return type, local variable that is
// returned to the caller).
protected void allocateOffsetBuffer(final long size) {
protected ArrowBuf allocateOffsetBuffer(final long size) {
// The requested size is narrowed to int; a size above Integer.MAX_VALUE would be truncated.
final int curSize = (int) size;
offsetBuffer = allocator.buffer(curSize);
ArrowBuf offsetBuffer = allocator.buffer(curSize);
offsetBuffer.readerIndex(0);
// NOTE(review): this field is still updated even though the buffer is no longer stored in the
// vector here. A caller that only wants a temporary buffer (e.g. an export path passing
// OFFSET_WIDTH) would also shrink the remembered allocation size — confirm that is intended.
offsetAllocationSizeInBytes = curSize;
offsetBuffer.setZero(0, offsetBuffer.capacity());
return offsetBuffer;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,26 @@ public List<ArrowBuf> getFieldBuffers() {
return result;
}

/**
 * Exports this vector's validity buffer followed by its offset buffer for the
 * C Data Interface. Each buffer is appended to {@code buffers} and its memory address is
 * written to {@code buffersPtr} via {@code exportBuffer}.
 *
 * @param buffers list that receives each exported buffer
 * @param buffersPtr buffer that receives the memory address of each exported buffer
 * @param nullValue value handed to {@code exportBuffer} for use when a buffer is null
 */
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
  exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);

  // An empty offset buffer is tolerated for historical reasons, but the C Data Interface
  // export needs a buffer with one offset, so one is allocated on the spot in that case.
  // The freshly allocated buffer is not retained again: its initial ref count of 1 already
  // represents the usage on the importing side.
  // NOTE(review): allocateOffsetBuffer in this class also assigns offsetAllocationSizeInBytes,
  // so exporting an empty vector leaves that field at OFFSET_WIDTH — confirm this is intended.
  final boolean offsetsAreEmpty = offsetBuffer.capacity() == 0;
  final ArrowBuf offsetsToExport =
      offsetsAreEmpty ? allocateOffsetBuffer(OFFSET_WIDTH) : offsetBuffer;
  exportBuffer(offsetsToExport, buffers, buffersPtr, nullValue, !offsetsAreEmpty);
}

/**
* Set the reader and writer indexes for the inner buffers.
*/
Expand Down Expand Up @@ -343,7 +363,7 @@ public boolean allocateNewSafe() {
/* allocate offset and data buffer */
boolean dataAlloc = false;
try {
allocateOffsetBuffer(offsetAllocationSizeInBytes);
offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
dataAlloc = vector.allocateNewSafe();
} catch (Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -371,11 +391,12 @@ private void allocateValidityBuffer(final long size) {
validityBuffer.setZero(0, validityBuffer.capacity());
}

// Allocates `size` bytes from `allocator`, resets the new buffer's reader index to 0, records
// `size` in `offsetAllocationSizeInBytes` and zero-fills the whole buffer.
// NOTE(review): this hunk interleaves the pre-change lines (void signature, assignment to the
// `offsetBuffer` field) with their replacements (ArrowBuf return type, local variable that is
// returned to the caller).
protected void allocateOffsetBuffer(final long size) {
offsetBuffer = allocator.buffer(size);
protected ArrowBuf allocateOffsetBuffer(final long size) {
ArrowBuf offsetBuffer = allocator.buffer(size);
offsetBuffer.readerIndex(0);
// NOTE(review): this field is still updated even though the buffer is no longer stored in the
// vector here. exportCDataBuffers calls this method with OFFSET_WIDTH to build a temporary
// buffer, which therefore also overwrites the remembered allocation size — confirm intended.
offsetAllocationSizeInBytes = size;
offsetBuffer.setZero(0, offsetBuffer.capacity());
return offsetBuffer;
}

/**
Expand Down Expand Up @@ -656,7 +677,7 @@ public void splitAndTransfer(int startIndex, int length) {
final long startPoint = offsetBuffer.getLong((long) startIndex * OFFSET_WIDTH);
final long sliceLength = offsetBuffer.getLong((long) (startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final long relativeOffset = offsetBuffer.getLong((long) (startIndex + i) * OFFSET_WIDTH) - startPoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,26 @@ public List<ArrowBuf> getFieldBuffers() {
return result;
}

/**
 * Exports the validity buffer and then the offset buffer of this vector for the
 * C Data Interface: every buffer is added to {@code buffers} and its memory address is
 * written to {@code buffersPtr} through {@code exportBuffer}.
 *
 * @param buffers list that collects the exported buffers
 * @param buffersPtr buffer into which the exported buffers' memory addresses are written
 * @param nullValue value forwarded to {@code exportBuffer} for the null-buffer case
 */
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
  exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);

  final ArrowBuf exportedOffsets;
  final boolean retainOffsets;
  if (offsetBuffer.capacity() != 0) {
    exportedOffsets = offsetBuffer;
    retainOffsets = true;
  } else {
    // An empty offset buffer is permitted for historical reasons; for the C Data Interface
    // a buffer holding one offset is allocated and exported instead. It is deliberately not
    // retained: the new buffer's ref count of 1 already accounts for the importing side.
    // NOTE(review): if allocateOffsetBuffer here is the inherited variant that assigns
    // offsetAllocationSizeInBytes, this call also resets that field — confirm intended.
    exportedOffsets = allocateOffsetBuffer(OFFSET_WIDTH);
    retainOffsets = false;
  }
  exportBuffer(exportedOffsets, buffers, buffersPtr, nullValue, retainOffsets);
}

/**
* Set the reader and writer indexes for the inner buffers.
*/
Expand Down Expand Up @@ -535,7 +555,7 @@ public void splitAndTransfer(int startIndex, int length) {
final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void splitAndTransfer(int startIndex, int length) {
final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;
Expand Down

0 comments on commit 5ddef63

Please sign in to comment.