[prefactor] new engine_data and error ffi modules #537

Merged
merged 4 commits into from
Nov 26, 2024
86 changes: 86 additions & 0 deletions ffi/src/engine_data.rs
@@ -0,0 +1,86 @@
//! EngineData related ffi code

use delta_kernel::{DeltaResult, EngineData};
use std::ffi::c_void;

use crate::{ExclusiveEngineData, ExternResult, IntoExternResult, SharedExternEngine};

use super::handle::Handle;

/// Get the number of rows in an engine data
///
/// # Safety
/// `data` must be a valid pointer to a kernel-allocated `ExclusiveEngineData`
#[no_mangle]
pub unsafe extern "C" fn engine_data_length(data: &mut Handle<ExclusiveEngineData>) -> usize {
Collaborator

aside: This is a somewhat odd API... presumably engine provided this engine data instance to kernel in the first place, so presumably engine should already know how to get the length if it wants it?

Collaborator

Actually... pretty much everything in this file feels weird/unsafe/incomplete (check out all those TODO comments). I guess eventually we'll need to clean all this up.

Collaborator

Yeah, this file is/was mostly a quick hack to let the example program actually read data. Note that this stuff is relevant and useful if you're using the default engine; we just haven't figured out all the ownership details yet.

Collaborator Author

Yeah, I think this calls for a follow-up issue.

Collaborator Author

feel free to chime in there: #538

    let data = unsafe { data.as_mut() };
    data.len()
}

/// Allow an engine to "unwrap" an [`ExclusiveEngineData`] into the raw pointer for the case it wants
/// to use its own engine data format
///
/// # Safety
///
/// `data` must be a valid pointer to a kernel-allocated `ExclusiveEngineData`. The engine must
/// ensure the handle outlives the returned pointer.
// TODO(frj): What is the engine actually doing with this method?? If we need access to raw extern
// pointers, we will need to define an `ExternEngineData` trait that exposes such capability, along
// with an ExternEngineDataVtable that implements it. See `ExternEngine` and `ExternEngineVtable`
// for examples of how that works.
#[no_mangle]
pub unsafe extern "C" fn get_raw_engine_data(mut data: Handle<ExclusiveEngineData>) -> *mut c_void {
    let ptr = get_raw_engine_data_impl(&mut data) as *mut dyn EngineData;
    ptr as _
}

unsafe fn get_raw_engine_data_impl(data: &mut Handle<ExclusiveEngineData>) -> &mut dyn EngineData {
    let _data = unsafe { data.as_mut() };
    todo!() // See TODO comment for EngineData
}
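
Note on the TODO above: it proposes an `ExternEngineData` trait backed by an `ExternEngineDataVtable`, but neither exists in this PR. The following is only a rough sketch of what that pattern could look like. Every field and method name here (`raw`, `len_fn`, `raw_ptr`, `demo_len`) is a hypothetical placeholder, and a `Vec<i64>` stands in for real engine data.

```rust
use std::ffi::c_void;

/// Hypothetical vtable an engine would fill in. Field names are illustrative only.
#[repr(C)]
pub struct ExternEngineDataVtable {
    /// Opaque pointer to the engine-owned data.
    pub raw: *mut c_void,
    /// Engine-provided function that returns the number of rows behind `raw`.
    pub len_fn: extern "C" fn(raw: *const c_void) -> usize,
}

/// Hypothetical kernel-side trait exposing the raw-pointer capability.
pub trait ExternEngineData {
    fn raw_ptr(&self) -> *mut c_void;
    fn len(&self) -> usize;
}

impl ExternEngineData for ExternEngineDataVtable {
    fn raw_ptr(&self) -> *mut c_void {
        self.raw
    }

    fn len(&self) -> usize {
        // Dispatch through the engine-provided function pointer.
        (self.len_fn)(self.raw)
    }
}

/// Stand-in engine callback: the "engine data" is a plain `Vec<i64>`.
extern "C" fn vec_len(raw: *const c_void) -> usize {
    // SAFETY: `raw` was created from a live `Vec<i64>` in `demo_len`.
    unsafe { (*(raw as *const Vec<i64>)).len() }
}

/// Builds a vtable over a three-row vector and asks it for its length.
pub fn demo_len() -> usize {
    let mut rows = vec![1_i64, 2, 3];
    let vtable = ExternEngineDataVtable {
        raw: &mut rows as *mut Vec<i64> as *mut c_void,
        len_fn: vec_len,
    };
    ExternEngineData::len(&vtable)
}
```

With a shape like this, the engine answers length queries about its own data, which also speaks to the review concern that the engine should already know how to get the length.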

/// Struct to allow binding to the arrow [C Data
/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and
/// the schema.
#[cfg(feature = "default-engine")]
#[repr(C)]
pub struct ArrowFFIData {
    pub array: arrow_data::ffi::FFI_ArrowArray,
    pub schema: arrow_schema::ffi::FFI_ArrowSchema,
}

// TODO: This should use a callback to avoid having to have the engine free the struct
/// Get an [`ArrowFFIData`] to allow binding to the arrow [C Data
/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and
/// the schema. If this function returns an `Ok` variant the _engine_ must free the returned struct.
///
/// # Safety
/// `data` must be a valid `ExclusiveEngineData` as read by the
/// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`.
#[cfg(feature = "default-engine")]
#[no_mangle]
pub unsafe extern "C" fn get_raw_arrow_data(
    data: Handle<ExclusiveEngineData>,
    engine: Handle<SharedExternEngine>,
) -> ExternResult<*mut ArrowFFIData> {
    // TODO(frj): This consumes the handle. Is that what we really want?
    let data = unsafe { data.into_inner() };
    get_raw_arrow_data_impl(data).into_extern_result(&engine.as_ref())
}

// TODO: This method leaks the returned pointer memory. How will the engine free it?
#[cfg(feature = "default-engine")]
fn get_raw_arrow_data_impl(data: Box<dyn EngineData>) -> DeltaResult<*mut ArrowFFIData> {
    let record_batch: arrow_array::RecordBatch = data
        .into_any()
        .downcast::<delta_kernel::engine::arrow_data::ArrowEngineData>()
        .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
        .into();
    let sa: arrow_array::StructArray = record_batch.into();
    let array_data: arrow_data::ArrayData = sa.into();
    // these call `clone`. is there a way to not copy anything and what exactly are they cloning?
    let array = arrow_data::ffi::FFI_ArrowArray::new(&array_data);
    let schema = arrow_schema::ffi::FFI_ArrowSchema::try_from(array_data.data_type())?;
    let ret_data = Box::new(ArrowFFIData { array, schema });
    Ok(Box::leak(ret_data))
}
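
Note on the "How will the engine free it?" TODO: one possible answer is a matching release function that rebuilds the `Box` from the raw pointer. The sketch below is not part of this PR. `FakeFfiData`, `export_data`, `free_fake_ffi_data`, `drop_count`, and `demo_roundtrip` are hypothetical names, and a one-field struct stands in for `ArrowFFIData` so the example has no Arrow dependency.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Counts drops so the example can show that the memory is actually released.
static DROPS: AtomicUsize = AtomicUsize::new(0);

/// Stand-in for `ArrowFFIData` (hypothetical, keeps the sketch dependency-free).
#[repr(C)]
pub struct FakeFfiData {
    pub rows: usize,
}

impl Drop for FakeFfiData {
    fn drop(&mut self) {
        DROPS.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hands ownership to the caller as a raw pointer, like `Box::leak` in the diff.
pub fn export_data(rows: usize) -> *mut FakeFfiData {
    Box::into_raw(Box::new(FakeFfiData { rows }))
}

/// Hypothetical release function the engine would call exactly once.
///
/// # Safety
/// `ptr` must be null or a pointer returned by `export_data` that has not been freed yet.
pub unsafe extern "C" fn free_fake_ffi_data(ptr: *mut FakeFfiData) {
    if !ptr.is_null() {
        // Rebuild the Box so that Rust runs the destructor and frees the allocation.
        drop(unsafe { Box::from_raw(ptr) });
    }
}

/// Number of `FakeFfiData` values dropped so far.
pub fn drop_count() -> usize {
    DROPS.load(Ordering::SeqCst)
}

/// Exports a value, reads it through the raw pointer, then frees it.
pub fn demo_roundtrip() -> usize {
    let ptr = export_data(42);
    // SAFETY: `ptr` came from `export_data` and is freed only once, below.
    let rows = unsafe { (*ptr).rows };
    unsafe { free_fake_ffi_data(ptr) };
    rows
}
```

The callback approach suggested in the earlier TODO would avoid the need for such a function, since the engine would never own the struct in the first place.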
205 changes: 205 additions & 0 deletions ffi/src/error.rs
@@ -0,0 +1,205 @@
use delta_kernel::{DeltaResult, Error};

use crate::{kernel_string_slice, ExternEngine, KernelStringSlice};

#[repr(C)]
#[derive(Debug)]
pub enum KernelError {
    UnknownError, // catch-all for unrecognized kernel Error types
    FFIError,     // errors encountered in the code layer that supports FFI
    #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
    ArrowError,
    EngineDataTypeError,
    ExtractError,
    GenericError,
    IOErrorError,
    #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
    ParquetError,
    #[cfg(feature = "default-engine")]
    ObjectStoreError,
    #[cfg(feature = "default-engine")]
    ObjectStorePathError,
    #[cfg(feature = "default-engine")]
    ReqwestError,
    FileNotFoundError,
    MissingColumnError,
    UnexpectedColumnTypeError,
    MissingDataError,
    MissingVersionError,
    DeletionVectorError,
    InvalidUrlError,
    MalformedJsonError,
    MissingMetadataError,
    MissingProtocolError,
    InvalidProtocolError,
    MissingMetadataAndProtocolError,
    ParseError,
    JoinFailureError,
    Utf8Error,
    ParseIntError,
    InvalidColumnMappingModeError,
    InvalidTableLocationError,
    InvalidDecimalError,
    InvalidStructDataError,
    InternalError,
    InvalidExpression,
    InvalidLogPath,
    InvalidCommitInfo,
    FileAlreadyExists,
    MissingCommitInfo,
    UnsupportedError,
    ParseIntervalError,
    ChangeDataFeedUnsupported,
}

impl From<Error> for KernelError {
    fn from(e: Error) -> Self {
        match e {
            // NOTE: By definition, no kernel Error maps to FFIError
            #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
            Error::Arrow(_) => KernelError::ArrowError,
            Error::EngineDataType(_) => KernelError::EngineDataTypeError,
            Error::Extract(..) => KernelError::ExtractError,
            Error::Generic(_) => KernelError::GenericError,
            Error::GenericError { .. } => KernelError::GenericError,
            Error::IOError(_) => KernelError::IOErrorError,
            #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
            Error::Parquet(_) => KernelError::ParquetError,
            #[cfg(feature = "default-engine")]
            Error::ObjectStore(_) => KernelError::ObjectStoreError,
            #[cfg(feature = "default-engine")]
            Error::ObjectStorePath(_) => KernelError::ObjectStorePathError,
            #[cfg(feature = "default-engine")]
            Error::Reqwest(_) => KernelError::ReqwestError,
            Error::FileNotFound(_) => KernelError::FileNotFoundError,
            Error::MissingColumn(_) => KernelError::MissingColumnError,
            Error::UnexpectedColumnType(_) => KernelError::UnexpectedColumnTypeError,
            Error::MissingData(_) => KernelError::MissingDataError,
            Error::MissingVersion => KernelError::MissingVersionError,
            Error::DeletionVector(_) => KernelError::DeletionVectorError,
            Error::InvalidUrl(_) => KernelError::InvalidUrlError,
            Error::MalformedJson(_) => KernelError::MalformedJsonError,
            Error::MissingMetadata => KernelError::MissingMetadataError,
            Error::MissingProtocol => KernelError::MissingProtocolError,
            Error::InvalidProtocol(_) => KernelError::InvalidProtocolError,
            Error::MissingMetadataAndProtocol => KernelError::MissingMetadataAndProtocolError,
            Error::ParseError(..) => KernelError::ParseError,
            Error::JoinFailure(_) => KernelError::JoinFailureError,
            Error::Utf8Error(_) => KernelError::Utf8Error,
            Error::ParseIntError(_) => KernelError::ParseIntError,
            Error::InvalidColumnMappingMode(_) => KernelError::InvalidColumnMappingModeError,
            Error::InvalidTableLocation(_) => KernelError::InvalidTableLocationError,
            Error::InvalidDecimal(_) => KernelError::InvalidDecimalError,
            Error::InvalidStructData(_) => KernelError::InvalidStructDataError,
            Error::InternalError(_) => KernelError::InternalError,
            Error::Backtraced {
                source,
                backtrace: _,
            } => Self::from(*source),
            Error::InvalidExpressionEvaluation(_) => KernelError::InvalidExpression,
            Error::InvalidLogPath(_) => KernelError::InvalidLogPath,
            Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo,
            Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
            Error::MissingCommitInfo => KernelError::MissingCommitInfo,
            Error::Unsupported(_) => KernelError::UnsupportedError,
            Error::ParseIntervalError(_) => KernelError::ParseIntervalError,
            Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
        }
    }
}

/// An error that can be returned to the engine. Engines that wish to associate additional
/// information can define and use any type that is [pointer
/// interconvertible](https://en.cppreference.com/w/cpp/language/static_cast#pointer-interconvertible)
/// with this one -- e.g. by subclassing this struct or by embedding this struct as the first member
/// of a [standard layout](https://en.cppreference.com/w/cpp/language/data_members#Standard-layout)
/// class.
#[repr(C)]
pub struct EngineError {
    pub(crate) etype: KernelError,
}
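
Note on the "embed this struct as the first member" wording in the doc comment above: for an engine written in Rust, that could look roughly like the sketch below. This is an illustration, not code from this PR. `MyEngineError`, `allocate`, `take_error`, and `demo_embedded_error` are hypothetical names, and `KernelError` and `EngineError` are trimmed stand-ins so the block compiles on its own.

```rust
/// Trimmed stand-in for the `KernelError` enum in the diff.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum KernelError {
    UnknownError,
    GenericError,
}

/// Stand-in for the `EngineError` struct in the diff.
#[repr(C)]
pub struct EngineError {
    pub etype: KernelError,
}

/// Hypothetical engine-side error that carries extra information.
/// `#[repr(C)]` keeps `base` at offset 0, so a pointer to the whole struct
/// is also a valid pointer to its `EngineError` prefix.
#[repr(C)]
pub struct MyEngineError {
    pub base: EngineError,
    pub message: String,
}

/// Allocates the extended error and returns it as a plain `*mut EngineError`.
pub fn allocate(etype: KernelError, msg: &str) -> *mut EngineError {
    let boxed = Box::new(MyEngineError {
        base: EngineError { etype },
        message: msg.to_owned(),
    });
    Box::into_raw(boxed) as *mut EngineError
}

/// Recovers the extended error and frees the allocation.
///
/// # Safety
/// `err` must have been produced by `allocate` and must not be used afterwards.
pub unsafe fn take_error(err: *mut EngineError) -> (KernelError, String) {
    let full = unsafe { Box::from_raw(err as *mut MyEngineError) };
    let MyEngineError { base, message } = *full;
    (base.etype, message)
}

/// Round-trips an error through the narrowed pointer type.
pub fn demo_embedded_error() -> (KernelError, String) {
    let ptr = allocate(KernelError::GenericError, "boom");
    // SAFETY: `ptr` came from `allocate` above and is consumed exactly once.
    unsafe { take_error(ptr) }
}
```

The cast back to `MyEngineError` is only sound because the engine itself allocated the value; kernel code only ever sees the `EngineError` prefix.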

/// Semantics: Kernel will always immediately return the leaked engine error to the engine (if it
/// allocated one at all), and engine is responsible for freeing it.
#[repr(C)]
pub enum ExternResult<T> {
    Ok(T),
    Err(*mut EngineError),
}

pub type AllocateErrorFn =
    extern "C" fn(etype: KernelError, msg: KernelStringSlice) -> *mut EngineError;

// NOTE: We can't "just" impl From<DeltaResult<T>> because we require an error allocator.
Collaborator

This comment seems misplaced? Maybe it belongs with the IntoExternResult trait below?

Collaborator Author

seems reasonable - moved!

impl<T> ExternResult<T> {
    pub fn is_ok(&self) -> bool {
        match self {
            Self::Ok(_) => true,
            Self::Err(_) => false,
        }
    }
    pub fn is_err(&self) -> bool {
        !self.is_ok()
    }
}

/// Represents an engine error allocator. Ultimately all implementations will fall back to an
/// [`AllocateErrorFn`] provided by the engine, but the trait allows us to conveniently access the
/// allocator in various types that may wrap it.
pub trait AllocateError {
    /// Allocates a new error in engine memory and returns the resulting pointer. The engine is
    /// expected to copy the passed-in message, which is only guaranteed to remain valid until the
    /// call returns. Kernel will always immediately return the result of this method to the engine.
    ///
    /// # Safety
    ///
    /// The string slice must be valid until the call returns, and the error allocator must also be
    /// valid.
    unsafe fn allocate_error(&self, etype: KernelError, msg: KernelStringSlice)
        -> *mut EngineError;
}

impl AllocateError for AllocateErrorFn {
    unsafe fn allocate_error(
        &self,
        etype: KernelError,
        msg: KernelStringSlice,
    ) -> *mut EngineError {
        self(etype, msg)
    }
}

impl AllocateError for &dyn ExternEngine {
    /// # Safety
    ///
    /// In addition to the usual requirements, the engine handle must be valid.
    unsafe fn allocate_error(
        &self,
        etype: KernelError,
        msg: KernelStringSlice,
    ) -> *mut EngineError {
        self.error_allocator().allocate_error(etype, msg)
    }
}

/// Converts a [DeltaResult] into an [ExternResult], using the engine's error allocator.
///
/// # Safety
///
/// The allocator must be valid.
pub(crate) trait IntoExternResult<T> {
    unsafe fn into_extern_result(self, alloc: &dyn AllocateError) -> ExternResult<T>;
}

impl<T> IntoExternResult<T> for DeltaResult<T> {
    unsafe fn into_extern_result(self, alloc: &dyn AllocateError) -> ExternResult<T> {
        match self {
            Ok(ok) => ExternResult::Ok(ok),
            Err(err) => {
                let msg = format!("{}", err);
                let err = unsafe { alloc.allocate_error(err.into(), kernel_string_slice!(msg)) };
                ExternResult::Err(err)
            }
        }
    }
}
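
Note on the allocator contract: the message slice is only valid until the allocator returns, so the engine has to copy it. The sketch below walks through that flow with simplified stand-ins and is not code from this PR. The error type is reduced to a `u32`, `Result<T, String>` replaces `DeltaResult<T>`, and `OwnedError`, `copying_allocator`, and `demo_error_message` are hypothetical names. The `ptr`/`len` layout of `KernelStringSlice` is assumed here for illustration.

```rust
use std::os::raw::c_char;

/// Assumed shape of a borrowed string slice passed across FFI.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct KernelStringSlice {
    pub ptr: *const c_char,
    pub len: usize,
}

/// Simplified stand-in: the error type is a plain integer code here.
#[repr(C)]
pub struct EngineError {
    pub etype: u32,
}

/// Hypothetical engine-side error that owns a copy of the message.
#[repr(C)]
pub struct OwnedError {
    base: EngineError,
    msg: String,
}

pub enum ExternResult<T> {
    Ok(T),
    Err(*mut EngineError),
}

pub type AllocateErrorFn = extern "C" fn(etype: u32, msg: KernelStringSlice) -> *mut EngineError;

/// Engine allocator: copies the message before returning, as the contract requires.
extern "C" fn copying_allocator(etype: u32, msg: KernelStringSlice) -> *mut EngineError {
    // SAFETY: the caller guarantees the slice is valid until this call returns.
    let bytes = unsafe { std::slice::from_raw_parts(msg.ptr as *const u8, msg.len) };
    let owned = String::from_utf8_lossy(bytes).into_owned();
    let err = OwnedError { base: EngineError { etype }, msg: owned };
    Box::into_raw(Box::new(err)) as *mut EngineError
}

/// Kernel-side conversion: the message string lives only for the duration of the call.
pub fn into_extern_result<T>(res: Result<T, String>, alloc: AllocateErrorFn) -> ExternResult<T> {
    match res {
        Ok(v) => ExternResult::Ok(v),
        Err(e) => {
            let slice = KernelStringSlice { ptr: e.as_ptr() as *const c_char, len: e.len() };
            // `e` is dropped at the end of this arm, after the allocator has returned.
            ExternResult::Err(alloc(1, slice))
        }
    }
}

/// Converts a failing result and reads back the engine's copy of the message.
pub fn demo_error_message() -> String {
    let res: Result<(), String> = Err("table not found".to_string());
    match into_extern_result(res, copying_allocator) {
        ExternResult::Ok(()) => String::new(),
        ExternResult::Err(ptr) => {
            // SAFETY: `ptr` was allocated by `copying_allocator` as an `OwnedError`.
            let owned = unsafe { Box::from_raw(ptr as *mut OwnedError) };
            let OwnedError { base: _, msg } = *owned;
            msg
        }
    }
}
```

Because the engine allocated the error, the engine is also the one that frees it, which matches the `ExternResult` semantics described above.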