From 91bf1c9c170c1917ad47bb0dbb38aa5c9fbbbfb2 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 14 Feb 2024 04:55:27 -0400 Subject: [PATCH] GH-39984: [Python] Add ChunkedArray import/export to/from C (#39985) ### Rationale for this change ChunkedArrays have an unambiguous representation as a stream of arrays. #39455 added the ability to import/export in C++...this PR wires up the new functions in pyarrow. ### What changes are included in this PR? - Added `__arrow_c_stream__()` and `_import_from_c_capsule()` to the `ChunkedArray` ### Are these changes tested? Yes! Tests were added. ### Are there any user-facing changes? Yes! But I'm not sure where the protocol methods are documented. ```python import pyarrow as pa import nanoarrow as na chunked = pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])]) [na.c_array_view(item) for item in na.c_array_stream(chunked)] ``` [ - storage_type: 'int64' - length: 3 - offset: 0 - null_count: 0 - buffers[2]: - - - dictionary: NULL - children[0]:, - storage_type: 'int64' - length: 3 - offset: 0 - null_count: 0 - buffers[2]: - - - dictionary: NULL - children[0]:] ```python stream_capsule = chunked.__arrow_c_stream__() chunked2 = chunked._import_from_c_capsule(stream_capsule) chunked2 ``` [ [ 0, 1, 2 ], [ 3, 4, 5 ] ] * Closes: #39984 Lead-authored-by: Dewey Dunnington Co-authored-by: Dewey Dunnington Signed-off-by: Antoine Pitrou --- python/pyarrow/includes/libarrow.pxd | 3 ++ python/pyarrow/table.pxi | 61 ++++++++++++++++++++++++++++ python/pyarrow/tests/test_cffi.py | 26 ++++++++++++ 3 files changed, 90 insertions(+) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 8056d99354965..935fb4d34b318 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2930,6 +2930,9 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CResult[shared_ptr[CRecordBatchReader]] ImportRecordBatchReader( ArrowArrayStream*) + CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*) + CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*) + cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil: CResult[int64_t] ReferencedBufferSize(const CArray& array_data) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index abda784fb7c18..ee3872aa3a242 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -1327,6 +1327,67 @@ cdef class ChunkedArray(_PandasConvertible): result += self.chunk(i).to_pylist() return result + def __arrow_c_stream__(self, requested_schema=None): + """ + Export to a C ArrowArrayStream PyCapsule. + + Parameters + ---------- + requested_schema : PyCapsule, default None + The schema to which the stream should be casted, passed as a + PyCapsule containing a C ArrowSchema representation of the + requested schema. + + Returns + ------- + PyCapsule + A capsule containing a C ArrowArrayStream struct. + """ + cdef: + ArrowArrayStream* c_stream = NULL + + if requested_schema is not None: + out_type = DataType._import_from_c_capsule(requested_schema) + if self.type != out_type: + raise NotImplementedError("Casting to requested_schema") + + stream_capsule = alloc_c_stream(&c_stream) + + with nogil: + check_status(ExportChunkedArray(self.sp_chunked_array, c_stream)) + + return stream_capsule + + @staticmethod + def _import_from_c_capsule(stream): + """ + Import ChunkedArray from a C ArrowArrayStream PyCapsule. + + Parameters + ---------- + stream: PyCapsule + A capsule containing a C ArrowArrayStream PyCapsule. + + Returns + ------- + ChunkedArray + """ + cdef: + ArrowArrayStream* c_stream + shared_ptr[CChunkedArray] c_chunked_array + ChunkedArray self + + c_stream = PyCapsule_GetPointer( + stream, 'arrow_array_stream' + ) + + with nogil: + c_chunked_array = GetResultValue(ImportChunkedArray(c_stream)) + + self = ChunkedArray.__new__(ChunkedArray) + self.init(c_chunked_array) + return self + def chunked_array(arrays, type=None): """ diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index ff81b06440f03..3a0c7b5b7152f 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -601,3 +601,29 @@ def test_roundtrip_batch_reader_capsule(): assert imported_reader.read_next_batch().equals(batch) with pytest.raises(StopIteration): imported_reader.read_next_batch() + + +def test_roundtrip_chunked_array_capsule(): + chunked = pa.chunked_array([pa.array(["a", "b", "c"])]) + + capsule = chunked.__arrow_c_stream__() + assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1 + imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule) + assert imported_chunked.type == chunked.type + assert imported_chunked == chunked + + +def test_roundtrip_chunked_array_capsule_requested_schema(): + chunked = pa.chunked_array([pa.array(["a", "b", "c"])]) + + # Requesting the same type should work + requested_capsule = chunked.type.__arrow_c_schema__() + capsule = chunked.__arrow_c_stream__(requested_capsule) + imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule) + assert imported_chunked == chunked + + # Casting to something else should error + requested_type = pa.binary() + requested_capsule = requested_type.__arrow_c_schema__() + with pytest.raises(NotImplementedError): + chunked.__arrow_c_stream__(requested_capsule)