diff --git a/swift/Arrow/Package.swift b/swift/Arrow/Package.swift index 065afe62640ea..dcbd9304d5ca4 100644 --- a/swift/Arrow/Package.swift +++ b/swift/Arrow/Package.swift @@ -32,7 +32,7 @@ let package = Package( targets: ["Arrow"]), ], dependencies: [ - .package(url: "https://github.com/google/flatbuffers.git", from: "23.3.3") + .package(url: "https://github.com/google/flatbuffers.git", branch: "master") ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. diff --git a/swift/Arrow/Sources/Arrow/ArrowReader.swift b/swift/Arrow/Sources/Arrow/ArrowReader.swift index ef995b18052a8..d9dc1bdb470e6 100644 --- a/swift/Arrow/Sources/Arrow/ArrowReader.swift +++ b/swift/Arrow/Sources/Arrow/ArrowReader.swift @@ -132,7 +132,8 @@ public class ArrowReader { } public func fromStream( // swiftlint:disable:this function_body_length - _ fileData: Data + _ fileData: Data, + useUnalignedBuffers: Bool = false ) -> Result { let footerLength = fileData.withUnsafeBytes { rawBuffer in rawBuffer.loadUnaligned(fromByteOffset: fileData.count - 4, as: Int32.self) @@ -141,7 +142,9 @@ public class ArrowReader { let result = ArrowReaderResult() let footerStartOffset = fileData.count - Int(footerLength + 4) let footerData = fileData[footerStartOffset...] - let footerBuffer = ByteBuffer(data: footerData) + let footerBuffer = ByteBuffer( + data: footerData, + allowReadingUnalignedBuffers: useUnalignedBuffers) let footer = org_apache_arrow_flatbuf_Footer.getRootAsFooter(bb: footerBuffer) let schemaResult = loadSchema(footer.schema!) switch schemaResult { @@ -170,7 +173,9 @@ public class ArrowReader { let messageStartOffset = recordBatch.offset + (Int64(MemoryLayout.size) * messageOffset) let messageEndOffset = messageStartOffset + Int64(messageLength) let recordBatchData = fileData[messageStartOffset ..< messageEndOffset] - let mbb = ByteBuffer(data: recordBatchData) + let mbb = ByteBuffer( + data: recordBatchData, + allowReadingUnalignedBuffers: useUnalignedBuffers) let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb) switch message.headerType { case .recordbatch: @@ -219,9 +224,12 @@ public class ArrowReader { public func fromMessage( _ dataHeader: Data, dataBody: Data, - result: ArrowReaderResult + result: ArrowReaderResult, + useUnalignedBuffers: Bool = false ) -> Result { - let mbb = ByteBuffer(data: dataHeader) + let mbb = ByteBuffer( + data: dataHeader, + allowReadingUnalignedBuffers: useUnalignedBuffers) let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb) switch message.headerType { case .schema: diff --git a/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift b/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift index 7a572ceca5bd6..ef3e4fa239e84 100644 --- a/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift +++ b/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift @@ -24,8 +24,11 @@ import Arrow public class FlightClient { let client: Arrow_Flight_Protocol_FlightServiceAsyncClient - public init(channel: GRPCChannel) { + let allowReadingUnalignedBuffers: Bool + + public init(channel: GRPCChannel, allowReadingUnalignedBuffers: Bool = false ) { client = Arrow_Flight_Protocol_FlightServiceAsyncClient(channel: channel) + self.allowReadingUnalignedBuffers = allowReadingUnalignedBuffers } private func readMessages( @@ -34,7 +37,11 @@ public class FlightClient { let reader = ArrowReader() let arrowResult = ArrowReader.makeArrowReaderResult() for try await data in responseStream { - switch reader.fromMessage(data.dataHeader, dataBody: data.dataBody, result: arrowResult) { + switch reader.fromMessage( + data.dataHeader, + dataBody: data.dataBody, + result: arrowResult, + useUnalignedBuffers: allowReadingUnalignedBuffers) { case .success: continue case .failure(let error): diff --git a/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift b/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift index a34bf5c0acee9..19644d632e997 100644 --- a/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift +++ b/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift @@ -63,6 +63,7 @@ public func schemaFromMessage(_ schemaData: Data) -> ArrowSchema? { } public protocol ArrowFlightServer: Sendable { + var allowReadingUnalignedBuffers: Bool { get } func listFlights(_ criteria: FlightCriteria, writer: FlightInfoStreamWriter) async throws func getFlightInfo(_ request: FlightDescriptor) async throws -> FlightInfo func getSchema(_ request: FlightDescriptor) async throws -> ArrowFlight.FlightSchemaResult @@ -73,6 +74,12 @@ public protocol ArrowFlightServer: Sendable { func doExchange(_ reader: RecordBatchStreamReader, writer: RecordBatchStreamWriter) async throws } +extension ArrowFlightServer { + var allowReadingUnalignedBuffers: Bool { + return false + } +} + public func makeFlightServer(_ handler: ArrowFlightServer) -> CallHandlerProvider { return InternalFlightServer(handler) } diff --git a/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift b/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift index 972d19435ddfc..464752dbcbeea 100644 --- a/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift +++ b/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift @@ -27,10 +27,13 @@ public class RecordBatchStreamReader: AsyncSequence, AsyncIteratorProtocol { var descriptor: FlightDescriptor? var batchIndex = 0 var streamIterator: any AsyncIteratorProtocol + var useUnalignedBuffers: Bool let stream: GRPC.GRPCAsyncRequestStream - init(_ stream: GRPC.GRPCAsyncRequestStream) { + init(_ stream: GRPC.GRPCAsyncRequestStream, + useUnalignedBuffers: Bool = false) { self.stream = stream self.streamIterator = self.stream.makeAsyncIterator() + self.useUnalignedBuffers = useUnalignedBuffers } public func next() async throws -> (Arrow.RecordBatch?, FlightDescriptor?)? { @@ -55,7 +58,11 @@ public class RecordBatchStreamReader: AsyncSequence, AsyncIteratorProtocol { let dataBody = flightData.dataBody let dataHeader = flightData.dataHeader descriptor = FlightDescriptor(flightData.flightDescriptor) - switch reader.fromMessage(dataHeader, dataBody: dataBody, result: result) { + switch reader.fromMessage( + dataHeader, + dataBody: dataBody, + result: result, + useUnalignedBuffers: useUnalignedBuffers) { case .success(()): if result.batches.count > 0 { batches = result.batches