Skip to content

Commit

Permalink
fix: addressing review: refactor view allocation function
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Apr 8, 2024
1 parent 2d87b57 commit fcfa21c
Showing 1 changed file with 50 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,23 @@ protected final int getLength(int index) {
return viewBuffer.getInt(((long) index * ELEMENT_SIZE));
}

protected ArrowBuf allocateOrGetLastBuffer(BufferAllocator allocator, int length, List<ArrowBuf> dataBuffers) {
long dataBufferSize;
if (initialDataBufferSize > 0) {
dataBufferSize = initialDataBufferSize;
} else {
dataBufferSize = lastValueAllocationSizeInBytes;
}

if (dataBuffers.isEmpty() || dataBuffers.get(dataBuffers.size() - 1).capacity() -
dataBuffers.get(dataBuffers.size() - 1).writerIndex() < length) {
ArrowBuf newBuf = allocator.buffer(dataBufferSize);
dataBuffers.add(newBuf);
}

return dataBuffers.get(dataBuffers.size() - 1);
}

/**
* This method is used to create a view buffer for a variable width vector.
* It handles both inline and data buffers.
Expand All @@ -1208,95 +1225,51 @@ protected final int getLength(int index) {
* The valueBuffer stores the length of the value followed by the value itself.
* If the length of the value is greater than {@link #INLINE_SIZE}, a new buffer is allocated and added to dataBuffers
* to hold the value.
* The valueBuffer in this case stores the length of the value, a prefix of the value, the index of the
* The viewBuffer in this case stores the length of the value, a prefix of the value, the index of the
* new buffer in dataBuffers, and the offset of the value in the new buffer.
*
* @param allocator The allocator used to create new ArrowBuf instances.
* @param index The index at which the new value will be inserted.
* @param value The byte array that contains the data to be inserted.
* @param start The start index in the byte array from where the data for the new value begins.
* @param length The length of the data in the byte array that belongs to the new value.
* @param valueBuffer The ArrowBuf instance that will hold the data of the new value.
* @param dataBuffers The list of ArrowBuf instances that hold the data of all values in the vector.
* @param viewBuffer The ArrowBuf instance that will hold the data of the new value.
* @param dataBuffers The list of ArrowBuf instances that hold the data of all long binary values in the vector.
*/
protected void createViewBuffer(
BufferAllocator allocator,
int index,
byte[] value,
int start,
int length,
ArrowBuf valueBuffer,
List<ArrowBuf> dataBuffers) {
BufferAllocator allocator,
int index,
byte[] value,
int start,
int length,
ArrowBuf viewBuffer,
List<ArrowBuf> dataBuffers) {
int writePosition = index * ELEMENT_SIZE;
if (value.length <= INLINE_SIZE) {
// allocate inline buffer
// set length
valueBuffer.setInt(writePosition, length);
viewBuffer.setInt(writePosition, length);
writePosition += LENGTH_WIDTH;
// set data
valueBuffer.setBytes(writePosition, value, start, length);
viewBuffer.setBytes(writePosition, value, start, length);
} else {
// allocate data buffer
ArrowBuf currentBuf = allocateOrGetLastBuffer(allocator, length, dataBuffers);

// pre-allocate the data buffer depending on initial capacity setup
long dataBufferSize;
if (initialDataBufferSize > 0) {
dataBufferSize = initialDataBufferSize;
} else {
dataBufferSize = lastValueAllocationSizeInBytes;
}
if (dataBuffers.isEmpty()) {
// the first data buffer needs to be added
ArrowBuf newDataBuf = allocator.buffer(dataBufferSize);
// set length
valueBuffer.setInt(writePosition, length);
writePosition += LENGTH_WIDTH;
// set prefix
valueBuffer.setBytes(writePosition, value, start, PREFIX_WIDTH);
writePosition += PREFIX_WIDTH;
// set buf id
valueBuffer.setInt(writePosition, /*first buffer*/0);
writePosition += BUF_INDEX_WIDTH;
// set offset
valueBuffer.setInt(writePosition, 0);
newDataBuf.setBytes(0, value, 0, length);
newDataBuf.writerIndex(length);
dataBuffers.add(newDataBuf);
} else {
// insert to the last buffer in the data buffers or allocate new if the last buffer isn't enough
// set lengths
valueBuffer.setInt(writePosition, length);
writePosition += LENGTH_WIDTH;
// set prefix
valueBuffer.setBytes(writePosition, value, start, PREFIX_WIDTH);
writePosition += PREFIX_WIDTH;
// set buf id
int currentBufId = dataBuffers.size() - 1;
ArrowBuf currentBuf = dataBuffers.get(currentBufId);
if (currentBuf.capacity() - currentBuf.writerIndex() >= length) {
// current buffer is enough
// set buf indexes
valueBuffer.setInt(writePosition, currentBufId);
writePosition += BUF_INDEX_WIDTH;
// set offset
valueBuffer.setInt(writePosition, (int) currentBuf.writerIndex());
currentBuf.setBytes(currentBuf.writerIndex(), value, start, length);
currentBuf.writerIndex(currentBuf.writerIndex() + length);
dataBuffers.set(currentBufId, currentBuf);
} else {
// current buffer is not enough
// allocate new buffer
ArrowBuf newBuf = allocator.buffer(dataBufferSize);
// set buf index
valueBuffer.setInt(writePosition, dataBuffers.size());
writePosition += BUF_INDEX_WIDTH;
// set offset
valueBuffer.setInt(writePosition, 0);
newBuf.setBytes(0, value, start, length);
newBuf.writerIndex(newBuf.writerIndex() + length);
dataBuffers.add(newBuf);
}
}
// set length
viewBuffer.setInt(writePosition, length);
writePosition += LENGTH_WIDTH;
// set prefix
viewBuffer.setBytes(writePosition, value, start, PREFIX_WIDTH);
writePosition += PREFIX_WIDTH;
// set buf id
viewBuffer.setInt(writePosition, dataBuffers.size() - 1);
writePosition += BUF_INDEX_WIDTH;
// set offset
viewBuffer.setInt(writePosition, (int) currentBuf.writerIndex());

currentBuf.setBytes(currentBuf.writerIndex(), value, start, length);
currentBuf.writerIndex(currentBuf.writerIndex() + length);
}
}

