From 67fc7ceadab5a7ff7446c95097ebe2e8c501296a Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 22 May 2024 11:28:33 +0530 Subject: [PATCH] fix: adding new constructors --- .../vector/ipc/message/ArrowRecordBatch.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 6a529db598fff..0ea71d02ad814 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -71,6 +71,21 @@ public ArrowRecordBatch( this(length, nodes, buffers, bodyCompression, null, true); } + /** + * Construct a record batch from nodes. + * + * @param length how many rows in this batch + * @param nodes field level info + * @param buffers will be retained until this recordBatch is closed + * @param bodyCompression compression info. + * @param alignBuffers Whether to align buffers to an 8 byte boundary. + */ + public ArrowRecordBatch( + int length, List nodes, List buffers, + ArrowBodyCompression bodyCompression, boolean alignBuffers) { + this(length, nodes, buffers, bodyCompression, alignBuffers, /*retainBuffers*/ true); + } + /** * Construct a record batch from nodes. * @@ -87,6 +102,47 @@ public ArrowRecordBatch( this(length, nodes, buffers, bodyCompression, variadicBufferCounts, alignBuffers, /*retainBuffers*/ true); } + /** + * Construct a record batch from nodes. + * + * @param length how many rows in this batch + * @param nodes field level info + * @param buffers will be retained until this recordBatch is closed + * @param bodyCompression compression info. + * @param alignBuffers Whether to align buffers to an 8 byte boundary. + * @param retainBuffers Whether to retain() each source buffer in the constructor. If false, the caller is + * responsible for retaining the buffers beforehand. + */ + public ArrowRecordBatch( + int length, List nodes, List buffers, + ArrowBodyCompression bodyCompression, boolean alignBuffers, + boolean retainBuffers) { + super(); + this.length = length; + this.nodes = nodes; + this.buffers = buffers; + Preconditions.checkArgument(bodyCompression != null, "body compression cannot be null"); + this.bodyCompression = bodyCompression; + List arrowBuffers = new ArrayList<>(buffers.size()); + long offset = 0; + for (ArrowBuf arrowBuf : buffers) { + if (retainBuffers) { + arrowBuf.getReferenceManager().retain(); + } + long size = arrowBuf.readableBytes(); + arrowBuffers.add(new ArrowBuffer(offset, size)); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Buffer in RecordBatch at {}, length: {}", offset, size); + } + offset += size; + if (alignBuffers) { // align on 8 byte boundaries + offset = DataSizeRoundingUtil.roundUpTo8Multiple(offset); + } + } + this.buffersLayout = Collections.unmodifiableList(arrowBuffers); + this.variadicBufferCounts = null; + } + /** * Construct a record batch from nodes. *