Skip to content

Commit

Permalink
Add from_arrow_pycapsule method on each Python class (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron authored May 29, 2024
1 parent 949cf90 commit ee244ae
Show file tree
Hide file tree
Showing 20 changed files with 296 additions and 119 deletions.
14 changes: 14 additions & 0 deletions arro3-core/python/arro3/core/_rust.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class Array:
# __len__ must return a non-negative int (the element count), not bool.
def __len__(self) -> int: ...
@classmethod
def from_arrow(cls, input: ArrowArrayExportable) -> Self: ...
@classmethod
def from_arrow_pycapsule(cls, schema_capsule, array_capsule) -> Self: ...
def to_numpy(self) -> NDArray: ...

class ChunkedArray:
Expand All @@ -19,36 +21,48 @@ class ChunkedArray:
# __len__ must return a non-negative int (the element count), not bool.
def __len__(self) -> int: ...
@classmethod
def from_arrow(cls, input: ArrowStreamExportable) -> Self: ...
@classmethod
def from_arrow_pycapsule(cls, capsule) -> Self: ...
def to_numpy(self) -> NDArray: ...

class Field:
    """Typing stub for the Rust-backed ``Field`` class."""

    def __arrow_c_schema__(self) -> object: ...
    # ``other`` is required for a valid ``__eq__`` signature; the original
    # stub omitted it, which no caller could satisfy.
    def __eq__(self, other: object) -> bool: ...
    @classmethod
    def from_arrow(cls, input: ArrowSchemaExportable) -> Self: ...
    @classmethod
    def from_arrow_pycapsule(cls, capsule: object) -> Self: ...

class RecordBatch:
    """Typing stub for the Rust-backed ``RecordBatch`` class."""

    def __arrow_c_array__(self, requested_schema) -> object: ...
    # ``other`` is required for a valid ``__eq__`` signature.
    def __eq__(self, other: object) -> bool: ...
    @classmethod
    def from_arrow(cls, input: ArrowArrayExportable) -> Self: ...
    @classmethod
    def from_arrow_pycapsule(cls, schema_capsule: object, array_capsule: object) -> Self: ...

class RecordBatchReader:
    # Typing stub for the Rust-backed RecordBatchReader.
    # requested_schema is part of the Arrow PyCapsule stream interface;
    # the returned object is a PyCapsule.
    def __arrow_c_stream__(self, requested_schema) -> object: ...
    @classmethod
    def from_arrow(cls, input: ArrowStreamExportable) -> Self: ...
    # capsule: an "arrow_array_stream" PyCapsule.
    @classmethod
    def from_arrow_pycapsule(cls, capsule: object) -> Self: ...
    def schema(self) -> Schema: ...

class Schema:
    """Typing stub for the Rust-backed ``Schema`` class."""

    def __arrow_c_schema__(self) -> object: ...
    # ``other`` is required for a valid ``__eq__`` signature.
    def __eq__(self, other: object) -> bool: ...
    @classmethod
    def from_arrow(cls, input: ArrowSchemaExportable) -> Self: ...
    @classmethod
    def from_arrow_pycapsule(cls, capsule: object) -> Self: ...

class Table:
    """Typing stub for the Rust-backed ``Table`` class."""

    def __arrow_c_stream__(self, requested_schema) -> object: ...
    # ``other`` is required for a valid ``__eq__`` signature.
    def __eq__(self, other: object) -> bool: ...
    # __len__ must return a non-negative int, not bool.
    def __len__(self) -> int: ...
    @classmethod
    def from_arrow(cls, input: ArrowStreamExportable) -> Self: ...
    @classmethod
    def from_arrow_pycapsule(cls, capsule: object) -> Self: ...
    def schema(self) -> Schema: ...
13 changes: 13 additions & 0 deletions arro3-core/src/array.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ffi::CString;
use std::sync::Arc;

use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arrow_array::ArrayRef;
Expand All @@ -7,6 +8,7 @@ use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::error::PyArrowResult;
use crate::ffi::from_python::utils::import_array_pycapsules;
use crate::interop::numpy::to_numpy::to_numpy;

#[pyclass(module = "arro3.core._rust", name = "Array", subclass)]
Expand Down Expand Up @@ -73,6 +75,17 @@ impl PyArray {
input.extract()
}

/// Construct this object from a bare Arrow PyCapsule
///
/// `schema_capsule` must be a PyCapsule named `"arrow_schema"` and
/// `array_capsule` one named `"arrow_array"` (both names are validated
/// inside `import_array_pycapsules`). Returns a new `PyArray` owning the
/// imported array with its field wrapped in an `Arc`.
#[classmethod]
pub fn from_arrow_pycapsule(
    _cls: &PyType,
    schema_capsule: &PyCapsule,
    array_capsule: &PyCapsule,
) -> PyResult<Self> {
    let (array, field) = import_array_pycapsules(schema_capsule, array_capsule)?;
    Ok(Self::new(array, Arc::new(field)))
}

/// Copy this array to a `numpy` NDArray
pub fn to_numpy(&self, py: Python) -> PyResult<PyObject> {
self.__array__(py)
Expand Down
29 changes: 28 additions & 1 deletion arro3-core/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ use std::sync::Arc;

use arrow_array::Array;
use arrow_schema::FieldRef;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use pyo3::types::{PyCapsule, PyType};

use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::chunked::ArrayIterator;
use crate::ffi::to_python::ffi_stream::new_stream;
use crate::interop::numpy::to_numpy::chunked_to_numpy;
Expand Down Expand Up @@ -72,4 +75,28 @@ impl PyChunkedArray {
/// Copy this chunked array to a `numpy` NDArray (delegates to `__array__`).
pub fn to_numpy(&self, py: Python) -> PyResult<PyObject> {
    self.__array__(py)
}

/// Construct this object from an existing Arrow object.
///
/// Delegates to the `FromPyObject` impl for `PyChunkedArray` via
/// `extract`, so `input` may be any object implementing the Arrow
/// PyCapsule stream interface.
#[classmethod]
pub fn from_arrow(_cls: &PyType, input: &PyAny) -> PyResult<Self> {
    input.extract()
}

/// Construct this object from a bare Arrow PyCapsule
#[classmethod]
pub fn from_arrow_pycapsule(_cls: &PyType, capsule: &PyCapsule) -> PyResult<Self> {
let stream = import_stream_pycapsule(capsule)?;

let stream_reader = ArrowArrayStreamReader::try_new(stream)
.map_err(|err| PyValueError::new_err(err.to_string()))?;

let field = stream_reader.field();

let mut chunks = vec![];
for array in stream_reader {
let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?;
chunks.push(array);
}

Ok(PyChunkedArray::new(chunks, field))
}
}
10 changes: 5 additions & 5 deletions arro3-core/src/ffi/from_python/array.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::sync::Arc;

use crate::array::*;
use crate::ffi::from_python::utils::import_arrow_c_array;
use crate::ffi::from_python::utils::call_arrow_c_array;
use pyo3::prelude::*;
use pyo3::{PyAny, PyResult};

impl<'a> FromPyObject<'a> for PyArray {
    /// Extract a [PyArray] from any object exposing `__arrow_c_array__`.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        // NOTE(review): the scraped diff interleaved the pre-commit body
        // (import_arrow_c_array) with the post-commit one; this is the
        // post-commit implementation, which delegates to the shared
        // capsule-based constructor.
        let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
        Python::with_gil(|py| {
            Self::from_arrow_pycapsule(py.get_type::<PyArray>(), schema_capsule, array_capsule)
        })
    }
}
19 changes: 3 additions & 16 deletions arro3-core/src/ffi/from_python/chunked.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
use crate::chunked::PyChunkedArray;
use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
use crate::ffi::from_python::utils::import_arrow_c_stream;
use pyo3::exceptions::{PyTypeError, PyValueError};
use crate::ffi::from_python::utils::call_arrow_c_stream;
use pyo3::prelude::*;
use pyo3::{PyAny, PyResult};

impl<'a> FromPyObject<'a> for PyChunkedArray {
    /// Extract a [PyChunkedArray] from any object exposing `__arrow_c_stream__`.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        // NOTE(review): the scraped diff interleaved the removed stream-draining
        // body with the added one; this is the post-commit implementation,
        // which delegates to `from_arrow_pycapsule`.
        let capsule = call_arrow_c_stream(ob)?;
        Python::with_gil(|py| Self::from_arrow_pycapsule(py.get_type::<PyChunkedArray>(), capsule))
    }
}
12 changes: 3 additions & 9 deletions arro3-core/src/ffi/from_python/field.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use std::sync::Arc;