Expand All @@ -1321,14 +1294,14 @@ protected final void handleSafe(int index, int dataLength) {
final long lastSetCapacity = lastSet < 0 ? 0 : (long) index * ELEMENT_SIZE;
final long targetCapacity = roundUpToMultipleOf16(lastSetCapacity + dataLength);
// for views, we need each buffer with 16 byte alignment, so we need to check the last written index
// in the valueBuffer and allocate a new buffer which has 16 byte alignment for adding new values.
// in the viewBuffer and allocate a new buffer which has 16 byte alignment for adding new values.
long writePosition = (long) index * ELEMENT_SIZE;
if (viewBuffer.capacity() <= writePosition || viewBuffer.capacity() < targetCapacity) {
/*
* Everytime we want to increase the capacity of the valueBuffer, we need to make sure that the new capacity
* Everytime we want to increase the capacity of the viewBuffer, we need to make sure that the new capacity
* meets 16 byte alignment.
* If the targetCapacity is larger than the writePosition, we may not necessarily
* want to allocate the targetCapacity to valueBuffer since when it is >={@link #INLINE_SIZE} either way
* want to allocate the targetCapacity to viewBuffer since when it is >={@link #INLINE_SIZE} either way
* we are writing to the dataBuffer.
*/
reallocViewBuffer(Math.max(writePosition, targetCapacity));
Expand Down Expand Up @@ -1423,12 +1396,12 @@ public int hashCode(int index, ArrowBufHasher hasher) {
*
* <p>
* If the length of the data is greater than {@link #INLINE_SIZE}, the data is stored in an inline buffer.
* The method retrieves the buffer index and data offset from the valueBuffer, and then retrieves the data from the
* The method retrieves the buffer index and data offset from the viewBuffer, and then retrieves the data from the
* corresponding buffer in the dataBuffers list.
* <p>
* If the length of the data is less than or equal to {@link #INLINE_SIZE}, the data is stored directly in the
* valueBuffer.
* The method retrieves the data directly from the valueBuffer.
* viewBuffer.
* The method retrieves the data directly from the viewBuffer.
*
* @param index position of the element in the vector
* @return byte array containing the data of the element
Expand Down

0 comments on commit fcfa21c

Please sign in to comment.