diff --git a/ffi/src/engine_data.rs b/ffi/src/engine_data.rs new file mode 100644 index 000000000..3363c9034 --- /dev/null +++ b/ffi/src/engine_data.rs @@ -0,0 +1,86 @@ +//! EngineData related ffi code + +use delta_kernel::{DeltaResult, EngineData}; +use std::ffi::c_void; + +use crate::{ExclusiveEngineData, ExternResult, IntoExternResult, SharedExternEngine}; + +use super::handle::Handle; + +/// Get the number of rows in an engine data +/// +/// # Safety +/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData` +#[no_mangle] +pub unsafe extern "C" fn engine_data_length(data: &mut Handle) -> usize { + let data = unsafe { data.as_mut() }; + data.len() +} + +/// Allow an engine to "unwrap" an [`ExclusiveEngineData`] into the raw pointer for the case it wants +/// to use its own engine data format +/// +/// # Safety +/// +/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData`. The Engine must +/// ensure the handle outlives the returned pointer. +// TODO(frj): What is the engine actually doing with this method?? If we need access to raw extern +// pointers, we will need to define an `ExternEngineData` trait that exposes such capability, along +// with an ExternEngineDataVtable that implements it. See `ExternEngine` and `ExternEngineVtable` +// for examples of how that works. +#[no_mangle] +pub unsafe extern "C" fn get_raw_engine_data(mut data: Handle) -> *mut c_void { + let ptr = get_raw_engine_data_impl(&mut data) as *mut dyn EngineData; + ptr as _ +} + +unsafe fn get_raw_engine_data_impl(data: &mut Handle) -> &mut dyn EngineData { + let _data = unsafe { data.as_mut() }; + todo!() // See TODO comment for EngineData +} + +/// Struct to allow binding to the arrow [C Data +/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and +/// the schema. +#[cfg(feature = "default-engine")] +#[repr(C)] +pub struct ArrowFFIData { + pub array: arrow_data::ffi::FFI_ArrowArray, + pub schema: arrow_schema::ffi::FFI_ArrowSchema, +} + +// TODO: This should use a callback to avoid having to have the engine free the struct +/// Get an [`ArrowFFIData`] to allow binding to the arrow [C Data +/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and +/// the schema. If this function returns an `Ok` variant the _engine_ must free the returned struct. +/// +/// # Safety +/// data_handle must be a valid ExclusiveEngineData as read by the +/// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`. +#[cfg(feature = "default-engine")] +#[no_mangle] +pub unsafe extern "C" fn get_raw_arrow_data( + data: Handle, + engine: Handle, +) -> ExternResult<*mut ArrowFFIData> { + // TODO(frj): This consumes the handle. Is that what we really want? + let data = unsafe { data.into_inner() }; + get_raw_arrow_data_impl(data).into_extern_result(&engine.as_ref()) +} + +// TODO: This method leaks the returned pointer memory. How will the engine free it? +#[cfg(feature = "default-engine")] +fn get_raw_arrow_data_impl(data: Box) -> DeltaResult<*mut ArrowFFIData> { + let record_batch: arrow_array::RecordBatch = data + .into_any() + .downcast::() + .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? + .into(); + let sa: arrow_array::StructArray = record_batch.into(); + let array_data: arrow_data::ArrayData = sa.into(); + // these call `clone`. is there a way to not copy anything and what exactly are they cloning? + let array = arrow_data::ffi::FFI_ArrowArray::new(&array_data); + let schema = arrow_schema::ffi::FFI_ArrowSchema::try_from(array_data.data_type())?; + let ret_data = Box::new(ArrowFFIData { array, schema }); + Ok(Box::leak(ret_data)) +} diff --git a/ffi/src/error.rs b/ffi/src/error.rs new file mode 100644 index 000000000..4297037f9 --- /dev/null +++ b/ffi/src/error.rs @@ -0,0 +1,205 @@ +use delta_kernel::{DeltaResult, Error}; + +use crate::{kernel_string_slice, ExternEngine, KernelStringSlice}; + +#[repr(C)] +#[derive(Debug)] +pub enum KernelError { + UnknownError, // catch-all for unrecognized kernel Error types + FFIError, // errors encountered in the code layer that supports FFI + #[cfg(any(feature = "default-engine", feature = "sync-engine"))] + ArrowError, + EngineDataTypeError, + ExtractError, + GenericError, + IOErrorError, + #[cfg(any(feature = "default-engine", feature = "sync-engine"))] + ParquetError, + #[cfg(feature = "default-engine")] + ObjectStoreError, + #[cfg(feature = "default-engine")] + ObjectStorePathError, + #[cfg(feature = "default-engine")] + ReqwestError, + FileNotFoundError, + MissingColumnError, + UnexpectedColumnTypeError, + MissingDataError, + MissingVersionError, + DeletionVectorError, + InvalidUrlError, + MalformedJsonError, + MissingMetadataError, + MissingProtocolError, + InvalidProtocolError, + MissingMetadataAndProtocolError, + ParseError, + JoinFailureError, + Utf8Error, + ParseIntError, + InvalidColumnMappingModeError, + InvalidTableLocationError, + InvalidDecimalError, + InvalidStructDataError, + InternalError, + InvalidExpression, + InvalidLogPath, + InvalidCommitInfo, + FileAlreadyExists, + MissingCommitInfo, + UnsupportedError, + ParseIntervalError, + ChangeDataFeedUnsupported, +} + +impl From for KernelError { + fn from(e: Error) -> Self { + match e { + // NOTE: By definition, no kernel Error maps to FFIError + #[cfg(any(feature = "default-engine", feature = "sync-engine"))] + Error::Arrow(_) => KernelError::ArrowError, + Error::EngineDataType(_) => KernelError::EngineDataTypeError, + Error::Extract(..) => KernelError::ExtractError, + Error::Generic(_) => KernelError::GenericError, + Error::GenericError { .. } => KernelError::GenericError, + Error::IOError(_) => KernelError::IOErrorError, + #[cfg(any(feature = "default-engine", feature = "sync-engine"))] + Error::Parquet(_) => KernelError::ParquetError, + #[cfg(feature = "default-engine")] + Error::ObjectStore(_) => KernelError::ObjectStoreError, + #[cfg(feature = "default-engine")] + Error::ObjectStorePath(_) => KernelError::ObjectStorePathError, + #[cfg(feature = "default-engine")] + Error::Reqwest(_) => KernelError::ReqwestError, + Error::FileNotFound(_) => KernelError::FileNotFoundError, + Error::MissingColumn(_) => KernelError::MissingColumnError, + Error::UnexpectedColumnType(_) => KernelError::UnexpectedColumnTypeError, + Error::MissingData(_) => KernelError::MissingDataError, + Error::MissingVersion => KernelError::MissingVersionError, + Error::DeletionVector(_) => KernelError::DeletionVectorError, + Error::InvalidUrl(_) => KernelError::InvalidUrlError, + Error::MalformedJson(_) => KernelError::MalformedJsonError, + Error::MissingMetadata => KernelError::MissingMetadataError, + Error::MissingProtocol => KernelError::MissingProtocolError, + Error::InvalidProtocol(_) => KernelError::InvalidProtocolError, + Error::MissingMetadataAndProtocol => KernelError::MissingMetadataAndProtocolError, + Error::ParseError(..) => KernelError::ParseError, + Error::JoinFailure(_) => KernelError::JoinFailureError, + Error::Utf8Error(_) => KernelError::Utf8Error, + Error::ParseIntError(_) => KernelError::ParseIntError, + Error::InvalidColumnMappingMode(_) => KernelError::InvalidColumnMappingModeError, + Error::InvalidTableLocation(_) => KernelError::InvalidTableLocationError, + Error::InvalidDecimal(_) => KernelError::InvalidDecimalError, + Error::InvalidStructData(_) => KernelError::InvalidStructDataError, + Error::InternalError(_) => KernelError::InternalError, + Error::Backtraced { + source, + backtrace: _, + } => Self::from(*source), + Error::InvalidExpressionEvaluation(_) => KernelError::InvalidExpression, + Error::InvalidLogPath(_) => KernelError::InvalidLogPath, + Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo, + Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists, + Error::MissingCommitInfo => KernelError::MissingCommitInfo, + Error::Unsupported(_) => KernelError::UnsupportedError, + Error::ParseIntervalError(_) => KernelError::ParseIntervalError, + Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported, + } + } +} + +/// An error that can be returned to the engine. Engines that wish to associate additional +/// information can define and use any type that is [pointer +/// interconvertible](https://en.cppreference.com/w/cpp/language/static_cast#pointer-interconvertible) +/// with this one -- e.g. by subclassing this struct or by embedding this struct as the first member +/// of a [standard layout](https://en.cppreference.com/w/cpp/language/data_members#Standard-layout) +/// class. +#[repr(C)] +pub struct EngineError { + pub(crate) etype: KernelError, +} + +/// Semantics: Kernel will always immediately return the leaked engine error to the engine (if it +/// allocated one at all), and engine is responsible for freeing it. +#[repr(C)] +pub enum ExternResult { + Ok(T), + Err(*mut EngineError), +} + +pub type AllocateErrorFn = + extern "C" fn(etype: KernelError, msg: KernelStringSlice) -> *mut EngineError; + +impl ExternResult { + pub fn is_ok(&self) -> bool { + match self { + Self::Ok(_) => true, + Self::Err(_) => false, + } + } + pub fn is_err(&self) -> bool { + !self.is_ok() + } +} + +/// Represents an engine error allocator. Ultimately all implementations will fall back to an +/// [`AllocateErrorFn`] provided by the engine, but the trait allows us to conveniently access the +/// allocator in various types that may wrap it. +pub trait AllocateError { + /// Allocates a new error in engine memory and returns the resulting pointer. The engine is + /// expected to copy the passed-in message, which is only guaranteed to remain valid until the + /// call returns. Kernel will always immediately return the result of this method to the engine. + /// + /// # Safety + /// + /// The string slice must be valid until the call returns, and the error allocator must also be + /// valid. + unsafe fn allocate_error(&self, etype: KernelError, msg: KernelStringSlice) + -> *mut EngineError; +} + +impl AllocateError for AllocateErrorFn { + unsafe fn allocate_error( + &self, + etype: KernelError, + msg: KernelStringSlice, + ) -> *mut EngineError { + self(etype, msg) + } +} + +impl AllocateError for &dyn ExternEngine { + /// # Safety + /// + /// In addition to the usual requirements, the engine handle must be valid. + unsafe fn allocate_error( + &self, + etype: KernelError, + msg: KernelStringSlice, + ) -> *mut EngineError { + self.error_allocator().allocate_error(etype, msg) + } +} + +/// Converts a [DeltaResult] into an [ExternResult], using the engine's error allocator. +/// +/// # Safety +/// +/// The allocator must be valid. +pub(crate) trait IntoExternResult { + unsafe fn into_extern_result(self, alloc: &dyn AllocateError) -> ExternResult; +} + +// NOTE: We can't "just" impl From> because we require an error allocator. +impl IntoExternResult for DeltaResult { + unsafe fn into_extern_result(self, alloc: &dyn AllocateError) -> ExternResult { + match self { + Ok(ok) => ExternResult::Ok(ok), + Err(err) => { + let msg = format!("{}", err); + let err = unsafe { alloc.allocate_error(err.into(), kernel_string_slice!(msg)) }; + ExternResult::Err(err) + } + } + } +} diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index ef02c15ee..c3f0df935 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -1,6 +1,7 @@ -/// FFI interface for the delta kernel -/// -/// Exposes that an engine needs to call from C/C++ to interface with kernel +//! FFI interface for the delta kernel +//! +//! Exposes that an engine needs to call from C/C++ to interface with kernel + #[cfg(feature = "default-engine")] use std::collections::HashMap; use std::default::Default; @@ -11,7 +12,7 @@ use tracing::debug; use url::Url; use delta_kernel::snapshot::Snapshot; -use delta_kernel::{DeltaResult, Engine, EngineData, Error, Table}; +use delta_kernel::{DeltaResult, Engine, EngineData, Table}; use delta_kernel_ffi_macros::handle_descriptor; // cbindgen doesn't understand our use of feature flags here, and by default it parses `mod handle` @@ -29,7 +30,10 @@ use handle::Handle; // relies on `crate::` extern crate self as delta_kernel_ffi; +pub mod engine_data; pub mod engine_funcs; +pub mod error; +use error::{AllocateError, AllocateErrorFn, ExternResult, IntoExternResult}; pub mod expressions; pub mod scan; pub mod schema; @@ -337,208 +341,6 @@ pub unsafe extern "C" fn free_engine_data(engine_data: Handle for KernelError { - fn from(e: Error) -> Self { - match e { - // NOTE: By definition, no kernel Error maps to FFIError - #[cfg(any(feature = "default-engine", feature = "sync-engine"))] - Error::Arrow(_) => KernelError::ArrowError, - Error::EngineDataType(_) => KernelError::EngineDataTypeError, - Error::Extract(..) => KernelError::ExtractError, - Error::Generic(_) => KernelError::GenericError, - Error::GenericError { .. } => KernelError::GenericError, - Error::IOError(_) => KernelError::IOErrorError, - #[cfg(any(feature = "default-engine", feature = "sync-engine"))] - Error::Parquet(_) => KernelError::ParquetError, - #[cfg(feature = "default-engine")] - Error::ObjectStore(_) => KernelError::ObjectStoreError, - #[cfg(feature = "default-engine")] - Error::ObjectStorePath(_) => KernelError::ObjectStorePathError, - #[cfg(feature = "default-engine")] - Error::Reqwest(_) => KernelError::ReqwestError, - Error::FileNotFound(_) => KernelError::FileNotFoundError, - Error::MissingColumn(_) => KernelError::MissingColumnError, - Error::UnexpectedColumnType(_) => KernelError::UnexpectedColumnTypeError, - Error::MissingData(_) => KernelError::MissingDataError, - Error::MissingVersion => KernelError::MissingVersionError, - Error::DeletionVector(_) => KernelError::DeletionVectorError, - Error::InvalidUrl(_) => KernelError::InvalidUrlError, - Error::MalformedJson(_) => KernelError::MalformedJsonError, - Error::MissingMetadata => KernelError::MissingMetadataError, - Error::MissingProtocol => KernelError::MissingProtocolError, - Error::InvalidProtocol(_) => KernelError::InvalidProtocolError, - Error::MissingMetadataAndProtocol => KernelError::MissingMetadataAndProtocolError, - Error::ParseError(..) => KernelError::ParseError, - Error::JoinFailure(_) => KernelError::JoinFailureError, - Error::Utf8Error(_) => KernelError::Utf8Error, - Error::ParseIntError(_) => KernelError::ParseIntError, - Error::InvalidColumnMappingMode(_) => KernelError::InvalidColumnMappingModeError, - Error::InvalidTableLocation(_) => KernelError::InvalidTableLocationError, - Error::InvalidDecimal(_) => KernelError::InvalidDecimalError, - Error::InvalidStructData(_) => KernelError::InvalidStructDataError, - Error::InternalError(_) => KernelError::InternalError, - Error::Backtraced { - source, - backtrace: _, - } => Self::from(*source), - Error::InvalidExpressionEvaluation(_) => KernelError::InvalidExpression, - Error::InvalidLogPath(_) => KernelError::InvalidLogPath, - Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo, - Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists, - Error::MissingCommitInfo => KernelError::MissingCommitInfo, - Error::Unsupported(_) => KernelError::UnsupportedError, - Error::ParseIntervalError(_) => KernelError::ParseIntervalError, - Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported, - } - } -} - -/// An error that can be returned to the engine. Engines that wish to associate additional -/// information can define and use any type that is [pointer -/// interconvertible](https://en.cppreference.com/w/cpp/language/static_cast#pointer-interconvertible) -/// with this one -- e.g. by subclassing this struct or by embedding this struct as the first member -/// of a [standard layout](https://en.cppreference.com/w/cpp/language/data_members#Standard-layout) -/// class. -#[repr(C)] -pub struct EngineError { - etype: KernelError, -} - -/// Semantics: Kernel will always immediately return the leaked engine error to the engine (if it -/// allocated one at all), and engine is responsible for freeing it. -#[repr(C)] -pub enum ExternResult { - Ok(T), - Err(*mut EngineError), -} - -pub type AllocateErrorFn = - extern "C" fn(etype: KernelError, msg: KernelStringSlice) -> *mut EngineError; - -// NOTE: We can't "just" impl From> because we require an error allocator. -impl ExternResult { - pub fn is_ok(&self) -> bool { - match self { - Self::Ok(_) => true, - Self::Err(_) => false, - } - } - pub fn is_err(&self) -> bool { - !self.is_ok() - } -} - -/// Represents an engine error allocator. Ultimately all implementations will fall back to an -/// [`AllocateErrorFn`] provided by the engine, but the trait allows us to conveniently access the -/// allocator in various types that may wrap it. -pub trait AllocateError { - /// Allocates a new error in engine memory and returns the resulting pointer. The engine is - /// expected to copy the passed-in message, which is only guaranteed to remain valid until the - /// call returns. Kernel will always immediately return the result of this method to the engine. - /// - /// # Safety - /// - /// The string slice must be valid until the call returns, and the error allocator must also be - /// valid. - unsafe fn allocate_error(&self, etype: KernelError, msg: KernelStringSlice) - -> *mut EngineError; -} - -impl AllocateError for AllocateErrorFn { - unsafe fn allocate_error( - &self, - etype: KernelError, - msg: KernelStringSlice, - ) -> *mut EngineError { - self(etype, msg) - } -} - -impl AllocateError for &dyn ExternEngine { - /// # Safety - /// - /// In addition to the usual requirements, the engine handle must be valid. - unsafe fn allocate_error( - &self, - etype: KernelError, - msg: KernelStringSlice, - ) -> *mut EngineError { - self.error_allocator().allocate_error(etype, msg) - } -} - -/// Converts a [DeltaResult] into an [ExternResult], using the engine's error allocator. -/// -/// # Safety -/// -/// The allocator must be valid. -trait IntoExternResult { - unsafe fn into_extern_result(self, alloc: &dyn AllocateError) -> ExternResult; -} - -impl IntoExternResult for DeltaResult { - unsafe fn into_extern_result(self, alloc: &dyn AllocateError) -> ExternResult { - match self { - Ok(ok) => ExternResult::Ok(ok), - Err(err) => { - let msg = format!("{}", err); - let err = unsafe { alloc.allocate_error(err.into(), kernel_string_slice!(msg)) }; - ExternResult::Err(err) - } - } - } -} - // A wrapper for Engine which defines additional FFI-specific methods. pub trait ExternEngine: Send + Sync { fn engine(&self) -> Arc; @@ -909,6 +711,7 @@ impl Default for ReferenceSet { #[cfg(test)] mod tests { use super::*; + use crate::error::{EngineError, KernelError}; #[no_mangle] extern "C" fn allocate_err(etype: KernelError, _: KernelStringSlice) -> *mut EngineError { diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index 698a58fc1..51ca075b1 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -1,14 +1,13 @@ -//! Scan and EngineData related ffi code +//! Scan related ffi code use std::collections::HashMap; -use std::ffi::c_void; use std::sync::{Arc, Mutex}; use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState}; use delta_kernel::scan::{Scan, ScanData}; use delta_kernel::schema::Schema; use delta_kernel::snapshot::Snapshot; -use delta_kernel::{DeltaResult, EngineData, Error}; +use delta_kernel::{DeltaResult, Error}; use delta_kernel_ffi_macros::handle_descriptor; use tracing::debug; use url::Url; @@ -24,84 +23,6 @@ use crate::{ use super::handle::Handle; -/// Get the number of rows in an engine data -/// -/// # Safety -/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData` -#[no_mangle] -pub unsafe extern "C" fn engine_data_length(data: &mut Handle) -> usize { - let data = unsafe { data.as_mut() }; - data.len() -} - -/// Allow an engine to "unwrap" an [`ExclusiveEngineData`] into the raw pointer for the case it wants -/// to use its own engine data format -/// -/// # Safety -/// -/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData`. The Engine must -/// ensure the handle outlives the returned pointer. -// TODO(frj): What is the engine actually doing with this method?? If we need access to raw extern -// pointers, we will need to define an `ExternEngineData` trait that exposes such capability, along -// with an ExternEngineDataVtable that implements it. See `ExternEngine` and `ExternEngineVtable` -// for examples of how that works. -#[no_mangle] -pub unsafe extern "C" fn get_raw_engine_data(mut data: Handle) -> *mut c_void { - let ptr = get_raw_engine_data_impl(&mut data) as *mut dyn EngineData; - ptr as _ -} - -unsafe fn get_raw_engine_data_impl(data: &mut Handle) -> &mut dyn EngineData { - let _data = unsafe { data.as_mut() }; - todo!() // See TODO comment for EngineData -} - -/// Struct to allow binding to the arrow [C Data -/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and -/// the schema. -#[cfg(feature = "default-engine")] -#[repr(C)] -pub struct ArrowFFIData { - pub array: arrow_data::ffi::FFI_ArrowArray, - pub schema: arrow_schema::ffi::FFI_ArrowSchema, -} - -// TODO: This should use a callback to avoid having to have the engine free the struct -/// Get an [`ArrowFFIData`] to allow binding to the arrow [C Data -/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and -/// the schema. If this function returns an `Ok` variant the _engine_ must free the returned struct. -/// -/// # Safety -/// data_handle must be a valid ExclusiveEngineData as read by the -/// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`. -#[cfg(feature = "default-engine")] -#[no_mangle] -pub unsafe extern "C" fn get_raw_arrow_data( - data: Handle, - engine: Handle, -) -> ExternResult<*mut ArrowFFIData> { - // TODO(frj): This consumes the handle. Is that what we really want? - let data = unsafe { data.into_inner() }; - get_raw_arrow_data_impl(data).into_extern_result(&engine.as_ref()) -} - -// TODO: This method leaks the returned pointer memory. How will the engine free it? -#[cfg(feature = "default-engine")] -fn get_raw_arrow_data_impl(data: Box) -> DeltaResult<*mut ArrowFFIData> { - let record_batch: arrow_array::RecordBatch = data - .into_any() - .downcast::() - .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? - .into(); - let sa: arrow_array::StructArray = record_batch.into(); - let array_data: arrow_data::ArrayData = sa.into(); - // these call `clone`. is there a way to not copy anything and what exactly are they cloning? - let array = arrow_data::ffi::FFI_ArrowArray::new(&array_data); - let schema = arrow_schema::ffi::FFI_ArrowSchema::try_from(array_data.data_type())?; - let ret_data = Box::new(ArrowFFIData { array, schema }); - Ok(Box::leak(ret_data)) -} - // TODO: Why do we even need to expose a scan, when the only thing an engine can do with it is // handit back to the kernel by calling `kernel_scan_data_init`? There isn't even an FFI method to // drop it!