diff --git a/arro3-core/python/arro3/core/_rust.pyi b/arro3-core/python/arro3/core/_rust.pyi index 02f945f..9238984 100644 --- a/arro3-core/python/arro3/core/_rust.pyi +++ b/arro3-core/python/arro3/core/_rust.pyi @@ -10,6 +10,8 @@ class Array: def __len__(self) -> bool: ... @classmethod def from_arrow(cls, input: ArrowArrayExportable) -> Self: ... + @classmethod + def from_arrow_pycapsule(cls, schema_capsule, array_capsule) -> Self: ... def to_numpy(self) -> NDArray: ... class ChunkedArray: @@ -19,6 +21,8 @@ class ChunkedArray: def __len__(self) -> bool: ... @classmethod def from_arrow(cls, input: ArrowStreamExportable) -> Self: ... + @classmethod + def from_arrow_pycapsule(cls, capsule) -> Self: ... def to_numpy(self) -> NDArray: ... class Field: @@ -26,17 +30,23 @@ class Field: def __eq__(self) -> bool: ... @classmethod def from_arrow(cls, input: ArrowSchemaExportable) -> Self: ... + @classmethod + def from_arrow_pycapsule(cls, capsule) -> Self: ... class RecordBatch: def __arrow_c_array__(self, requested_schema) -> object: ... def __eq__(self) -> bool: ... @classmethod def from_arrow(cls, input: ArrowArrayExportable) -> Self: ... + @classmethod + def from_arrow_pycapsule(cls, schema_capsule, array_capsule) -> Self: ... class RecordBatchReader: def __arrow_c_stream__(self, requested_schema) -> object: ... @classmethod def from_arrow(cls, input: ArrowStreamExportable) -> Self: ... + @classmethod + def from_arrow_pycapsule(cls, capsule) -> Self: ... def schema(self) -> Schema: ... class Schema: @@ -44,6 +54,8 @@ class Schema: def __eq__(self) -> bool: ... @classmethod def from_arrow(cls, input: ArrowSchemaExportable) -> Self: ... + @classmethod + def from_arrow_pycapsule(cls, capsule) -> Self: ... class Table: def __arrow_c_stream__(self, requested_schema) -> object: ... @@ -51,4 +63,6 @@ class Table: def __len__(self) -> bool: ... @classmethod def from_arrow(cls, input: ArrowStreamExportable) -> Self: ... + @classmethod + def from_arrow_pycapsule(cls, capsule) -> Self: ... def schema(self) -> Schema: ... diff --git a/arro3-core/src/array.rs b/arro3-core/src/array.rs index 345b61b..4bcf723 100644 --- a/arro3-core/src/array.rs +++ b/arro3-core/src/array.rs @@ -1,4 +1,5 @@ use std::ffi::CString; +use std::sync::Arc; use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; use arrow_array::ArrayRef; @@ -7,6 +8,7 @@ use pyo3::prelude::*; use pyo3::types::{PyCapsule, PyTuple, PyType}; use crate::error::PyArrowResult; +use crate::ffi::from_python::utils::import_array_pycapsules; use crate::interop::numpy::to_numpy::to_numpy; #[pyclass(module = "arro3.core._rust", name = "Array", subclass)] @@ -73,6 +75,17 @@ impl PyArray { input.extract() } + /// Construct this object from a bare Arrow PyCapsule + #[classmethod] + pub fn from_arrow_pycapsule( + _cls: &PyType, + schema_capsule: &PyCapsule, + array_capsule: &PyCapsule, + ) -> PyResult { + let (array, field) = import_array_pycapsules(schema_capsule, array_capsule)?; + Ok(Self::new(array, Arc::new(field))) + } + /// Copy this array to a `numpy` NDArray pub fn to_numpy(&self, py: Python) -> PyResult { self.__array__(py) diff --git a/arro3-core/src/chunked.rs b/arro3-core/src/chunked.rs index b85f0a5..30366f6 100644 --- a/arro3-core/src/chunked.rs +++ b/arro3-core/src/chunked.rs @@ -3,9 +3,12 @@ use std::sync::Arc; use arrow_array::Array; use arrow_schema::FieldRef; +use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::PyCapsule; +use pyo3::types::{PyCapsule, PyType}; +use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader; +use crate::ffi::from_python::utils::import_stream_pycapsule; use crate::ffi::to_python::chunked::ArrayIterator; use crate::ffi::to_python::ffi_stream::new_stream; use crate::interop::numpy::to_numpy::chunked_to_numpy; @@ -72,4 +75,28 @@ impl PyChunkedArray { pub fn to_numpy(&self, py: Python) -> PyResult { self.__array__(py) } + + #[classmethod] + pub fn from_arrow(_cls: &PyType, input: &PyAny) -> PyResult { + input.extract() + } + + /// Construct this object from a bare Arrow PyCapsule + #[classmethod] + pub fn from_arrow_pycapsule(_cls: &PyType, capsule: &PyCapsule) -> PyResult { + let stream = import_stream_pycapsule(capsule)?; + + let stream_reader = ArrowArrayStreamReader::try_new(stream) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + + let field = stream_reader.field(); + + let mut chunks = vec![]; + for array in stream_reader { + let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?; + chunks.push(array); + } + + Ok(PyChunkedArray::new(chunks, field)) + } } diff --git a/arro3-core/src/ffi/from_python/array.rs b/arro3-core/src/ffi/from_python/array.rs index 1ce6f02..c47da33 100644 --- a/arro3-core/src/ffi/from_python/array.rs +++ b/arro3-core/src/ffi/from_python/array.rs @@ -1,13 +1,13 @@ -use std::sync::Arc; - use crate::array::*; -use crate::ffi::from_python::utils::import_arrow_c_array; +use crate::ffi::from_python::utils::call_arrow_c_array; use pyo3::prelude::*; use pyo3::{PyAny, PyResult}; impl<'a> FromPyObject<'a> for PyArray { fn extract(ob: &'a PyAny) -> PyResult { - let (array, field) = import_arrow_c_array(ob)?; - Ok(PyArray::new(array, Arc::new(field))) + let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?; + Python::with_gil(|py| { + Self::from_arrow_pycapsule(py.get_type::(), schema_capsule, array_capsule) + }) } } diff --git a/arro3-core/src/ffi/from_python/chunked.rs b/arro3-core/src/ffi/from_python/chunked.rs index 6bdf0d3..7493068 100644 --- a/arro3-core/src/ffi/from_python/chunked.rs +++ b/arro3-core/src/ffi/from_python/chunked.rs @@ -1,24 +1,11 @@ use crate::chunked::PyChunkedArray; -use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader; -use crate::ffi::from_python::utils::import_arrow_c_stream; -use pyo3::exceptions::{PyTypeError, PyValueError}; +use crate::ffi::from_python::utils::call_arrow_c_stream; use pyo3::prelude::*; use pyo3::{PyAny, PyResult}; impl<'a> FromPyObject<'a> for PyChunkedArray { fn extract(ob: &'a PyAny) -> PyResult { - let stream = import_arrow_c_stream(ob)?; - let stream_reader = ArrowArrayStreamReader::try_new(stream) - .map_err(|err| PyValueError::new_err(err.to_string()))?; - - let field = stream_reader.field(); - - let mut chunks = vec![]; - for array in stream_reader { - let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?; - chunks.push(array); - } - - Ok(PyChunkedArray::new(chunks, field)) + let capsule = call_arrow_c_stream(ob)?; + Python::with_gil(|py| Self::from_arrow_pycapsule(py.get_type::(), capsule)) } } diff --git a/arro3-core/src/ffi/from_python/field.rs b/arro3-core/src/ffi/from_python/field.rs index 0387b36..91ad827 100644 --- a/arro3-core/src/ffi/from_python/field.rs +++ b/arro3-core/src/ffi/from_python/field.rs @@ -1,17 +1,11 @@ -use std::sync::Arc; - -use crate::ffi::from_python::utils::import_arrow_c_schema; +use crate::ffi::from_python::utils::call_arrow_c_schema; use crate::field::PyField; -use arrow_schema::Field; -use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use pyo3::{PyAny, PyResult}; impl<'a> FromPyObject<'a> for PyField { fn extract(ob: &'a PyAny) -> PyResult { - let schema_ptr = import_arrow_c_schema(ob)?; - let field = - Field::try_from(schema_ptr).map_err(|err| PyTypeError::new_err(err.to_string()))?; - Ok(Self::new(Arc::new(field))) + let capsule = call_arrow_c_schema(ob)?; + Python::with_gil(|py| Self::from_arrow_pycapsule(py.get_type::(), capsule)) } } diff --git a/arro3-core/src/ffi/from_python/record_batch.rs b/arro3-core/src/ffi/from_python/record_batch.rs index 72467c0..12b0106 100644 --- a/arro3-core/src/ffi/from_python/record_batch.rs +++ b/arro3-core/src/ffi/from_python/record_batch.rs @@ -1,39 +1,17 @@ -use std::sync::Arc; - -use crate::ffi::from_python::utils::import_arrow_c_array; +use crate::ffi::from_python::utils::call_arrow_c_array; use crate::record_batch::PyRecordBatch; -use arrow::array::AsArray; -use arrow::datatypes::{DataType, SchemaBuilder}; -use arrow_array::Array; -use arrow_array::RecordBatch; -use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::{PyAny, PyResult}; impl<'a> FromPyObject<'a> for PyRecordBatch { fn extract(ob: &'a PyAny) -> PyResult { - let (array, field) = import_arrow_c_array(ob)?; - match field.data_type() { - DataType::Struct(fields) => { - let struct_array = array.as_struct(); - let schema = SchemaBuilder::from(fields) - .finish() - .with_metadata(field.metadata().clone()); - assert_eq!( - struct_array.null_count(), - 0, - "Cannot convert nullable StructArray to RecordBatch" - ); - - let columns = struct_array.columns().to_vec(); - let batch = RecordBatch::try_new(Arc::new(schema), columns) - .map_err(|err| PyValueError::new_err(err.to_string()))?; - Ok(Self::new(batch)) - } - dt => Err(PyValueError::new_err(format!( - "Unexpected data type {}", - dt - ))), - } + let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?; + Python::with_gil(|py| { + Self::from_arrow_pycapsule( + py.get_type::(), + schema_capsule, + array_capsule, + ) + }) } } diff --git a/arro3-core/src/ffi/from_python/record_batch_reader.rs b/arro3-core/src/ffi/from_python/record_batch_reader.rs index e0f1617..09a7a33 100644 --- a/arro3-core/src/ffi/from_python/record_batch_reader.rs +++ b/arro3-core/src/ffi/from_python/record_batch_reader.rs @@ -1,15 +1,13 @@ -use crate::ffi::from_python::utils::import_arrow_c_stream; +use crate::ffi::from_python::utils::call_arrow_c_stream; use crate::record_batch_reader::PyRecordBatchReader; -use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::{PyAny, PyResult}; impl<'a> FromPyObject<'a> for PyRecordBatchReader { fn extract(ob: &'a PyAny) -> PyResult { - let stream = import_arrow_c_stream(ob)?; - let stream_reader = arrow::ffi_stream::ArrowArrayStreamReader::try_new(stream) - .map_err(|err| PyValueError::new_err(err.to_string()))?; - - Ok(Self(Some(Box::new(stream_reader)))) + let capsule = call_arrow_c_stream(ob)?; + Python::with_gil(|py| { + Self::from_arrow_pycapsule(py.get_type::(), capsule) + }) } } diff --git a/arro3-core/src/ffi/from_python/schema.rs b/arro3-core/src/ffi/from_python/schema.rs index 9a93964..df3df99 100644 --- a/arro3-core/src/ffi/from_python/schema.rs +++ b/arro3-core/src/ffi/from_python/schema.rs @@ -1,17 +1,11 @@ -use std::sync::Arc; - -use crate::ffi::from_python::utils::import_arrow_c_schema; +use crate::ffi::from_python::utils::call_arrow_c_schema; use crate::schema::PySchema; -use arrow_schema::Schema; -use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use pyo3::{PyAny, PyResult}; impl<'a> FromPyObject<'a> for PySchema { fn extract(ob: &'a PyAny) -> PyResult { - let schema_ptr = import_arrow_c_schema(ob)?; - let schema = - Schema::try_from(schema_ptr).map_err(|err| PyTypeError::new_err(err.to_string()))?; - Ok(Self::new(Arc::new(schema))) + let schema_ptr = call_arrow_c_schema(ob)?; + Python::with_gil(|py| Self::from_arrow_pycapsule(py.get_type::(), schema_ptr)) } } diff --git a/arro3-core/src/ffi/from_python/table.rs b/arro3-core/src/ffi/from_python/table.rs index e8bd81a..e1701b3 100644 --- a/arro3-core/src/ffi/from_python/table.rs +++ b/arro3-core/src/ffi/from_python/table.rs @@ -1,24 +1,11 @@ -use crate::ffi::from_python::utils::import_arrow_c_stream; +use crate::ffi::from_python::utils::call_arrow_c_stream; use crate::table::PyTable; -use arrow::ffi_stream::ArrowArrayStreamReader as ArrowRecordBatchStreamReader; -use arrow_array::RecordBatchReader; -use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::*; use pyo3::{PyAny, PyResult}; impl<'a> FromPyObject<'a> for PyTable { fn extract(ob: &'a PyAny) -> PyResult { - let stream = import_arrow_c_stream(ob)?; - let stream_reader = ArrowRecordBatchStreamReader::try_new(stream) - .map_err(|err| PyValueError::new_err(err.to_string()))?; - let schema = stream_reader.schema(); - - let mut batches = vec![]; - for batch in stream_reader { - let batch = batch.map_err(|err| PyTypeError::new_err(err.to_string()))?; - batches.push(batch); - } - - Ok(PyTable::new(schema, batches)) + let capsule = call_arrow_c_stream(ob)?; + Python::with_gil(|py| Self::from_arrow_pycapsule(py.get_type::(), capsule)) } } diff --git a/arro3-core/src/ffi/from_python/utils.rs b/arro3-core/src/ffi/from_python/utils.rs index cb34911..8bc22cb 100644 --- a/arro3-core/src/ffi/from_python/utils.rs +++ b/arro3-core/src/ffi/from_python/utils.rs @@ -28,14 +28,19 @@ pub fn validate_pycapsule_name(capsule: &PyCapsule, expected_name: &str) -> PyRe } /// Import `__arrow_c_schema__` across Python boundary -pub(crate) fn import_arrow_c_schema(ob: &PyAny) -> PyResult<&FFI_ArrowSchema> { +pub(crate) fn call_arrow_c_schema(ob: &PyAny) -> PyResult<&PyCapsule> { if !ob.hasattr("__arrow_c_schema__")? { return Err(PyValueError::new_err( "Expected an object with dunder __arrow_c_schema__", )); } - let capsule: &PyCapsule = PyTryInto::try_into(ob.getattr("__arrow_c_schema__")?.call0()?)?; + Ok(PyTryInto::try_into( + ob.getattr("__arrow_c_schema__")?.call0()?, + )?) +} + +pub(crate) fn import_schema_pycapsule(capsule: &PyCapsule) -> PyResult<&FFI_ArrowSchema> { validate_pycapsule_name(capsule, "arrow_schema")?; let schema_ptr = unsafe { capsule.reference::() }; @@ -43,7 +48,7 @@ pub(crate) fn import_arrow_c_schema(ob: &PyAny) -> PyResult<&FFI_ArrowSchema> { } /// Import `__arrow_c_array__` across Python boundary -pub(crate) fn import_arrow_c_array(ob: &PyAny) -> PyResult<(ArrayRef, Field)> { +pub(crate) fn call_arrow_c_array(ob: &PyAny) -> PyResult<(&PyCapsule, &PyCapsule)> { if !ob.hasattr("__arrow_c_array__")? { return Err(PyValueError::new_err( "Expected an object with dunder __arrow_c_array__", @@ -57,9 +62,15 @@ pub(crate) fn import_arrow_c_array(ob: &PyAny) -> PyResult<(ArrayRef, Field)> { )); } - let schema_capsule: &PyCapsule = PyTryInto::try_into(tuple.get_item(0)?)?; - let array_capsule: &PyCapsule = PyTryInto::try_into(tuple.get_item(1)?)?; + let schema_capsule = PyTryInto::try_into(tuple.get_item(0)?)?; + let array_capsule = PyTryInto::try_into(tuple.get_item(1)?)?; + Ok((schema_capsule, array_capsule)) +} +pub(crate) fn import_array_pycapsules( + schema_capsule: &PyCapsule, + array_capsule: &PyCapsule, +) -> PyResult<(ArrayRef, Field)> { validate_pycapsule_name(schema_capsule, "arrow_schema")?; validate_pycapsule_name(array_capsule, "arrow_array")?; @@ -73,14 +84,18 @@ pub(crate) fn import_arrow_c_array(ob: &PyAny) -> PyResult<(ArrayRef, Field)> { } /// Import `__arrow_c_stream__` across Python boundary. -pub(crate) fn import_arrow_c_stream(ob: &PyAny) -> PyResult { +pub(crate) fn call_arrow_c_stream(ob: &PyAny) -> PyResult<&PyCapsule> { if !ob.hasattr("__arrow_c_stream__")? { return Err(PyValueError::new_err( "Expected an object with dunder __arrow_c_stream__", )); } - let capsule: &PyCapsule = PyTryInto::try_into(ob.getattr("__arrow_c_stream__")?.call0()?)?; + let capsule = PyTryInto::try_into(ob.getattr("__arrow_c_stream__")?.call0()?)?; + Ok(capsule) +} + +pub(crate) fn import_stream_pycapsule(capsule: &PyCapsule) -> PyResult { validate_pycapsule_name(capsule, "arrow_array_stream")?; let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) }; diff --git a/arro3-core/src/ffi/to_python/array.rs b/arro3-core/src/ffi/to_python/array.rs new file mode 100644 index 0000000..e69de29 diff --git a/arro3-core/src/ffi/to_python/mod.rs b/arro3-core/src/ffi/to_python/mod.rs index 99698d8..58021d6 100644 --- a/arro3-core/src/ffi/to_python/mod.rs +++ b/arro3-core/src/ffi/to_python/mod.rs @@ -1,2 +1,5 @@ +pub mod array; pub mod chunked; pub mod ffi_stream; +pub mod schema; +pub mod stream; diff --git a/arro3-core/src/ffi/to_python/schema.rs b/arro3-core/src/ffi/to_python/schema.rs new file mode 100644 index 0000000..ea18a11 --- /dev/null +++ b/arro3-core/src/ffi/to_python/schema.rs @@ -0,0 +1,31 @@ +use std::ffi::CString; + +use arrow::ffi::FFI_ArrowSchema; +use pyo3::types::PyCapsule; +use pyo3::Python; + +use crate::error::PyArrowResult; +use crate::field::PyField; +use crate::schema::PySchema; + +pub trait ToSchemaPyCapsule { + fn to_py_capsule<'py>(&'py self, py: Python<'py>) -> PyArrowResult<&'py PyCapsule>; +} + +impl ToSchemaPyCapsule for PySchema { + fn to_py_capsule<'py>(&'py self, py: Python<'py>) -> PyArrowResult<&'py PyCapsule> { + let ffi_schema = FFI_ArrowSchema::try_from(self.as_ref())?; + let schema_capsule_name = CString::new("arrow_schema").unwrap(); + let schema_capsule = PyCapsule::new(py, ffi_schema, Some(schema_capsule_name))?; + Ok(schema_capsule) + } +} + +impl ToSchemaPyCapsule for PyField { + fn to_py_capsule<'py>(&'py self, py: Python<'py>) -> PyArrowResult<&'py PyCapsule> { + let ffi_schema = FFI_ArrowSchema::try_from(self.as_ref())?; + let schema_capsule_name = CString::new("arrow_schema").unwrap(); + let schema_capsule = PyCapsule::new(py, ffi_schema, Some(schema_capsule_name))?; + Ok(schema_capsule) + } +} diff --git a/arro3-core/src/ffi/to_python/stream.rs b/arro3-core/src/ffi/to_python/stream.rs new file mode 100644 index 0000000..e69de29 diff --git a/arro3-core/src/field.rs b/arro3-core/src/field.rs index ef8e217..bd1f59d 100644 --- a/arro3-core/src/field.rs +++ b/arro3-core/src/field.rs @@ -1,11 +1,14 @@ use std::ffi::CString; +use std::sync::Arc; use arrow::ffi::FFI_ArrowSchema; -use arrow_schema::FieldRef; +use arrow_schema::{Field, FieldRef}; +use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; -use pyo3::types::PyCapsule; +use pyo3::types::{PyCapsule, PyType}; use crate::error::PyArrowResult; +use crate::ffi::from_python::utils::import_schema_pycapsule; #[pyclass(module = "arro3.core._rust", name = "Field", subclass)] pub struct PyField(FieldRef); @@ -28,6 +31,12 @@ impl From for PyField { } } +impl AsRef for PyField { + fn as_ref(&self) -> &Field { + &self.0 + } +} + #[pymethods] impl PyField { /// An implementation of the [Arrow PyCapsule @@ -50,4 +59,18 @@ impl PyField { pub fn __eq__(&self, other: &PyField) -> bool { self.0 == other.0 } + + #[classmethod] + pub fn from_arrow(_cls: &PyType, input: &PyAny) -> PyResult { + input.extract() + } + + /// Construct this object from a bare Arrow PyCapsule + #[classmethod] + pub fn from_arrow_pycapsule(_cls: &PyType, capsule: &PyCapsule) -> PyResult { + let schema_ptr = import_schema_pycapsule(capsule)?; + let field = + Field::try_from(schema_ptr).map_err(|err| PyTypeError::new_err(err.to_string()))?; + Ok(Self::new(Arc::new(field))) + } } diff --git a/arro3-core/src/record_batch.rs b/arro3-core/src/record_batch.rs index d2acddb..63a8889 100644 --- a/arro3-core/src/record_batch.rs +++ b/arro3-core/src/record_batch.rs @@ -1,11 +1,16 @@ use std::ffi::CString; +use std::sync::Arc; +use arrow::array::AsArray; use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; use arrow_array::{Array, RecordBatch, StructArray}; +use arrow_schema::{DataType, SchemaBuilder}; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; -use pyo3::types::{PyCapsule, PyTuple}; +use pyo3::types::{PyCapsule, PyTuple, PyType}; use crate::error::PyArrowResult; +use crate::ffi::from_python::utils::import_array_pycapsules; #[pyclass(module = "arro3.core._rust", name = "RecordBatch", subclass)] #[derive(Debug)] @@ -60,4 +65,48 @@ impl PyRecordBatch { pub fn __eq__(&self, other: &PyRecordBatch) -> bool { self.0 == other.0 } + + /// Construct this object from existing Arrow data + /// + /// Args: + /// input: Arrow array to use for constructing this object + /// + /// Returns: + /// Self + #[classmethod] + pub fn from_arrow(_cls: &PyType, input: &PyAny) -> PyResult { + input.extract() + } + + /// Construct this object from a bare Arrow PyCapsule + #[classmethod] + pub fn from_arrow_pycapsule( + _cls: &PyType, + schema_capsule: &PyCapsule, + array_capsule: &PyCapsule, + ) -> PyResult { + let (array, field) = import_array_pycapsules(schema_capsule, array_capsule)?; + match field.data_type() { + DataType::Struct(fields) => { + let struct_array = array.as_struct(); + let schema = SchemaBuilder::from(fields) + .finish() + .with_metadata(field.metadata().clone()); + assert_eq!( + struct_array.null_count(), + 0, + "Cannot convert nullable StructArray to RecordBatch" + ); + + let columns = struct_array.columns().to_vec(); + let batch = RecordBatch::try_new(Arc::new(schema), columns) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + Ok(Self::new(batch)) + } + dt => Err(PyValueError::new_err(format!( + "Unexpected data type {}", + dt + ))), + } + } } diff --git a/arro3-core/src/record_batch_reader.rs b/arro3-core/src/record_batch_reader.rs index 336a277..32509fd 100644 --- a/arro3-core/src/record_batch_reader.rs +++ b/arro3-core/src/record_batch_reader.rs @@ -2,11 +2,12 @@ use std::ffi::CString; use arrow::ffi_stream::FFI_ArrowArrayStream; use arrow_array::RecordBatchReader; -use pyo3::exceptions::PyIOError; +use pyo3::exceptions::{PyIOError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::PyCapsule; +use pyo3::types::{PyCapsule, PyType}; use crate::error::PyArrowResult; +use crate::ffi::from_python::utils::import_stream_pycapsule; /// A wrapper around an [arrow_array::RecordBatchReader] #[pyclass(module = "arro3.core._rust", name = "RecordBatchReader", subclass)] @@ -48,4 +49,19 @@ impl PyRecordBatchReader { Ok(stream_capsule.to_object(py)) }) } + + #[classmethod] + pub fn from_arrow(_cls: &PyType, input: &PyAny) -> PyResult { + input.extract() + } + + /// Construct this object from a bare Arrow PyCapsule + #[classmethod] + pub fn from_arrow_pycapsule(_cls: &PyType, capsule: &PyCapsule) -> PyResult { + let stream = import_stream_pycapsule(capsule)?; + let stream_reader = arrow::ffi_stream::ArrowArrayStreamReader::try_new(stream) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + + Ok(Self(Some(Box::new(stream_reader)))) + } } diff --git a/arro3-core/src/schema.rs b/arro3-core/src/schema.rs index 8584e40..1745999 100644 --- a/arro3-core/src/schema.rs +++ b/arro3-core/src/schema.rs @@ -1,11 +1,13 @@ -use std::ffi::CString; +use std::sync::Arc; -use arrow::ffi::FFI_ArrowSchema; -use arrow_schema::SchemaRef; +use arrow_schema::{Schema, SchemaRef}; +use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use pyo3::types::{PyCapsule, PyType}; use crate::error::PyArrowResult; +use crate::ffi::from_python::utils::import_schema_pycapsule; +use crate::ffi::to_python::schema::ToSchemaPyCapsule; #[pyclass(module = "arro3.core._rust", name = "Schema", subclass)] pub struct PySchema(SchemaRef); @@ -28,6 +30,12 @@ impl From for PySchema { } } +impl AsRef for PySchema { + fn as_ref(&self) -> &Schema { + &self.0 + } +} + #[pymethods] impl PySchema { /// An implementation of the [Arrow PyCapsule @@ -37,14 +45,8 @@ impl PySchema { /// /// For example, you can call [`pyarrow.schema()`][pyarrow.schema] to convert this array /// into a pyarrow schema, without copying memory. - fn __arrow_c_schema__(&self) -> PyArrowResult { - let ffi_schema = FFI_ArrowSchema::try_from(self.0.as_ref())?; - let schema_capsule_name = CString::new("arrow_schema").unwrap(); - - Python::with_gil(|py| { - let schema_capsule = PyCapsule::new(py, ffi_schema, Some(schema_capsule_name))?; - Ok(schema_capsule.to_object(py)) - }) + fn __arrow_c_schema__(&self, py: Python) -> PyArrowResult { + Ok(self.to_py_capsule(py)?.to_object(py)) } /// Construct this object from existing Arrow data @@ -59,6 +61,15 @@ impl PySchema { input.extract() } + /// Construct this object from a bare Arrow PyCapsule + #[classmethod] + pub fn from_arrow_pycapsule(_cls: &PyType, capsule: &PyCapsule) -> PyResult { + let schema_ptr = import_schema_pycapsule(capsule)?; + let schema = + Schema::try_from(schema_ptr).map_err(|err| PyTypeError::new_err(err.to_string()))?; + Ok(Self::new(Arc::new(schema))) + } + pub fn __eq__(&self, other: &PySchema) -> bool { self.0 == other.0 } diff --git a/arro3-core/src/table.rs b/arro3-core/src/table.rs index aebf99b..f9de788 100644 --- a/arro3-core/src/table.rs +++ b/arro3-core/src/table.rs @@ -1,24 +1,32 @@ use std::ffi::CString; +use arrow::ffi_stream::ArrowArrayStreamReader as ArrowRecordBatchStreamReader; use arrow::ffi_stream::FFI_ArrowArrayStream; +use arrow_array::RecordBatchReader; use arrow_array::{RecordBatch, RecordBatchIterator}; use arrow_schema::SchemaRef; +use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::PyCapsule; +use pyo3::types::{PyCapsule, PyType}; use crate::error::PyArrowResult; +use crate::ffi::from_python::utils::import_stream_pycapsule; #[pyclass(module = "arro3.core._rust", name = "Table", subclass)] #[derive(Debug)] pub struct PyTable { - schema: SchemaRef, batches: Vec, + schema: SchemaRef, } impl PyTable { pub fn new(schema: SchemaRef, batches: Vec) -> Self { Self { schema, batches } } + + pub fn into_inner(self) -> (Vec, SchemaRef) { + (self.batches, self.schema) + } } #[pymethods] @@ -55,4 +63,33 @@ impl PyTable { pub fn __len__(&self) -> usize { self.batches.iter().fold(0, |acc, x| acc + x.num_rows()) } + + /// Construct this object from existing Arrow data + /// + /// Args: + /// input: Arrow array to use for constructing this object + /// + /// Returns: + /// Self + #[classmethod] + pub fn from_arrow(_cls: &PyType, input: &PyAny) -> PyResult { + input.extract() + } + + /// Construct this object from a bare Arrow PyCapsule + #[classmethod] + pub fn from_arrow_pycapsule(_cls: &PyType, capsule: &PyCapsule) -> PyResult { + let stream = import_stream_pycapsule(capsule)?; + let stream_reader = ArrowRecordBatchStreamReader::try_new(stream) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + let schema = stream_reader.schema(); + + let mut batches = vec![]; + for batch in stream_reader { + let batch = batch.map_err(|err| PyTypeError::new_err(err.to_string()))?; + batches.push(batch); + } + + Ok(Self::new(schema, batches)) + } }