Skip to content

Commit

Permalink
Add FFI from_raw (#5082)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Nov 17, 2023
1 parent aff86e7 commit a3687a7
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 14 deletions.
16 changes: 16 additions & 0 deletions arrow-data/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,22 @@ impl FFI_ArrowArray {
}
}

/// Moves the [`FFI_ArrowArray`] behind `array` into an owned value
///
/// The pointee is exchanged for [`FFI_ArrowArray::empty`]. This is the C Data Interface
/// notion of a [move]: the source is left with its release callback set to NULL, and the
/// previous contents are handed to the caller
///
/// # Safety
///
/// * `array` must be [valid] for reads and writes
/// * `array` must be properly aligned
/// * `array` must point to a properly initialized value of [`FFI_ArrowArray`]
///
/// [move]: https://arrow.apache.org/docs/format/CDataInterface.html#moving-an-array
/// [valid]: https://doc.rust-lang.org/std/ptr/index.html#safety
pub unsafe fn from_raw(array: *mut FFI_ArrowArray) -> Self {
    // Build the placeholder first, then swap it with the pointee in a single step
    let placeholder = Self::empty();
    std::ptr::replace(array, placeholder)
}

/// Creates an empty `FFI_ArrowArray` that data can be imported into
pub fn empty() -> Self {
Self {
Expand Down
16 changes: 16 additions & 0 deletions arrow-schema/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,22 @@ impl FFI_ArrowSchema {
Ok(self)
}

/// Moves the [`FFI_ArrowSchema`] behind `schema` into an owned value
///
/// The pointee is exchanged for [`FFI_ArrowSchema::empty`]. This is the C Data Interface
/// notion of a [move]: the source is left with its release callback set to NULL, and the
/// previous contents are handed to the caller
///
/// # Safety
///
/// * `schema` must be [valid] for reads and writes
/// * `schema` must be properly aligned
/// * `schema` must point to a properly initialized value of [`FFI_ArrowSchema`]
///
/// [move]: https://arrow.apache.org/docs/format/CDataInterface.html#moving-an-array
/// [valid]: https://doc.rust-lang.org/std/ptr/index.html#safety
pub unsafe fn from_raw(schema: *mut FFI_ArrowSchema) -> Self {
    // Build the placeholder first, then swap it with the pointee in a single step
    let placeholder = Self::empty();
    std::ptr::replace(schema, placeholder)
}

pub fn empty() -> Self {
Self {
format: std::ptr::null_mut(),
Expand Down
27 changes: 21 additions & 6 deletions arrow/src/ffi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,22 @@ impl FFI_ArrowArrayStream {
}
}

/// Moves the [`FFI_ArrowArrayStream`] behind `raw_stream` into an owned value
///
/// The pointee is exchanged for [`FFI_ArrowArrayStream::empty`]. This is the C Data
/// Interface notion of a [move]: the source is left with its release callback set to NULL,
/// and the previous contents are handed to the caller
///
/// # Safety
///
/// * `raw_stream` must be [valid] for reads and writes
/// * `raw_stream` must be properly aligned
/// * `raw_stream` must point to a properly initialized value of [`FFI_ArrowArrayStream`]
///
/// [move]: https://arrow.apache.org/docs/format/CDataInterface.html#moving-an-array
/// [valid]: https://doc.rust-lang.org/std/ptr/index.html#safety
pub unsafe fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Self {
    // Build the placeholder first, then swap it with the pointee in a single step
    let placeholder = Self::empty();
    std::ptr::replace(raw_stream, placeholder)
}

/// Creates a new empty [FFI_ArrowArrayStream]. Used to import from the C Stream Interface.
pub fn empty() -> Self {
Self {
Expand Down Expand Up @@ -306,11 +322,10 @@ impl ArrowArrayStreamReader {
/// the pointer.
///
/// # Safety
/// This function dereferences a raw pointer of `FFI_ArrowArrayStream`.
///
/// See [`FFI_ArrowArrayStream::from_raw`]
pub unsafe fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Result<Self> {
let stream_data = std::ptr::replace(raw_stream, FFI_ArrowArrayStream::empty());

Self::try_new(stream_data)
Self::try_new(FFI_ArrowArrayStream::from_raw(raw_stream))
}

/// Get the last error from `ArrowArrayStreamReader`
Expand Down Expand Up @@ -368,6 +383,7 @@ impl RecordBatchReader for ArrowArrayStreamReader {
/// # Safety
/// Assumes that the pointer represents valid C Stream Interfaces, both in memory
/// representation and lifetime via the `release` mechanism.
#[deprecated(note = "Use FFI_ArrowArrayStream::new")]
pub unsafe fn export_reader_into_raw(
reader: Box<dyn RecordBatchReader + Send>,
out_stream: *mut FFI_ArrowArrayStream,
Expand Down Expand Up @@ -426,8 +442,7 @@ mod tests {
let reader = TestRecordBatchReader::new(schema.clone(), iter);

// Export a `RecordBatchReader` through `FFI_ArrowArrayStream`
let mut ffi_stream = FFI_ArrowArrayStream::empty();
unsafe { export_reader_into_raw(reader, &mut ffi_stream) };
let mut ffi_stream = FFI_ArrowArrayStream::new(reader);

// Get schema from `FFI_ArrowArrayStream`
let mut ffi_schema = FFI_ArrowSchema::empty();
Expand Down
12 changes: 4 additions & 8 deletions arrow/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,7 @@ impl FromPyArrow for ArrayData {
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let array_ptr = array_capsule.pointer() as *mut FFI_ArrowArray;
let array = unsafe { std::ptr::replace(array_ptr, FFI_ArrowArray::empty()) };
let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
return ffi::from_ffi(array, schema_ptr).map_err(to_py_err);
}

Expand Down Expand Up @@ -348,8 +347,7 @@ impl FromPyArrow for RecordBatch {
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let array_ptr = array_capsule.pointer() as *mut FFI_ArrowArray;
let ffi_array = unsafe { std::ptr::replace(array_ptr, FFI_ArrowArray::empty()) };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
let array_data = ffi::from_ffi(ffi_array, schema_ptr).map_err(to_py_err)?;
if !matches!(array_data.data_type(), DataType::Struct(_)) {
return Err(PyTypeError::new_err(
Expand Down Expand Up @@ -397,8 +395,7 @@ impl FromPyArrow for ArrowArrayStreamReader {
PyTryInto::try_into(value.getattr("__arrow_c_stream__")?.call0()?)?;
validate_pycapsule(capsule, "arrow_array_stream")?;

let stream_ptr = capsule.pointer() as *mut FFI_ArrowArrayStream;
let stream = unsafe { std::ptr::replace(stream_ptr, FFI_ArrowArrayStream::empty()) };
let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };

let stream_reader = ArrowArrayStreamReader::try_new(stream)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
Expand Down Expand Up @@ -430,8 +427,7 @@ impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
// We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
// there is already a blanket implementation for `T: ToPyArrow`.
fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
let mut stream = FFI_ArrowArrayStream::empty();
unsafe { export_reader_into_raw(self, &mut stream) };
let mut stream = FFI_ArrowArrayStream::new(self);

let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
let module = py.import("pyarrow")?;
Expand Down

0 comments on commit a3687a7

Please sign in to comment.