From 5ff982e22d69cc5032436442452f81bac37b0ed0 Mon Sep 17 00:00:00 2001 From: Alexander Brassel Date: Thu, 23 May 2024 23:15:54 -0700 Subject: [PATCH] FFI Sync Engine changes --- ffi/Cargo.toml | 2 +- ffi/cffi-test.c | 70 +++++++++++++++++++++++++++++++------------------ ffi/src/lib.rs | 57 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 99 insertions(+), 30 deletions(-) diff --git a/ffi/Cargo.toml b/ffi/Cargo.toml index becff06e9..a5e739fdb 100644 --- a/ffi/Cargo.toml +++ b/ffi/Cargo.toml @@ -37,7 +37,7 @@ libc = "0.2.147" rand = "0.8.5" [features] -default = ["default-engine"] +default = ["default-engine", "sync-engine"] default-engine = [ "delta_kernel/default-engine", "arrow-array", diff --git a/ffi/cffi-test.c b/ffi/cffi-test.c index e067dfb20..6e69069b7 100644 --- a/ffi/cffi-test.c +++ b/ffi/cffi-test.c @@ -4,30 +4,20 @@ #include "delta_kernel_ffi.h" -void visit_callback(void* engine_context, KernelStringSlice path, int64_t size, const DvInfo *dv_info, const CStringMap *partition_values) { +void visit_callback(void *engine_context, KernelStringSlice path, int64_t size, const DvInfo *dv_info, const CStringMap *partition_values) +{ printf("file: %.*s\n", (int)path.len, path.ptr); } - -void visit_data(void *engine_context, EngineData *engine_data, const KernelBoolSlice selection_vec) { +void visit_data(void *engine_context, EngineData *engine_data, const KernelBoolSlice selection_vec) +{ visit_scan_data(engine_data, selection_vec, engine_context, visit_callback); } -int main(int argc, char* argv[]) { - - if (argc < 2) { - printf("Usage: %s table/path\n", argv[0]); - return -1; - } - - char* table_path = argv[1]; - printf("Reading table at %s\n", table_path); - - KernelStringSlice table_path_slice = {table_path, strlen(table_path)}; - - ExternResultHandleSharedExternEngine engine_res = - get_default_engine(table_path_slice, NULL); - if (engine_res.tag != OkHandleSharedExternEngine) { +int main_impl(KernelStringSlice table_path_slice, ExternResultHandleSharedExternEngine engine_res) +{ + if (engine_res.tag != OkHandleSharedExternEngine) + { printf("Failed to get engine\n"); return -1; } @@ -35,7 +25,8 @@ int main(int argc, char* argv[]) { SharedExternEngine *engine = engine_res.ok; ExternResultHandleSharedSnapshot snapshot_res = snapshot(table_path_slice, engine); - if (snapshot_res.tag != OkHandleSharedSnapshot) { + if (snapshot_res.tag != OkHandleSharedSnapshot) + { printf("Failed to create snapshot\n"); return -1; } @@ -45,7 +36,8 @@ int main(int argc, char* argv[]) { uint64_t v = version(snapshot); printf("version: %" PRIu64 "\n", v); ExternResultHandleSharedScan scan_res = scan(snapshot, engine, NULL); - if (scan_res.tag != OkHandleSharedScan) { + if (scan_res.tag != OkHandleSharedScan) + { printf("Failed to create scan\n"); return -1; } @@ -53,8 +45,9 @@ int main(int argc, char* argv[]) { SharedScan *scan = scan_res.ok; ExternResultHandleSharedScanDataIterator data_iter_res = - kernel_scan_data_init(engine, scan); - if (data_iter_res.tag != OkHandleSharedScanDataIterator) { + kernel_scan_data_init(engine, scan); + if (data_iter_res.tag != OkHandleSharedScanDataIterator) + { printf("Failed to construct scan data iterator\n"); return -1; } @@ -62,12 +55,16 @@ int main(int argc, char* argv[]) { SharedScanDataIterator *data_iter = data_iter_res.ok; // iterate scan files - for (;;) { + for (;;) + { ExternResultbool ok_res = kernel_scan_data_next(data_iter, NULL, visit_data); - if (ok_res.tag != Okbool) { + if (ok_res.tag != Okbool) + { printf("Failed to iterate scan data\n"); return -1; - } else if (!ok_res.ok) { + } + else if (!ok_res.ok) + { break; } } @@ -75,6 +72,27 @@ int main(int argc, char* argv[]) { drop_scan(scan); drop_snapshot(snapshot); drop_engine(engine); +} + +int main(int argc, char *argv[]) +{ + + if (argc < 2) + { + printf("Usage: %s table/path\n", argv[0]); + return -1; + } + + char *table_path = argv[1]; + printf("Reading table at %s\n", table_path); + + KernelStringSlice table_path_slice = {table_path, strlen(table_path)}; + + ExternResultHandleSharedExternEngine default_engine_res = get_default_engine(table_path_slice, NULL); + ExternResultHandleSharedExternEngine sync_engine_res = get_sync_engine(table_path_slice, NULL); + + int res = main_impl(table_path_slice, default_engine_res); + int res = main_impl(table_path_slice, sync_engine_res); - return 0; + return res; } diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index ed913e226..7c7f207d0 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -1,7 +1,7 @@ /// FFI interface for the delta kernel /// /// Exposes that an engine needs to call from C/C++ to interface with kernel -#[cfg(any(feature = "default-engine", feature = "sync-engine"))] +#[cfg(feature = "default-engine")] use std::collections::HashMap; use std::default::Default; use std::os::raw::{c_char, c_void}; @@ -432,13 +432,16 @@ unsafe fn unwrap_and_parse_path_as_url(path: KernelStringSlice) -> DeltaResult, } #[cfg(any(feature = "default-engine", feature = "sync-engine"))] impl EngineBuilder { + #[cfg(feature = "default-engine")] fn set_option(&mut self, key: String, val: String) { self.options.insert(key, val); } @@ -491,15 +494,16 @@ pub unsafe extern "C" fn set_builder_option( builder.set_option(key.unwrap(), value.unwrap()); } -/// Consume the builder and return an engine. After calling, the passed pointer is _no +/// Consume the builder and return a `default` engine. After calling, the passed pointer is _no /// longer valid_. /// +/// /// # Safety /// /// Caller is responsible to pass a valid EngineBuilder pointer, and to not use it again afterwards #[cfg(feature = "default-engine")] #[no_mangle] -pub unsafe extern "C" fn builder_build( +pub unsafe extern "C" fn build_default_engine( builder: *mut EngineBuilder, ) -> ExternResult> { let builder_box = unsafe { Box::from_raw(builder) }; @@ -511,6 +515,21 @@ pub unsafe extern "C" fn builder_build( .into_extern_result(&builder_box.allocate_fn) } +/// Consume the builder and return a `sync` engine. After calling, the passed pointer is _no +/// longer valid_. +/// +/// # Safety +/// +/// Caller is responsible to pass a valid EngineBuilder pointer, and to not use it again afterwards +#[cfg(feature = "sync-engine")] +#[no_mangle] +pub unsafe extern "C" fn build_sync_engine( + builder: *mut EngineBuilder, +) -> ExternResult> { + let builder_box = unsafe { Box::from_raw(builder) }; + get_sync_engine_impl(builder_box.allocate_fn).into_extern_result(&builder_box.allocate_fn) +} + /// # Safety /// /// Caller is responsible for passing a valid path pointer. @@ -533,6 +552,25 @@ fn get_default_default_engine_impl( get_default_engine_impl(url?, Default::default(), allocate_error) } +/// # Safety +/// +/// Caller is responsible for passing a valid path pointer. +#[cfg(feature = "sync-engine")] +#[no_mangle] +pub unsafe extern "C" fn get_sync_engine( + allocate_error: AllocateErrorFn, +) -> ExternResult> { + get_default_sync_engine_impl(allocate_error).into_extern_result(&allocate_error) +} + +// get the default version of the sync engine :^) +#[cfg(feature = "sync-engine")] +fn get_default_sync_engine_impl( + allocate_error: AllocateErrorFn, +) -> DeltaResult> { + get_sync_engine_impl(allocate_error) +} + #[cfg(feature = "default-engine")] fn get_default_engine_impl( url: Url, @@ -553,6 +591,19 @@ fn get_default_engine_impl( Ok(engine.into()) } +#[cfg(feature = "sync-engine")] +fn get_sync_engine_impl( + allocate_error: AllocateErrorFn, +) -> DeltaResult> { + use delta_kernel::engine::sync::SyncEngine; + let engine = SyncEngine::new(); + let engine: Arc = Arc::new(ExternEngineVtable { + engine: Arc::new(engine), + allocate_error, + }); + Ok(engine.into()) +} + /// # Safety /// /// Caller is responsible for passing a valid handle.