Skip to content

Commit

Permalink
fix: addressing reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed May 30, 2024
1 parent 9cf8544 commit 4dddac9
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,7 @@ public void splitAndTransferTo(int startIndex, int length,
target.clear();
if (length > 0) {
splitAndTransferValidityBuffer(startIndex, length, target);
splitAndTransferViewBuffer(startIndex, length, target);
splitAndTransferDataBuffers(startIndex, length, target);
splitAndTransferViewBufferAndDataBuffer(startIndex, length, target);
target.setLastSet(length - 1);
target.setValueCount(length);
}
Expand Down Expand Up @@ -921,11 +920,18 @@ private void splitAndTransferValidityBuffer(int startIndex, int length,
}
}

private void splitAndTransferViewBuffer(int startIndex, int length,
/**
* In split and transfer, the view buffer and the data buffer will be allocated.
* Then the values will be copied from the source vector to the target vector.
* Allocation and setting are preferred over transfer
* since the buf index and buf offset needs to be overwritten
* when large strings are added.
* @param startIndex starting index
* @param length number of elements to be copied
* @param target target vector
*/
private void splitAndTransferViewBufferAndDataBuffer(int startIndex, int length,
BaseVariableWidthViewVector target) {
final int startingByte = startIndex * ELEMENT_SIZE;
final int lengthInBytes = length * ELEMENT_SIZE;

if (length == 0) {
return;
}
Expand All @@ -934,23 +940,55 @@ private void splitAndTransferViewBuffer(int startIndex, int length,
target.viewBuffer.getReferenceManager().release();
}

final ArrowBuf slicedViewBuffer = viewBuffer.slice(startingByte, lengthInBytes);
target.viewBuffer = transferBuffer(slicedViewBuffer, target.allocator);
}
// allocate target view buffer
target.viewBuffer = target.allocator.buffer(length * ELEMENT_SIZE);

private void splitAndTransferDataBuffers(int startIndex, int length,
BaseVariableWidthViewVector target) {
for (int i = startIndex; i < startIndex + length; i++) {
final int stringLength = getValueLength(i);
if (stringLength > INLINE_SIZE) {
final int bufIndex = viewBuffer.getInt(((long) i * ELEMENT_SIZE) +

// keeping track of writing index in the target view buffer
int writePosition = (i - startIndex) * ELEMENT_SIZE;
// keeping track of reading index in the source view buffer
int readPosition = i * ELEMENT_SIZE;

// to clear the memory segment of view being written to
// this is helpful in case of overwriting the value
target.viewBuffer.setZero(writePosition, ELEMENT_SIZE);

// set length
target.viewBuffer.setInt(writePosition, stringLength);

if (stringLength <= INLINE_SIZE) {
// handle inline buffer
writePosition += LENGTH_WIDTH;
readPosition += LENGTH_WIDTH;
// set data by copying the required portion from the source buffer
target.viewBuffer.setBytes(writePosition, viewBuffer, readPosition, stringLength);
} else {
// handle non-inline buffer
final int readBufIndex = viewBuffer.getInt(((long) i * ELEMENT_SIZE) +
LENGTH_WIDTH + PREFIX_WIDTH);
final int bufOffset = viewBuffer.getInt(((long) i * ELEMENT_SIZE) +
final int readBufOffset = viewBuffer.getInt(((long) i * ELEMENT_SIZE) +
LENGTH_WIDTH + PREFIX_WIDTH + BUF_INDEX_WIDTH);
final ArrowBuf dataBuf = dataBuffers.get(bufIndex);
final ArrowBuf slicedDataBuffer = dataBuf.slice(bufOffset, stringLength);
final ArrowBuf dataBuf = dataBuffers.get(readBufIndex);

// allocate data buffer
ArrowBuf currentDataBuf = target.allocateOrGetLastDataBuffer(stringLength);
currentDataBuf.setBytes(currentDataBuf.writerIndex(), slicedDataBuffer, 0, stringLength);
final long currentOffSet = currentDataBuf.writerIndex();

writePosition += LENGTH_WIDTH;
readPosition += LENGTH_WIDTH;
// set prefix
target.viewBuffer.setBytes(writePosition, viewBuffer, readPosition, PREFIX_WIDTH);
writePosition += PREFIX_WIDTH;
// set buf id
target.viewBuffer.setInt(writePosition, target.dataBuffers.size() - 1);
writePosition += BUF_INDEX_WIDTH;
// set offset
target.viewBuffer.setInt(writePosition, (int) currentOffSet);

currentDataBuf.setBytes(currentOffSet, dataBuf, readBufOffset, stringLength);
currentDataBuf.writerIndex(currentOffSet + stringLength);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void test() throws Exception {
}
}

@Test /* ViewVarCharVector */
@Test
public void testView() throws Exception {
try (final ViewVarCharVector viewVarCharVector = new ViewVarCharVector("myvector", allocator)) {
viewVarCharVector.allocateNew(10000, 1000);
Expand Down Expand Up @@ -285,7 +285,15 @@ public void testMemoryConstrainedTransfer() {
@Test
public void testMemoryConstrainedTransferInViews() {
try (final ViewVarCharVector viewVarCharVector = new ViewVarCharVector("myvector", allocator)) {
allocator.setLimit(32768); /* set limit of 32KB */
// Here we have the target vector being transferred with a long string
// hence, the data buffer will be allocated.
// The default data buffer allocation takes
// BaseVariableWidthViewVector.INITIAL_VIEW_VALUE_ALLOCATION * BaseVariableWidthViewVector.ELEMENT_SIZE
// set limit = BaseVariableWidthViewVector.INITIAL_VIEW_VALUE_ALLOCATION *
// BaseVariableWidthViewVector.ELEMENT_SIZE
final int setLimit = BaseVariableWidthViewVector.INITIAL_VIEW_VALUE_ALLOCATION *
BaseVariableWidthViewVector.ELEMENT_SIZE;
allocator.setLimit(setLimit);

viewVarCharVector.allocateNew(16000, 1000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1738,14 +1738,14 @@ public void testSplitAndTransfer1() {
final int dataRefCnt = sourceVector.getDataBuffer().refCnt();

sourceVector.splitAndTransferTo(0, 2, targetVector);
// split and transfer with slice starting at the beginning:
// this should not allocate anything new (all are inline strings)
assertEquals(allocatedMem, allocator.getAllocatedMemory());
// we allocate view and data buffers for the target vector
assertTrue(allocatedMem < allocator.getAllocatedMemory());

// The validity buffer is sliced from the same buffer.See BaseFixedWidthViewVector#allocateBytes.
// Therefore, the refcnt of the validity buffer is increased once since the startIndex is 0.
assertEquals(validityRefCnt + 1, sourceVector.getValidityBuffer().refCnt());
assertEquals(dataRefCnt + 1, sourceVector.getDataBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());
}
assertArrayEquals(STR4, targetVector.get(0));
assertArrayEquals(STR5, targetVector.get(1));
Expand Down Expand Up @@ -1774,14 +1774,18 @@ public void testSplitAndTransfer2() {
final int dataRefCnt = sourceVector.getDataBuffer().refCnt();

sourceVector.splitAndTransferTo(0, 2, targetVector);
// split and transfer with slice starting at the beginning:
// this should allocate new (first is a long string)
// we allocate view and data buffers for the target vector
assertTrue(allocatedMem < allocator.getAllocatedMemory());

// The validity buffer is sliced from the same buffer.See BaseFixedWidthViewVector#allocateBytes.
// Therefore, the refcnt of the validity buffer is increased once since the startIndex is 0.
assertEquals(validityRefCnt + 1, sourceVector.getValidityBuffer().refCnt());
assertEquals(dataRefCnt + 1, sourceVector.getDataBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());

assertArrayEquals(STR2, sourceVector.get(0));
assertArrayEquals(STR5, sourceVector.get(1));
assertArrayEquals(STR6, sourceVector.get(2));
}
assertArrayEquals(STR2, targetVector.get(0));
assertArrayEquals(STR5, targetVector.get(1));
Expand Down Expand Up @@ -1809,13 +1813,17 @@ public void testSplitAndTransfer3() {
final int validityRefCnt = sourceVector.getValidityBuffer().refCnt();
final int dataRefCnt = sourceVector.getDataBuffer().refCnt();

// split and transfer with slice starting at the beginning: this should not allocate anything new
sourceVector.splitAndTransferTo(0, 2, targetVector);
assertEquals(allocatedMem, allocator.getAllocatedMemory());
// we allocate view and data buffers for the target vector
assertTrue(allocatedMem < allocator.getAllocatedMemory());
// The validity buffer is sliced from the same buffer.See BaseFixedWidthViewVector#allocateBytes.
// Therefore, the refcnt of the validity buffer is increased once since the startIndex is 0.
assertEquals(validityRefCnt + 1, sourceVector.getValidityBuffer().refCnt());
assertEquals(dataRefCnt + 1, sourceVector.getDataBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());

assertArrayEquals(STR4, targetVector.get(0));
assertArrayEquals(STR5, targetVector.get(1));
}
assertArrayEquals(STR4, sourceVector.get(0));
assertArrayEquals(STR5, sourceVector.get(1));
Expand Down Expand Up @@ -1845,13 +1853,16 @@ public void testSplitAndTransfer4() {
final int dataRefCnt = sourceVector.getDataBuffer().refCnt();

sourceVector.splitAndTransferTo(0, 2, targetVector);
// split and transfer with slice starting at the beginning:
// this should allocate new (first is a long string)
// we allocate view and data buffers for the target vector
assertTrue(allocatedMem < allocator.getAllocatedMemory());
// The validity buffer is sliced from the same buffer.See BaseFixedWidthViewVector#allocateBytes.
// Therefore, the refcnt of the validity buffer is increased once since the startIndex is 0.
assertEquals(validityRefCnt + 1, sourceVector.getValidityBuffer().refCnt());
assertEquals(dataRefCnt + 1, sourceVector.getDataBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());

assertArrayEquals(STR2, targetVector.get(0));
assertArrayEquals(STR5, targetVector.get(1));
}
assertArrayEquals(STR2, sourceVector.get(0));
assertArrayEquals(STR5, sourceVector.get(1));
Expand Down Expand Up @@ -1887,14 +1898,22 @@ public void testSplitAndTransfer5() {
final long validitySize =
DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY.getRoundedSize(
BaseValueVector.getValidityBufferSizeFromCount(2));
assertEquals(allocatedMem + validitySize, allocator.getAllocatedMemory());
// we allocate view and data buffers for the target vector
assertTrue(allocatedMem + validitySize < allocator.getAllocatedMemory());
// The validity is sliced from the same buffer.See BaseFixedWidthViewVector#allocateBytes.
// Since values up to the startIndex are empty/null validity refcnt should not change.
assertEquals(validityRefCnt, sourceVector.getValidityBuffer().refCnt());
assertEquals(dataRefCnt + 1, sourceVector.getDataBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());

assertArrayEquals(STR4, targetVector.get(0));
assertArrayEquals(STR5, targetVector.get(1));

assertArrayEquals(new byte[0], sourceVector.get(0));
assertTrue(sourceVector.isNull(1));
assertArrayEquals(STR4, sourceVector.get(2));
assertArrayEquals(STR5, sourceVector.get(3));
assertArrayEquals(STR6, sourceVector.get(4));
}
}

Expand Down Expand Up @@ -1927,14 +1946,22 @@ public void testSplitAndTransfer6() {
final long validitySize =
DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY.getRoundedSize(
BaseValueVector.getValidityBufferSizeFromCount(2));
// we allocate view and data buffers for the target vector
assertTrue(allocatedMem + validitySize < allocator.getAllocatedMemory());
// The validity is sliced from the same buffer.See BaseFixedWidthViewVector#allocateBytes.
// Since values up to the startIndex are empty/null validity refcnt should not change.
assertEquals(validityRefCnt, sourceVector.getValidityBuffer().refCnt());
assertEquals(dataRefCnt + 1, sourceVector.getDataBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());

assertArrayEquals(STR1, targetVector.get(0));
assertArrayEquals(STR2, targetVector.get(1));

assertArrayEquals(new byte[0], sourceVector.get(0));
assertTrue(sourceVector.isNull(1));
assertArrayEquals(STR1, sourceVector.get(2));
assertArrayEquals(STR2, sourceVector.get(3));
assertArrayEquals(STR3, sourceVector.get(4));
}
}

Expand All @@ -1946,9 +1973,10 @@ public void testSplitAndTransfer6() {
*/
@Test
public void testSplitAndTransfer7() {
try (final BufferAllocator targetAllocator = allocator.newChildAllocator("target-alloc", 256, 256);
final int maxAllocation = 512;
try (final BufferAllocator targetAllocator = allocator.newChildAllocator("target-alloc", 256, maxAllocation);
final ViewVarCharVector targetVector = newViewVarCharVector("split-target", targetAllocator)) {
try (final BufferAllocator sourceAllocator = allocator.newChildAllocator("source-alloc", 256, 256);
try (final BufferAllocator sourceAllocator = allocator.newChildAllocator("source-alloc", 256, maxAllocation);
final ViewVarCharVector sourceVector = newViewVarCharVector(EMPTY_SCHEMA_PATH, sourceAllocator)) {
sourceVector.allocateNew(50, 3);

Expand All @@ -1961,15 +1989,19 @@ public void testSplitAndTransfer7() {
final int validityRefCnt = sourceVector.getValidityBuffer().refCnt();
final int dataRefCnt = sourceVector.getDataBuffer().refCnt();

// split and transfer with slice starting at the beginning:
// this should not allocate anything new
sourceVector.splitAndTransferTo(0, 2, targetVector);
// no extra allocation as strings are all inline
assertEquals(allocatedMem, allocator.getAllocatedMemory());
// Unlike testSplitAndTransfer1 where the buffers originated from the same allocator,

// the refcnts of each buffer for this test should be the same as what
// the source allocator ended up with.
assertEquals(validityRefCnt, sourceVector.getValidityBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());

assertArrayEquals(STR4, sourceVector.get(0));
assertArrayEquals(STR5, sourceVector.get(1));
assertArrayEquals(STR6, sourceVector.get(2));
}
assertArrayEquals(STR4, targetVector.get(0));
assertArrayEquals(STR5, targetVector.get(1));
Expand All @@ -1985,6 +2017,10 @@ public void testSplitAndTransfer7() {
@Test
public void testSplitAndTransfer8() {
final int initialReservation = 1024;
// Here we have the target vector being transferred with a long string
// hence, the data buffer will be allocated.
// The default data buffer allocation takes
// BaseVariableWidthViewVector.INITIAL_VIEW_VALUE_ALLOCATION * BaseVariableWidthViewVector.ELEMENT_SIZE
final int maxAllocation = initialReservation +
BaseVariableWidthViewVector.INITIAL_VIEW_VALUE_ALLOCATION * BaseVariableWidthViewVector.ELEMENT_SIZE;
try (final BufferAllocator targetAllocator = allocator.newChildAllocator("target-alloc",
Expand All @@ -2004,15 +2040,19 @@ public void testSplitAndTransfer8() {
final int validityRefCnt = sourceVector.getValidityBuffer().refCnt();
final int dataRefCnt = sourceVector.getDataBuffer().refCnt();

// split and transfer with slice starting at the beginning:
// this should not allocate anything new
sourceVector.splitAndTransferTo(0, 2, targetVector);
// we allocate view and data buffers for the target vector
assertTrue(allocatedMem < allocator.getAllocatedMemory());
// Unlike testSplitAndTransfer1 where the buffers originated from the same allocator,

// the refcnts of each buffer for this test should be the same as what
// the source allocator ended up with.
assertEquals(validityRefCnt, sourceVector.getValidityBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());

assertArrayEquals(STR1, sourceVector.get(0));
assertArrayEquals(STR2, sourceVector.get(1));
assertArrayEquals(STR3, sourceVector.get(2));
}
assertArrayEquals(STR1, targetVector.get(0));
assertArrayEquals(STR2, targetVector.get(1));
Expand Down Expand Up @@ -2122,6 +2162,56 @@ public void testReallocAfterVectorTransfer1() {
}
}

/**
* ARROW-7831:
* ensures that data is transferred from one allocator to another in case of 0-index
* start special cases.
* With long strings and multiple data buffers.
* Check multi-data buffer source copying
*/
@Test
public void testSplitAndTransfer9() {
try (final ViewVarCharVector targetVector = new ViewVarCharVector("target", allocator)) {
String str4 = generateRandomString(35);
try (final ViewVarCharVector sourceVector = new ViewVarCharVector("source", allocator)) {
sourceVector.allocateNew(48, 4);

sourceVector.set(0, STR1);
sourceVector.set(1, STR2);
sourceVector.set(2, STR3);
sourceVector.set(3, str4.getBytes(StandardCharsets.UTF_8));
sourceVector.setValueCount(4);

// we should have multiple data buffers
assertTrue(sourceVector.getDataBuffers().size() > 1);

final long allocatedMem = allocator.getAllocatedMemory();
final int validityRefCnt = sourceVector.getValidityBuffer().refCnt();
final int dataRefCnt = sourceVector.getDataBuffer().refCnt();

// split and transfer with slice starting at the beginning:
// this should not allocate anything new
sourceVector.splitAndTransferTo(1, 3, targetVector);
// we allocate view and data buffers for the target vector
assertTrue(allocatedMem < allocator.getAllocatedMemory());

// the refcnts of each buffer for this test should be the same as what
// the source allocator ended up with.
assertEquals(validityRefCnt, sourceVector.getValidityBuffer().refCnt());
// since the new view buffer is allocated, the refcnt is the same as the source vector.
assertEquals(dataRefCnt, sourceVector.getDataBuffer().refCnt());

assertArrayEquals(STR1, sourceVector.get(0));
assertArrayEquals(STR2, sourceVector.get(1));
assertArrayEquals(STR3, sourceVector.get(2));
assertArrayEquals(str4.getBytes(StandardCharsets.UTF_8), sourceVector.get(3));
}
assertArrayEquals(STR2, targetVector.get(0));
assertArrayEquals(STR3, targetVector.get(1));
assertArrayEquals(str4.getBytes(StandardCharsets.UTF_8), targetVector.get(2));
}
}

private String generateRandomString(int length) {
Random random = new Random();
StringBuilder sb = new StringBuilder(length);
Expand Down

0 comments on commit 4dddac9

Please sign in to comment.