use crate::ffi::from_python::utils::import_arrow_c_schema;
use crate::ffi::from_python::utils::call_arrow_c_schema;
use crate::field::PyField;
use arrow_schema::Field;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::{PyAny, PyResult};

impl<'a> FromPyObject<'a> for PyField {
    /// Extract a [PyField] from any object exposing `__arrow_c_schema__`.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        // NOTE(review): the scraped diff interleaved the removed
        // `import_arrow_c_schema`-based body with the added one; this is the
        // post-commit implementation, delegating to `from_arrow_pycapsule`.
        let capsule = call_arrow_c_schema(ob)?;
        Python::with_gil(|py| Self::from_arrow_pycapsule(py.get_type::<PyField>(), capsule))
    }
}
40 changes: 9 additions & 31 deletions arro3-core/src/ffi/from_python/record_batch.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,17 @@
use std::sync::Arc;

use crate::ffi::from_python::utils::import_arrow_c_array;
use crate::ffi::from_python::utils::call_arrow_c_array;
use crate::record_batch::PyRecordBatch;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, SchemaBuilder};
use arrow_array::Array;
use arrow_array::RecordBatch;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{PyAny, PyResult};

impl<'a> FromPyObject<'a> for PyRecordBatch {
    /// Extract a [PyRecordBatch] from any object exposing `__arrow_c_array__`.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        // NOTE(review): the scraped diff interleaved the removed
        // StructArray-unpacking body with the added one; this is the
        // post-commit implementation. The struct-to-RecordBatch conversion
        // now lives behind `from_arrow_pycapsule`.
        let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
        Python::with_gil(|py| {
            Self::from_arrow_pycapsule(
                py.get_type::<PyRecordBatch>(),
                schema_capsule,
                array_capsule,
            )
        })
    }
}
12 changes: 5 additions & 7 deletions arro3-core/src/ffi/from_python/record_batch_reader.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use crate::ffi::from_python::utils::import_arrow_c_stream;
use crate::ffi::from_python::utils::call_arrow_c_stream;
use crate::record_batch_reader::PyRecordBatchReader;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{PyAny, PyResult};

