Skip to content

Commit

Permalink
apacheGH-37885: [Swift] allow reading of unaligned flatbuffer buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
abandy committed Dec 4, 2023
1 parent f7947cc commit 4114f5a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 10 deletions.
2 changes: 1 addition & 1 deletion swift/Arrow/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ let package = Package(
targets: ["Arrow"]),
],
dependencies: [
.package(url: "https://github.com/google/flatbuffers.git", from: "23.3.3")
.package(url: "https://github.com/google/flatbuffers.git", branch: "master")
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.
Expand Down
18 changes: 13 additions & 5 deletions swift/Arrow/Sources/Arrow/ArrowReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public class ArrowReader {
}

public func fromStream( // swiftlint:disable:this function_body_length
_ fileData: Data
_ fileData: Data,
useUnalignedBuffers: Bool = false
) -> Result<ArrowReaderResult, ArrowError> {
let footerLength = fileData.withUnsafeBytes { rawBuffer in
rawBuffer.loadUnaligned(fromByteOffset: fileData.count - 4, as: Int32.self)
Expand All @@ -141,7 +142,9 @@ public class ArrowReader {
let result = ArrowReaderResult()
let footerStartOffset = fileData.count - Int(footerLength + 4)
let footerData = fileData[footerStartOffset...]
let footerBuffer = ByteBuffer(data: footerData)
let footerBuffer = ByteBuffer(
data: footerData,
allowReadingUnalignedBuffers: useUnalignedBuffers)
let footer = org_apache_arrow_flatbuf_Footer.getRootAsFooter(bb: footerBuffer)
let schemaResult = loadSchema(footer.schema!)
switch schemaResult {
Expand Down Expand Up @@ -170,7 +173,9 @@ public class ArrowReader {
let messageStartOffset = recordBatch.offset + (Int64(MemoryLayout<Int32>.size) * messageOffset)
let messageEndOffset = messageStartOffset + Int64(messageLength)
let recordBatchData = fileData[messageStartOffset ..< messageEndOffset]
let mbb = ByteBuffer(data: recordBatchData)
let mbb = ByteBuffer(
data: recordBatchData,
allowReadingUnalignedBuffers: useUnalignedBuffers)
let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb)
switch message.headerType {
case .recordbatch:
Expand Down Expand Up @@ -219,9 +224,12 @@ public class ArrowReader {
public func fromMessage(
_ dataHeader: Data,
dataBody: Data,
result: ArrowReaderResult
result: ArrowReaderResult,
useUnalignedBuffers: Bool = false
) -> Result<Void, ArrowError> {
let mbb = ByteBuffer(data: dataHeader)
let mbb = ByteBuffer(
data: dataHeader,
allowReadingUnalignedBuffers: useUnalignedBuffers)
let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb)
switch message.headerType {
case .schema:
Expand Down
11 changes: 9 additions & 2 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import Arrow

public class FlightClient {
let client: Arrow_Flight_Protocol_FlightServiceAsyncClient
public init(channel: GRPCChannel) {
let allowReadingUnalignedBuffers: Bool

public init(channel: GRPCChannel, allowReadingUnalignedBuffers: Bool = false ) {
client = Arrow_Flight_Protocol_FlightServiceAsyncClient(channel: channel)
self.allowReadingUnalignedBuffers = allowReadingUnalignedBuffers
}

private func readMessages(
Expand All @@ -34,7 +37,11 @@ public class FlightClient {
let reader = ArrowReader()
let arrowResult = ArrowReader.makeArrowReaderResult()
for try await data in responseStream {
switch reader.fromMessage(data.dataHeader, dataBody: data.dataBody, result: arrowResult) {
switch reader.fromMessage(
data.dataHeader,
dataBody: data.dataBody,
result: arrowResult,
useUnalignedBuffers: allowReadingUnalignedBuffers) {
case .success:
continue
case .failure(let error):
Expand Down
7 changes: 7 additions & 0 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public func schemaFromMessage(_ schemaData: Data) -> ArrowSchema? {
}

public protocol ArrowFlightServer: Sendable {
var allowReadingUnalignedBuffers: Bool { get }
func listFlights(_ criteria: FlightCriteria, writer: FlightInfoStreamWriter) async throws
func getFlightInfo(_ request: FlightDescriptor) async throws -> FlightInfo
func getSchema(_ request: FlightDescriptor) async throws -> ArrowFlight.FlightSchemaResult
Expand All @@ -73,6 +74,12 @@ public protocol ArrowFlightServer: Sendable {
func doExchange(_ reader: RecordBatchStreamReader, writer: RecordBatchStreamWriter) async throws
}

extension ArrowFlightServer {
var allowReadingUnalignedBuffers: Bool {
return false
}
}

public func makeFlightServer(_ handler: ArrowFlightServer) -> CallHandlerProvider {
return InternalFlightServer(handler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ public class RecordBatchStreamReader: AsyncSequence, AsyncIteratorProtocol {
var descriptor: FlightDescriptor?
var batchIndex = 0
var streamIterator: any AsyncIteratorProtocol
var useUnalignedBuffers: Bool
let stream: GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>
init(_ stream: GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>) {
init(_ stream: GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>,
useUnalignedBuffers: Bool = false) {
self.stream = stream
self.streamIterator = self.stream.makeAsyncIterator()
self.useUnalignedBuffers = useUnalignedBuffers
}

public func next() async throws -> (Arrow.RecordBatch?, FlightDescriptor?)? {
Expand All @@ -55,7 +58,11 @@ public class RecordBatchStreamReader: AsyncSequence, AsyncIteratorProtocol {
let dataBody = flightData.dataBody
let dataHeader = flightData.dataHeader
descriptor = FlightDescriptor(flightData.flightDescriptor)
switch reader.fromMessage(dataHeader, dataBody: dataBody, result: result) {
switch reader.fromMessage(
dataHeader,
dataBody: dataBody,
result: result,
useUnalignedBuffers: useUnalignedBuffers) {
case .success(()):
if result.batches.count > 0 {
batches = result.batches
Expand Down

0 comments on commit 4114f5a

Please sign in to comment.