Skip to content

Commit

Permalink
apache GH-37938: [Swift] fix null count when using reader
Browse files Browse the repository at this point in the history
  • Loading branch information
abandy committed Jan 8, 2024
1 parent 1f42e6d commit 25d789c
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 81 deletions.
48 changes: 41 additions & 7 deletions swift/Arrow/Sources/Arrow/ArrowBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public class ArrowBuffer {
data.append(ptr, count: Int(capacity))
}

/// Creates a buffer with zero length and zero capacity.
///
/// - Returns: An `ArrowBuffer` whose `length` and `capacity` are both 0, backed by a
///   zero-byte allocation.
static func createEmptyBuffer() -> ArrowBuffer {
    // `allocate(byteCount:alignment:)` documents that `alignment` must be a whole
    // power of 2. `.zero` violates that precondition, so request the natural
    // alignment of a single byte instead.
    let rawPointer = UnsafeMutableRawPointer.allocate(
        byteCount: 0,
        alignment: MemoryLayout<UInt8>.alignment)
    return ArrowBuffer(length: 0, capacity: 0, rawPointer: rawPointer)
}

static func createBuffer(_ data: [UInt8], length: UInt) -> ArrowBuffer {
let byteCount = UInt(data.count)
let capacity = alignTo64(byteCount)
Expand All @@ -48,14 +55,10 @@ public class ArrowBuffer {
return ArrowBuffer(length: length, capacity: capacity, rawPointer: rawPointer)
}

static func createBuffer(_ length: UInt, size: UInt, doAlign: Bool = true) -> ArrowBuffer {
static func createBuffer(_ length: UInt, size: UInt) -> ArrowBuffer {
let actualLen = max(length, ArrowBuffer.minLength)
let byteCount = size * actualLen
var capacity = byteCount
if doAlign {
capacity = alignTo64(byteCount)
}

let capacity = alignTo64(byteCount)
let memory = MemoryAllocator(64)
let rawPointer = memory.allocateArray(Int(capacity))
rawPointer.initializeMemory(as: UInt8.self, repeating: 0, count: Int(capacity))
Expand All @@ -66,7 +69,11 @@ public class ArrowBuffer {
to.rawPointer.copyMemory(from: from.rawPointer, byteCount: Int(len))
}

private static func alignTo64(_ length: UInt) -> UInt {
/// Copies `len` bytes from the start of `from`'s storage into the start of `to`'s storage.
///
/// - Parameters:
///   - from: Buffer whose `rawPointer` is read.
///   - to: Null buffer whose `rawPointer` is written.
///   - len: Number of bytes to copy.
static func copyCurrent(_ from: ArrowBuffer, to: inout ArrowNullBuffer, len: UInt) {
    let bytesToCopy = Int(len)
    let source = from.rawPointer
    to.rawPointer.copyMemory(from: source, byteCount: bytesToCopy)
}

fileprivate static func alignTo64(_ length: UInt) -> UInt {
let bufAlignment = length % 64
if bufAlignment != 0 {
return length + (64 - bufAlignment) + 8
Expand All @@ -75,3 +82,30 @@ public class ArrowBuffer {
return length + 8
}
}

/// An `ArrowBuffer` subclass that additionally stores a null count alongside its storage.
public class ArrowNullBuffer: ArrowBuffer {
    /// Null count associated with this buffer; supplied by the creator and mutable afterwards.
    var nullCount: UInt

    /// Stores `nullCount` and forwards the remaining arguments to `ArrowBuffer`'s initializer.
    init(_ nullCount: UInt, length: UInt, capacity: UInt, rawPointer: UnsafeMutableRawPointer) {
        self.nullCount = nullCount
        super.init(length: length, capacity: capacity, rawPointer: rawPointer)
    }

    /// Builds a null buffer whose storage starts with a copy of `data`.
    ///
    /// - Parameters:
    ///   - data: Bytes copied to the start of the new allocation.
    ///   - length: Value stored as the buffer's `length`.
    ///   - nullCount: Value stored as the buffer's `nullCount`.
    /// - Returns: A buffer whose capacity is `alignTo64(data.count)`.
    static func createBuffer(_ data: [UInt8], length: UInt, nullCount: UInt) -> ArrowNullBuffer {
        let paddedCapacity = alignTo64(UInt(data.count))
        let allocator = MemoryAllocator(64)
        let storage = allocator.allocateArray(Int(paddedCapacity))
        // Only the first `data.count` bytes are written here.
        storage.copyMemory(from: data, byteCount: data.count)
        return ArrowNullBuffer(
            nullCount,
            length: length,
            capacity: paddedCapacity,
            rawPointer: storage)
    }

    /// Builds a zero-filled null buffer sized for `length` elements of `size` bytes each.
    ///
    /// - Parameters:
    ///   - length: Requested element count; raised to `ArrowBuffer.minLength` when smaller,
    ///     for sizing purposes only. The returned buffer's `length` is the value passed in.
    ///   - size: Size in bytes of one element.
    ///   - nullCount: Value stored as the buffer's `nullCount`.
    /// - Returns: A buffer whose whole capacity is initialized to zero.
    static func createBuffer(_ length: UInt, size: UInt, nullCount: UInt) -> ArrowNullBuffer {
        let elementCount = max(length, ArrowBuffer.minLength)
        let paddedCapacity = alignTo64(size * elementCount)
        let allocator = MemoryAllocator(64)
        let storage = allocator.allocateArray(Int(paddedCapacity))
        storage.initializeMemory(as: UInt8.self, repeating: 0, count: Int(paddedCapacity))
        return ArrowNullBuffer(
            nullCount,
            length: length,
            capacity: paddedCapacity,
            rawPointer: storage)
    }
}
37 changes: 22 additions & 15 deletions swift/Arrow/Sources/Arrow/ArrowBufferBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ public protocol ArrowBufferBuilder {

public class BaseBufferBuilder<T> {
var values: ArrowBuffer
var nulls: ArrowBuffer
var nulls: ArrowNullBuffer
var stride: Int
public var offset: UInt = 0
public var capacity: UInt {return self.values.capacity}
public var length: UInt = 0
public var nullCount: UInt = 0
public var nullCount: UInt {return self.nulls.nullCount}

init(values: ArrowBuffer, nulls: ArrowBuffer, stride: Int = MemoryLayout<T>.stride) {
init(values: ArrowBuffer, nulls: ArrowNullBuffer, stride: Int = MemoryLayout<T>.stride) {
self.stride = stride
self.values = values
self.nulls = nulls
Expand Down Expand Up @@ -67,7 +67,7 @@ public class FixedBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder {
public required init() throws {
self.defaultVal = try FixedBufferBuilder<T>.defaultValueForType()
let values = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<T>.stride))
let nulls = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride))
let nulls = ArrowNullBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride), nullCount: 0)
super.init(values: values, nulls: nulls)
}

Expand All @@ -83,7 +83,7 @@ public class FixedBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder {
BitUtility.setBit(index + self.offset, buffer: self.nulls)
self.values.rawPointer.advanced(by: byteIndex).storeBytes(of: val, as: T.self)
} else {
self.nullCount += 1
self.nulls.nullCount += 1
BitUtility.clearBit(index + self.offset, buffer: self.nulls)
self.values.rawPointer.advanced(by: byteIndex).storeBytes(of: defaultVal, as: T.self)
}
Expand All @@ -93,7 +93,8 @@ public class FixedBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder {
if length > self.values.length {
let resizeLength = resizeLength(self.values)
var values = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<T>.size))
var nulls = ArrowBuffer.createBuffer(resizeLength/8 + 1, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowNullBuffer.createBuffer(resizeLength/8 + 1, size: UInt(MemoryLayout<UInt8>.size),
nullCount: self.nullCount)
ArrowBuffer.copyCurrent(self.values, to: &values, len: self.values.capacity)
ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: self.nulls.capacity)
self.values = values
Expand All @@ -104,7 +105,8 @@ public class FixedBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder {
public func finish() -> [ArrowBuffer] {
let length = self.length
var values = ArrowBuffer.createBuffer(length, size: UInt(MemoryLayout<T>.size))
var nulls = ArrowBuffer.createBuffer(length/8 + 1, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowNullBuffer.createBuffer(length/8 + 1, size: UInt(MemoryLayout<UInt8>.size),
nullCount: self.nullCount)
ArrowBuffer.copyCurrent(self.values, to: &values, len: values.capacity)
ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: nulls.capacity)
return [nulls, values]
Expand Down Expand Up @@ -142,7 +144,8 @@ public class BoolBufferBuilder: BaseBufferBuilder<Bool>, ArrowBufferBuilder {
public typealias ItemType = Bool
public required init() throws {
let values = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride))
let nulls = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride))
let nulls = ArrowNullBuffer.createBuffer(0, size: UInt(MemoryLayout<UInt8>.stride),
nullCount: 0)
super.init(values: values, nulls: nulls)
}

Expand All @@ -162,7 +165,7 @@ public class BoolBufferBuilder: BaseBufferBuilder<Bool>, ArrowBufferBuilder {
}

} else {
self.nullCount += 1
self.nulls.nullCount += 1
BitUtility.clearBit(index + self.offset, buffer: self.nulls)
BitUtility.clearBit(index + self.offset, buffer: self.values)
}
Expand All @@ -172,7 +175,8 @@ public class BoolBufferBuilder: BaseBufferBuilder<Bool>, ArrowBufferBuilder {
if (length/8) > self.values.length {
let resizeLength = resizeLength(self.values)
var values = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowNullBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<UInt8>.size),
nullCount: nullCount)
ArrowBuffer.copyCurrent(self.values, to: &values, len: self.values.capacity)
ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: self.nulls.capacity)
self.values = values
Expand All @@ -183,7 +187,8 @@ public class BoolBufferBuilder: BaseBufferBuilder<Bool>, ArrowBufferBuilder {
public func finish() -> [ArrowBuffer] {
let length = self.length
var values = ArrowBuffer.createBuffer(length, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowBuffer.createBuffer(length, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowNullBuffer.createBuffer(length, size: UInt(MemoryLayout<UInt8>.size),
nullCount: nullCount)
ArrowBuffer.copyCurrent(self.values, to: &values, len: values.capacity)
ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: nulls.capacity)
return [nulls, values]
Expand All @@ -196,7 +201,7 @@ public class VariableBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder
let binaryStride = MemoryLayout<UInt8>.stride
public required init() throws {
let values = ArrowBuffer.createBuffer(0, size: UInt(binaryStride))
let nulls = ArrowBuffer.createBuffer(0, size: UInt(binaryStride))
let nulls = ArrowNullBuffer.createBuffer(0, size: UInt(binaryStride), nullCount: 0)
self.offsets = ArrowBuffer.createBuffer(0, size: UInt(MemoryLayout<Int32>.stride))
super.init(values: values, nulls: nulls, stride: binaryStride)
}
Expand Down Expand Up @@ -229,7 +234,7 @@ public class VariableBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder
}

if isNull {
self.nullCount += 1
self.nulls.nullCount += 1
BitUtility.clearBit(index + self.offset, buffer: self.nulls)
} else {
BitUtility.setBit(index + self.offset, buffer: self.nulls)
Expand Down Expand Up @@ -257,7 +262,8 @@ public class VariableBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder
public func resize(_ length: UInt) {
if length > self.offsets.length {
let resizeLength = resizeLength(self.offsets, len: length)
var nulls = ArrowBuffer.createBuffer(resizeLength/8 + 1, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowNullBuffer.createBuffer(resizeLength/8 + 1, size: UInt(MemoryLayout<UInt8>.size),
nullCount: self.nullCount)
var offsets = ArrowBuffer.createBuffer(resizeLength, size: UInt(MemoryLayout<Int32>.size))
ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: self.nulls.capacity)
ArrowBuffer.copyCurrent(self.offsets, to: &offsets, len: self.offsets.capacity)
Expand All @@ -269,7 +275,8 @@ public class VariableBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder
public func finish() -> [ArrowBuffer] {
let length = self.length
var values = ArrowBuffer.createBuffer(self.values.length, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowBuffer.createBuffer(length/8 + 1, size: UInt(MemoryLayout<UInt8>.size))
var nulls = ArrowNullBuffer.createBuffer(length/8 + 1, size: UInt(MemoryLayout<UInt8>.size),
nullCount: self.nullCount)
var offsets = ArrowBuffer.createBuffer(length, size: UInt(MemoryLayout<Int32>.size))
ArrowBuffer.copyCurrent(self.values, to: &values, len: values.capacity)
ArrowBuffer.copyCurrent(self.nulls, to: &nulls, len: nulls.capacity)
Expand Down
8 changes: 6 additions & 2 deletions swift/Arrow/Sources/Arrow/ArrowReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ public class ArrowReader {
private func loadPrimitiveData(_ loadInfo: DataLoadInfo) -> Result<ArrowArrayHolder, ArrowError> {
do {
let node = loadInfo.recordBatch.nodes(at: loadInfo.nodeIndex)!
let nullLength = UInt(ceil(Double(node.length) / 8))
try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex)
let nullBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex)!
let arrowNullBuffer = makeBuffer(nullBuffer, fileData: loadInfo.fileData,
length: UInt(node.nullCount), messageOffset: loadInfo.messageOffset)
length: nullLength, messageOffset: loadInfo.messageOffset,
nullCount: UInt(node.nullCount))
try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex + 1)
let valueBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex + 1)!
let arrowValueBuffer = makeBuffer(valueBuffer, fileData: loadInfo.fileData,
Expand All @@ -76,10 +78,12 @@ public class ArrowReader {
private func loadVariableData(_ loadInfo: DataLoadInfo) -> Result<ArrowArrayHolder, ArrowError> {
let node = loadInfo.recordBatch.nodes(at: loadInfo.nodeIndex)!
do {
let nullLength = UInt(ceil(Double(node.length) / 8))
try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex)
let nullBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex)!
let arrowNullBuffer = makeBuffer(nullBuffer, fileData: loadInfo.fileData,
length: UInt(node.nullCount), messageOffset: loadInfo.messageOffset)
length: nullLength, messageOffset: loadInfo.messageOffset,
nullCount: UInt(node.nullCount))
try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex + 1)
let offsetBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex + 1)!
let arrowOffsetBuffer = makeBuffer(offsetBuffer, fileData: loadInfo.fileData,
Expand Down
Loading

0 comments on commit 25d789c

Please sign in to comment.