impl<'a> FromPyObject<'a> for PyRecordBatchReader {
    /// Extract a [PyRecordBatchReader] from any object exposing `__arrow_c_stream__`.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        // NOTE(review): the scraped diff interleaved the removed
        // stream-reader body with the added one; this is the post-commit
        // implementation, delegating to `from_arrow_pycapsule`.
        let capsule = call_arrow_c_stream(ob)?;
        Python::with_gil(|py| {
            Self::from_arrow_pycapsule(py.get_type::<PyRecordBatchReader>(), capsule)
        })
    }
}
12 changes: 3 additions & 9 deletions arro3-core/src/ffi/from_python/schema.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use std::sync::Arc;

use crate::ffi::from_python::utils::import_arrow_c_schema;
use crate::ffi::from_python::utils::call_arrow_c_schema;
use crate::schema::PySchema;
use arrow_schema::Schema;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::{PyAny, PyResult};

impl<'a> FromPyObject<'a> for PySchema {
    /// Extract a [PySchema] from any object exposing `__arrow_c_schema__`.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        // NOTE(review): the scraped diff interleaved the removed
        // `Schema::try_from`-based body with the added one; this is the
        // post-commit implementation. Renamed the local from `schema_ptr`
        // to `capsule` — it is a PyCapsule here, not a raw pointer.
        let capsule = call_arrow_c_schema(ob)?;
        Python::with_gil(|py| Self::from_arrow_pycapsule(py.get_type::<PySchema>(), capsule))
    }
}
19 changes: 3 additions & 16 deletions arro3-core/src/ffi/from_python/table.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
use crate::ffi::from_python::utils::import_arrow_c_stream;
use crate::ffi::from_python::utils::call_arrow_c_stream;
use crate::table::PyTable;
use arrow::ffi_stream::ArrowArrayStreamReader as ArrowRecordBatchStreamReader;
use arrow_array::RecordBatchReader;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::{PyAny, PyResult};

impl<'a> FromPyObject<'a> for PyTable {
    /// Extract a [PyTable] from any object exposing `__arrow_c_stream__`.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        // NOTE(review): the scraped diff interleaved the removed
        // batch-draining body with the added one; this is the post-commit
        // implementation, delegating to `from_arrow_pycapsule`.
        let capsule = call_arrow_c_stream(ob)?;
        Python::with_gil(|py| Self::from_arrow_pycapsule(py.get_type::<PyTable>(), capsule))
    }
}
29 changes: 22 additions & 7 deletions arro3-core/src/ffi/from_python/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,27 @@ pub fn validate_pycapsule_name(capsule: &PyCapsule, expected_name: &str) -> PyRe
}

