Skip to content

Commit

Permalink
Add RecordBatchReader
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed May 28, 2024
1 parent 0d5e672 commit 02f7d56
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 0 deletions.
1 change: 1 addition & 0 deletions arro3-core/src/ffi/from_python/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod chunked;
pub mod ffi_stream;
pub mod field;
pub mod record_batch;
pub mod record_batch_reader;
pub mod schema;
pub mod table;
pub mod utils;
15 changes: 15 additions & 0 deletions arro3-core/src/ffi/from_python/record_batch_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::ffi::from_python::utils::import_arrow_c_stream;
use crate::record_batch_reader::PyRecordBatchReader;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::{PyAny, PyResult};

/// Allows a `PyRecordBatchReader` to be extracted from a Python object.
impl<'a> FromPyObject<'a> for PyRecordBatchReader {
    /// Build a reader from `ob` by importing it as an Arrow C stream.
    ///
    /// Errors from `import_arrow_c_stream` are propagated unchanged. A failure
    /// to construct the `ArrowArrayStreamReader` is surfaced as a Python
    /// `ValueError` carrying the Arrow error's message.
    fn extract(ob: &'a PyAny) -> PyResult<Self> {
        // NOTE(review): presumably this goes through the object's Arrow C
        // stream export; the helper is defined outside this file -- confirm.
        let ffi_stream = import_arrow_c_stream(ob)?;

        match arrow::ffi_stream::ArrowArrayStreamReader::try_new(ffi_stream) {
            Ok(reader) => Ok(Self(Some(Box::new(reader)))),
            Err(arrow_err) => Err(PyValueError::new_err(arrow_err.to_string())),
        }
    }
}
2 changes: 2 additions & 0 deletions arro3-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod ffi;
pub mod field;
pub mod interop;
pub mod record_batch;
pub mod record_batch_reader;
pub mod schema;
pub mod table;

Expand All @@ -26,6 +27,7 @@ fn _rust(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<chunked::PyChunkedArray>()?;
m.add_class::<field::PyField>()?;
m.add_class::<record_batch::PyRecordBatch>()?;
m.add_class::<record_batch_reader::PyRecordBatchReader>()?;
m.add_class::<schema::PySchema>()?;
m.add_class::<table::PyTable>()?;

Expand Down
51 changes: 51 additions & 0 deletions arro3-core/src/record_batch_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::ffi::CString;

use arrow::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::RecordBatchReader;
use pyo3::exceptions::PyIOError;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;

use crate::error::PyArrowResult;

/// A wrapper around an [arrow_array::RecordBatchReader]
///
/// The reader is held in an `Option` so that it can be moved out exactly once:
/// the methods on this type `take()` the inner value, leaving `None` behind.
/// Once the slot is `None`, those methods return an IO error describing the
/// stream as closed.
#[pyclass(module = "arro3.core._rust", name = "RecordBatchReader", subclass)]
pub struct PyRecordBatchReader(pub(crate) Option<Box<dyn RecordBatchReader + Send>>);

impl PyRecordBatchReader {
    /// Consume this wrapper and return the underlying record batch reader.
    ///
    /// # Errors
    ///
    /// Returns a Python `IOError` if the inner reader has already been taken
    /// (the slot is `None`), i.e. the stream is considered closed.
    pub fn into_reader(mut self) -> PyArrowResult<Box<dyn RecordBatchReader + Send>> {
        // `ok_or_else` builds the error only when the slot is actually empty.
        let reader = self
            .0
            .take()
            .ok_or_else(|| PyIOError::new_err("Cannot read from closed stream"))?;
        Ok(reader)
    }
}

#[pymethods]
impl PyRecordBatchReader {
    /// An implementation of the [Arrow PyCapsule
    /// Interface](https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html).
    /// This dunder method should not be called directly, but enables zero-copy
    /// data transfer to other Python libraries that understand Arrow memory.
    ///
    /// For example, you can call [`pyarrow.table()`][pyarrow.table] to convert this
    /// record batch reader into a pyarrow table, without copying memory.
    ///
    /// Exporting consumes the underlying stream: the reader is moved into the
    /// returned capsule, so calling this method a second time raises an
    /// `IOError`. The requested schema argument is currently ignored.
    fn __arrow_c_stream__(
        &mut self,
        _requested_schema: Option<PyObject>,
    ) -> PyArrowResult<PyObject> {
        // Move the reader out of `self`; only build the error if it is gone.
        let reader = self
            .0
            .take()
            .ok_or_else(|| PyIOError::new_err("Cannot read from closed stream"))?;

        let ffi_stream = FFI_ArrowArrayStream::new(reader);
        // Capsule name required by the Arrow PyCapsule Interface for streams.
        let stream_capsule_name = CString::new("arrow_array_stream")
            .expect("capsule name literal contains no interior NUL byte");

        Python::with_gil(|py| {
            let stream_capsule = PyCapsule::new(py, ffi_stream, Some(stream_capsule_name))?;
            Ok(stream_capsule.to_object(py))
        })
    }
}

0 comments on commit 02f7d56

Please sign in to comment.