Skip to content

Commit

Permalink
apacheGH-39984: [Python] Add ChunkedArray import/export to/from C (ap…
Browse files Browse the repository at this point in the history
…ache#39985)

### Rationale for this change

ChunkedArrays have an unambiguous representation as a stream of arrays. apache#39455 added the ability to import/export in C++...this PR wires up the new functions in pyarrow.

### What changes are included in this PR?

- Added `__arrow_c_stream__()` and `_import_from_c_capsule()` to the `ChunkedArray`

### Are these changes tested?

Yes! Tests were added.

### Are there any user-facing changes?

Yes! But I'm not sure where the protocol methods are documented.

```python
import pyarrow as pa
import nanoarrow as na
chunked = pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])])
[na.c_array_view(item) for item in na.c_array_stream(chunked)]
```

    [<nanoarrow.c_lib.CArrayView>
     - storage_type: 'int64'
     - length: 3
     - offset: 0
     - null_count: 0
     - buffers[2]:
       - <bool validity[0 b] >
       - <int64 data[24 b] 0 1 2>
     - dictionary: NULL
     - children[0]:,
     <nanoarrow.c_lib.CArrayView>
     - storage_type: 'int64'
     - length: 3
     - offset: 0
     - null_count: 0
     - buffers[2]:
       - <bool validity[0 b] >
       - <int64 data[24 b] 3 4 5>
     - dictionary: NULL
     - children[0]:]

```python
stream_capsule = chunked.__arrow_c_stream__()
chunked2 = chunked._import_from_c_capsule(stream_capsule)
chunked2
```

    <pyarrow.lib.ChunkedArray object at 0x105bb70b0>
    [
      [
        0,
        1,
        2
      ],
      [
        3,
        4,
        5
      ]
    ]

* Closes: apache#39984

Lead-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
paleolimbot and paleolimbot authored Feb 14, 2024
1 parent 967831b commit 91bf1c9
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 0 deletions.
3 changes: 3 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2930,6 +2930,9 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CResult[shared_ptr[CRecordBatchReader]] ImportRecordBatchReader(
ArrowArrayStream*)

CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*)
CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*)


cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
Expand Down
61 changes: 61 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,67 @@ cdef class ChunkedArray(_PandasConvertible):
result += self.chunk(i).to_pylist()
return result

def __arrow_c_stream__(self, requested_schema=None):
"""
Export to a C ArrowArrayStream PyCapsule.
Parameters
----------
requested_schema : PyCapsule, default None
The schema to which the stream should be casted, passed as a
PyCapsule containing a C ArrowSchema representation of the
requested schema.
Returns
-------
PyCapsule
A capsule containing a C ArrowArrayStream struct.
"""
cdef:
ArrowArrayStream* c_stream = NULL

if requested_schema is not None:
out_type = DataType._import_from_c_capsule(requested_schema)
if self.type != out_type:
raise NotImplementedError("Casting to requested_schema")

stream_capsule = alloc_c_stream(&c_stream)

with nogil:
check_status(ExportChunkedArray(self.sp_chunked_array, c_stream))

return stream_capsule

@staticmethod
def _import_from_c_capsule(stream):
"""
Import ChunkedArray from a C ArrowArrayStream PyCapsule.
Parameters
----------
stream: PyCapsule
A capsule containing a C ArrowArrayStream PyCapsule.
Returns
-------
ChunkedArray
"""
cdef:
ArrowArrayStream* c_stream
shared_ptr[CChunkedArray] c_chunked_array
ChunkedArray self

c_stream = <ArrowArrayStream*>PyCapsule_GetPointer(
stream, 'arrow_array_stream'
)

with nogil:
c_chunked_array = GetResultValue(ImportChunkedArray(c_stream))

self = ChunkedArray.__new__(ChunkedArray)
self.init(c_chunked_array)
return self


def chunked_array(arrays, type=None):
"""
Expand Down
26 changes: 26 additions & 0 deletions python/pyarrow/tests/test_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,29 @@ def test_roundtrip_batch_reader_capsule():
assert imported_reader.read_next_batch().equals(batch)
with pytest.raises(StopIteration):
imported_reader.read_next_batch()


def test_roundtrip_chunked_array_capsule():
chunked = pa.chunked_array([pa.array(["a", "b", "c"])])

capsule = chunked.__arrow_c_stream__()
assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
assert imported_chunked.type == chunked.type
assert imported_chunked == chunked


def test_roundtrip_chunked_array_capsule_requested_schema():
chunked = pa.chunked_array([pa.array(["a", "b", "c"])])

# Requesting the same type should work
requested_capsule = chunked.type.__arrow_c_schema__()
capsule = chunked.__arrow_c_stream__(requested_capsule)
imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
assert imported_chunked == chunked

# Casting to something else should error
requested_type = pa.binary()
requested_capsule = requested_type.__arrow_c_schema__()
with pytest.raises(NotImplementedError):
chunked.__arrow_c_stream__(requested_capsule)

0 comments on commit 91bf1c9

Please sign in to comment.