/// Import `__arrow_c_schema__` across Python boundary
pub(crate) fn import_arrow_c_schema(ob: &PyAny) -> PyResult<&FFI_ArrowSchema> {
pub(crate) fn call_arrow_c_schema(ob: &PyAny) -> PyResult<&PyCapsule> {
if !ob.hasattr("__arrow_c_schema__")? {
return Err(PyValueError::new_err(
"Expected an object with dunder __arrow_c_schema__",
));
}

let capsule: &PyCapsule = PyTryInto::try_into(ob.getattr("__arrow_c_schema__")?.call0()?)?;
Ok(PyTryInto::try_into(
ob.getattr("__arrow_c_schema__")?.call0()?,
)?)
}

/// Borrow an `FFI_ArrowSchema` out of a PyCapsule named `"arrow_schema"`.
pub(crate) fn import_schema_pycapsule(capsule: &PyCapsule) -> PyResult<&FFI_ArrowSchema> {
    validate_pycapsule_name(capsule, "arrow_schema")?;

    // SAFETY: the capsule name was validated above; assumes producers of an
    // "arrow_schema" capsule always store a valid FFI_ArrowSchema in it, per
    // the Arrow PyCapsule convention — TODO confirm against the spec.
    let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
    Ok(schema_ptr)
}

/// Import `__arrow_c_array__` across Python boundary
pub(crate) fn import_arrow_c_array(ob: &PyAny) -> PyResult<(ArrayRef, Field)> {
pub(crate) fn call_arrow_c_array(ob: &PyAny) -> PyResult<(&PyCapsule, &PyCapsule)> {
if !ob.hasattr("__arrow_c_array__")? {
return Err(PyValueError::new_err(
"Expected an object with dunder __arrow_c_array__",
Expand All @@ -57,9 +62,15 @@ pub(crate) fn import_arrow_c_array(ob: &PyAny) -> PyResult<(ArrayRef, Field)> {
));
}

let schema_capsule: &PyCapsule = PyTryInto::try_into(tuple.get_item(0)?)?;
let array_capsule: &PyCapsule = PyTryInto::try_into(tuple.get_item(1)?)?;
let schema_capsule = PyTryInto::try_into(tuple.get_item(0)?)?;
let array_capsule = PyTryInto::try_into(tuple.get_item(1)?)?;
Ok((schema_capsule, array_capsule))
}

pub(crate) fn import_array_pycapsules(
schema_capsule: &PyCapsule,
array_capsule: &PyCapsule,
) -> PyResult<(ArrayRef, Field)> {
validate_pycapsule_name(schema_capsule, "arrow_schema")?;
validate_pycapsule_name(array_capsule, "arrow_array")?;

Expand All @@ -73,14 +84,18 @@ pub(crate) fn import_arrow_c_array(ob: &PyAny) -> PyResult<(ArrayRef, Field)> {
}

/// Import `__arrow_c_stream__` across Python boundary.
pub(crate) fn import_arrow_c_stream(ob: &PyAny) -> PyResult<FFI_ArrowArrayStream> {
pub(crate) fn call_arrow_c_stream(ob: &PyAny) -> PyResult<&PyCapsule> {
if !ob.hasattr("__arrow_c_stream__")? {
return Err(PyValueError::new_err(
"Expected an object with dunder __arrow_c_stream__",
));
}

let capsule: &PyCapsule = PyTryInto::try_into(ob.getattr("__arrow_c_stream__")?.call0()?)?;
let capsule = PyTryInto::try_into(ob.getattr("__arrow_c_stream__")?.call0()?)?;
Ok(capsule)
}

pub(crate) fn import_stream_pycapsule(capsule: &PyCapsule) -> PyResult<FFI_ArrowArrayStream> {
validate_pycapsule_name(capsule, "arrow_array_stream")?;

let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
Expand Down
Empty file.
3 changes: 3 additions & 0 deletions arro3-core/src/ffi/to_python/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
pub mod array;
pub mod chunked;
pub mod ffi_stream;
pub mod schema;
pub mod stream;
Loading

0 comments on commit ee244ae

Please sign in to comment.