Skip to content

Commit

Permalink
[prefactor] new engine_data and error ffi modules (#537)
Browse files Browse the repository at this point in the history
(no-change code movement PR)

`ffi/src/lib.rs` was getting pretty long and included pieces from
various things like error handling, etc. which are nicely-grouped as a
standalone module. Secondly, `ffi/src/scan.rs` included some engine data
utilities that could be moved out and likely shared in the future
(nothing scan-specific). This PR just moves out some of these
easily-grouped items into separate modules:

- new `mod engine_data`: the `pub fn`'s moved into `mod engine_data`
will need to be prefixed with `engine_data` now to resolve
- new `mod error`: i've done `pub use error::*` so error types will be
exposed the same.
  • Loading branch information
zachschuermann authored Nov 26, 2024
1 parent 7bfd119 commit a245355
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 287 deletions.
86 changes: 86 additions & 0 deletions ffi/src/engine_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! EngineData related ffi code
use delta_kernel::{DeltaResult, EngineData};
use std::ffi::c_void;

use crate::{ExclusiveEngineData, ExternResult, IntoExternResult, SharedExternEngine};

use super::handle::Handle;

/// Get the number of rows in an engine data
///
/// # Safety
/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData`
#[no_mangle]
pub unsafe extern "C" fn engine_data_length(data: &mut Handle<ExclusiveEngineData>) -> usize {
let data = unsafe { data.as_mut() };
data.len()
}

/// Allow an engine to "unwrap" an [`ExclusiveEngineData`] into the raw pointer for the case it wants
/// to use its own engine data format
///
/// # Safety
///
/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData`. The Engine must
/// ensure the handle outlives the returned pointer.
// TODO(frj): What is the engine actually doing with this method?? If we need access to raw extern
// pointers, we will need to define an `ExternEngineData` trait that exposes such capability, along
// with an ExternEngineDataVtable that implements it. See `ExternEngine` and `ExternEngineVtable`
// for examples of how that works.
#[no_mangle]
pub unsafe extern "C" fn get_raw_engine_data(mut data: Handle<ExclusiveEngineData>) -> *mut c_void {
let ptr = get_raw_engine_data_impl(&mut data) as *mut dyn EngineData;
ptr as _
}

unsafe fn get_raw_engine_data_impl(data: &mut Handle<ExclusiveEngineData>) -> &mut dyn EngineData {
let _data = unsafe { data.as_mut() };
todo!() // See TODO comment for EngineData
}

/// Struct to allow binding to the arrow [C Data
/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and
/// the schema.
#[cfg(feature = "default-engine")]
#[repr(C)]
pub struct ArrowFFIData {
pub array: arrow_data::ffi::FFI_ArrowArray,
pub schema: arrow_schema::ffi::FFI_ArrowSchema,
}

// TODO: This should use a callback to avoid having to have the engine free the struct
/// Get an [`ArrowFFIData`] to allow binding to the arrow [C Data
/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and
/// the schema. If this function returns an `Ok` variant the _engine_ must free the returned struct.
///
/// # Safety
/// data_handle must be a valid ExclusiveEngineData as read by the
/// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`.
#[cfg(feature = "default-engine")]
#[no_mangle]
pub unsafe extern "C" fn get_raw_arrow_data(
data: Handle<ExclusiveEngineData>,
engine: Handle<SharedExternEngine>,
) -> ExternResult<*mut ArrowFFIData> {
// TODO(frj): This consumes the handle. Is that what we really want?
let data = unsafe { data.into_inner() };
get_raw_arrow_data_impl(data).into_extern_result(&engine.as_ref())
}

// TODO: This method leaks the returned pointer memory. How will the engine free it?
#[cfg(feature = "default-engine")]
fn get_raw_arrow_data_impl(data: Box<dyn EngineData>) -> DeltaResult<*mut ArrowFFIData> {
let record_batch: arrow_array::RecordBatch = data
.into_any()
.downcast::<delta_kernel::engine::arrow_data::ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
let sa: arrow_array::StructArray = record_batch.into();
let array_data: arrow_data::ArrayData = sa.into();
// these call `clone`. is there a way to not copy anything and what exactly are they cloning?
let array = arrow_data::ffi::FFI_ArrowArray::new(&array_data);
let schema = arrow_schema::ffi::FFI_ArrowSchema::try_from(array_data.data_type())?;
let ret_data = Box::new(ArrowFFIData { array, schema });
Ok(Box::leak(ret_data))
}
205 changes: 205 additions & 0 deletions ffi/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use delta_kernel::{DeltaResult, Error};

use crate::{kernel_string_slice, ExternEngine, KernelStringSlice};

#[repr(C)]
#[derive(Debug)]
pub enum KernelError {
UnknownError, // catch-all for unrecognized kernel Error types
FFIError, // errors encountered in the code layer that supports FFI
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
ArrowError,
EngineDataTypeError,
ExtractError,
GenericError,
IOErrorError,
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
ParquetError,
#[cfg(feature = "default-engine")]
ObjectStoreError,
#[cfg(feature = "default-engine")]
ObjectStorePathError,
#[cfg(feature = "default-engine")]
ReqwestError,
FileNotFoundError,
MissingColumnError,
UnexpectedColumnTypeError,
MissingDataError,
MissingVersionError,
DeletionVectorError,
InvalidUrlError,
MalformedJsonError,
MissingMetadataError,
MissingProtocolError,
InvalidProtocolError,
MissingMetadataAndProtocolError,
ParseError,
JoinFailureError,
Utf8Error,
ParseIntError,
InvalidColumnMappingModeError,
InvalidTableLocationError,
InvalidDecimalError,
InvalidStructDataError,
InternalError,
InvalidExpression,
InvalidLogPath,
InvalidCommitInfo,
FileAlreadyExists,
MissingCommitInfo,
UnsupportedError,
ParseIntervalError,
ChangeDataFeedUnsupported,
}

impl From<Error> for KernelError {
fn from(e: Error) -> Self {
match e {
// NOTE: By definition, no kernel Error maps to FFIError
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
Error::Arrow(_) => KernelError::ArrowError,
Error::EngineDataType(_) => KernelError::EngineDataTypeError,
Error::Extract(..) => KernelError::ExtractError,
Error::Generic(_) => KernelError::GenericError,
Error::GenericError { .. } => KernelError::GenericError,
Error::IOError(_) => KernelError::IOErrorError,
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
Error::Parquet(_) => KernelError::ParquetError,
#[cfg(feature = "default-engine")]
Error::ObjectStore(_) => KernelError::ObjectStoreError,
#[cfg(feature = "default-engine")]
Error::ObjectStorePath(_) => KernelError::ObjectStorePathError,
#[cfg(feature = "default-engine")]
Error::Reqwest(_) => KernelError::ReqwestError,
Error::FileNotFound(_) => KernelError::FileNotFoundError,
Error::MissingColumn(_) => KernelError::MissingColumnError,
Error::UnexpectedColumnType(_) => KernelError::UnexpectedColumnTypeError,
Error::MissingData(_) => KernelError::MissingDataError,
Error::MissingVersion => KernelError::MissingVersionError,
Error::DeletionVector(_) => KernelError::DeletionVectorError,
Error::InvalidUrl(_) => KernelError::InvalidUrlError,
Error::MalformedJson(_) => KernelError::MalformedJsonError,
Error::MissingMetadata => KernelError::MissingMetadataError,
Error::MissingProtocol => KernelError::MissingProtocolError,
Error::InvalidProtocol(_) => KernelError::InvalidProtocolError,
Error::MissingMetadataAndProtocol => KernelError::MissingMetadataAndProtocolError,
Error::ParseError(..) => KernelError::ParseError,
Error::JoinFailure(_) => KernelError::JoinFailureError,
Error::Utf8Error(_) => KernelError::Utf8Error,
Error::ParseIntError(_) => KernelError::ParseIntError,
Error::InvalidColumnMappingMode(_) => KernelError::InvalidColumnMappingModeError,
Error::InvalidTableLocation(_) => KernelError::InvalidTableLocationError,
Error::InvalidDecimal(_) => KernelError::InvalidDecimalError,
Error::InvalidStructData(_) => KernelError::InvalidStructDataError,
Error::InternalError(_) => KernelError::InternalError,
Error::Backtraced {
source,
backtrace: _,
} => Self::from(*source),
Error::InvalidExpressionEvaluation(_) => KernelError::InvalidExpression,
Error::InvalidLogPath(_) => KernelError::InvalidLogPath,
Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo,
Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
Error::Unsupported(_) => KernelError::UnsupportedError,
Error::ParseIntervalError(_) => KernelError::ParseIntervalError,
Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
}
}
}

/// An error that can be returned to the engine. Engines that wish to associate additional
/// information can define and use any type that is [pointer
/// interconvertible](https://en.cppreference.com/w/cpp/language/static_cast#pointer-interconvertible)
/// with this one -- e.g. by subclassing this struct or by embedding this struct as the first member
/// of a [standard layout](https://en.cppreference.com/w/cpp/language/data_members#Standard-layout)
/// class.
#[repr(C)]
pub struct EngineError {
pub(crate) etype: KernelError,
}

/// Semantics: Kernel will always immediately return the leaked engine error to the engine (if it
/// allocated one at all), and engine is responsible for freeing it.
#[repr(C)]
pub enum ExternResult<T> {
Ok(T),
Err(*mut EngineError),
}

pub type AllocateErrorFn =
extern "C" fn(etype: KernelError, msg: KernelStringSlice) -> *mut EngineError;

impl<T> ExternResult<T> {
pub fn is_ok(&self) -> bool {
match self {
Self::Ok(_) => true,
Self::Err(_) => false,
}
}
pub fn is_err(&self) -> bool {
!self.is_ok()
}
}

/// Represents an engine error allocator. Ultimately all implementations will fall back to an
/// [`AllocateErrorFn`] provided by the engine, but the trait allows us to conveniently access the
/// allocator in various types that may wrap it.
pub trait AllocateError {
/// Allocates a new error in engine memory and returns the resulting pointer. The engine is
/// expected to copy the passed-in message, which is only guaranteed to remain valid until the
/// call returns. Kernel will always immediately return the result of this method to the engine.
///
/// # Safety
///
/// The string slice must be valid until the call returns, and the error allocator must also be
/// valid.
unsafe fn allocate_error(&self, etype: KernelError, msg: KernelStringSlice)
-> *mut EngineError;
}

impl AllocateError for AllocateErrorFn {
unsafe fn allocate_error(
&self,
etype: KernelError,
msg: KernelStringSlice,
) -> *mut EngineError {
self(etype, msg)
}
}

impl AllocateError for &dyn ExternEngine {
/// # Safety
///
/// In addition to the usual requirements, the engine handle must be valid.
unsafe fn allocate_error(
&self,
etype: KernelError,
msg: KernelStringSlice,
) -> *mut EngineError {
self.error_allocator().allocate_error(etype, msg)
}
}

/// Converts a [DeltaResult] into an [ExternResult], using the engine's error allocator.
///
/// # Safety
///
/// The allocator must be valid.
pub(crate) trait IntoExternResult<T> {
unsafe fn into_extern_result(self, alloc: &dyn AllocateError) -> ExternResult<T>;
}

// NOTE: We can't "just" impl From<DeltaResult<T>> because we require an error allocator.
impl<T> IntoExternResult<T> for DeltaResult<T> {
unsafe fn into_extern_result(self, alloc: &dyn AllocateError) -> ExternResult<T> {
match self {
Ok(ok) => ExternResult::Ok(ok),
Err(err) => {
let msg = format!("{}", err);
let err = unsafe { alloc.allocate_error(err.into(), kernel_string_slice!(msg)) };
ExternResult::Err(err)
}
}
}
}
Loading

0 comments on commit a245355

Please sign in to comment.