Skip to content

Commit

Permalink
FFI Sync Engine changes
Browse files Browse the repository at this point in the history
  • Loading branch information
abrassel committed May 24, 2024
1 parent 1b855dd commit 5ff982e
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 30 deletions.
2 changes: 1 addition & 1 deletion ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ libc = "0.2.147"
rand = "0.8.5"

[features]
default = ["default-engine"]
default = ["default-engine", "sync-engine"]
default-engine = [
"delta_kernel/default-engine",
"arrow-array",
Expand Down
70 changes: 44 additions & 26 deletions ffi/cffi-test.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,29 @@

#include "delta_kernel_ffi.h"

void visit_callback(void* engine_context, KernelStringSlice path, int64_t size, const DvInfo *dv_info, const CStringMap *partition_values) {
void visit_callback(void *engine_context, KernelStringSlice path, int64_t size, const DvInfo *dv_info, const CStringMap *partition_values)
{
printf("file: %.*s\n", (int)path.len, path.ptr);
}


void visit_data(void *engine_context, EngineData *engine_data, const KernelBoolSlice selection_vec) {
void visit_data(void *engine_context, EngineData *engine_data, const KernelBoolSlice selection_vec)
{
visit_scan_data(engine_data, selection_vec, engine_context, visit_callback);
}

int main(int argc, char* argv[]) {

if (argc < 2) {
printf("Usage: %s table/path\n", argv[0]);
return -1;
}

char* table_path = argv[1];
printf("Reading table at %s\n", table_path);

KernelStringSlice table_path_slice = {table_path, strlen(table_path)};

ExternResultHandleSharedExternEngine engine_res =
get_default_engine(table_path_slice, NULL);
if (engine_res.tag != OkHandleSharedExternEngine) {
int main_impl(KernelStringSlice table_path_slice, ExternResultHandleSharedExternEngine engine_res)
{
if (engine_res.tag != OkHandleSharedExternEngine)
{
printf("Failed to get engine\n");
return -1;
}

SharedExternEngine *engine = engine_res.ok;

ExternResultHandleSharedSnapshot snapshot_res = snapshot(table_path_slice, engine);
if (snapshot_res.tag != OkHandleSharedSnapshot) {
if (snapshot_res.tag != OkHandleSharedSnapshot)
{
printf("Failed to create snapshot\n");
return -1;
}
Expand All @@ -45,36 +36,63 @@ int main(int argc, char* argv[]) {
uint64_t v = version(snapshot);
printf("version: %" PRIu64 "\n", v);
ExternResultHandleSharedScan scan_res = scan(snapshot, engine, NULL);
if (scan_res.tag != OkHandleSharedScan) {
if (scan_res.tag != OkHandleSharedScan)
{
printf("Failed to create scan\n");
return -1;
}

SharedScan *scan = scan_res.ok;

ExternResultHandleSharedScanDataIterator data_iter_res =
kernel_scan_data_init(engine, scan);
if (data_iter_res.tag != OkHandleSharedScanDataIterator) {
kernel_scan_data_init(engine, scan);
if (data_iter_res.tag != OkHandleSharedScanDataIterator)
{
printf("Failed to construct scan data iterator\n");
return -1;
}

SharedScanDataIterator *data_iter = data_iter_res.ok;

// iterate scan files
for (;;) {
for (;;)
{
ExternResultbool ok_res = kernel_scan_data_next(data_iter, NULL, visit_data);
if (ok_res.tag != Okbool) {
if (ok_res.tag != Okbool)
{
printf("Failed to iterate scan data\n");
return -1;
} else if (!ok_res.ok) {
}
else if (!ok_res.ok)
{
break;
}
}

drop_scan(scan);
drop_snapshot(snapshot);
drop_engine(engine);
}

int main(int argc, char *argv[])
{

if (argc < 2)
{
printf("Usage: %s table/path\n", argv[0]);
return -1;
}

char *table_path = argv[1];
printf("Reading table at %s\n", table_path);

KernelStringSlice table_path_slice = {table_path, strlen(table_path)};

ExternResultHandleSharedExternEngine default_engine_res = get_default_engine(table_path_slice, NULL);
ExternResultHandleSharedExternEngine sync_engine_res = get_sync_engine(table_path_slice, NULL);

int res = main_impl(table_path_slice, default_engine_res);
int res = main_impl(table_path_slice, sync_engine_res);

return 0;
return res;
}
57 changes: 54 additions & 3 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// FFI interface for the delta kernel
///
/// Exposes that an engine needs to call from C/C++ to interface with kernel
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
#[cfg(feature = "default-engine")]
use std::collections::HashMap;
use std::default::Default;
use std::os::raw::{c_char, c_void};
Expand Down Expand Up @@ -432,13 +432,16 @@ unsafe fn unwrap_and_parse_path_as_url(path: KernelStringSlice) -> DeltaResult<U
/// A builder that allows setting options on the `Engine` before actually building it
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub struct EngineBuilder {
#[cfg(feature = "default-engine")]
url: Url,
allocate_fn: AllocateErrorFn,
#[cfg(feature = "default-engine")]
options: HashMap<String, String>,
}

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
impl EngineBuilder {
#[cfg(feature = "default-engine")]
fn set_option(&mut self, key: String, val: String) {
self.options.insert(key, val);
}
Expand Down Expand Up @@ -491,15 +494,16 @@ pub unsafe extern "C" fn set_builder_option(
builder.set_option(key.unwrap(), value.unwrap());
}

/// Consume the builder and return an engine. After calling, the passed pointer is _no
/// Consume the builder and return a `default` engine. After calling, the passed pointer is _no
/// longer valid_.
///
///
/// # Safety
///
/// Caller is responsible to pass a valid EngineBuilder pointer, and to not use it again afterwards
#[cfg(feature = "default-engine")]
#[no_mangle]
pub unsafe extern "C" fn builder_build(
pub unsafe extern "C" fn build_default_engine(
builder: *mut EngineBuilder,
) -> ExternResult<Handle<SharedExternEngine>> {
let builder_box = unsafe { Box::from_raw(builder) };
Expand All @@ -511,6 +515,21 @@ pub unsafe extern "C" fn builder_build(
.into_extern_result(&builder_box.allocate_fn)
}

/// Consume the builder and return a `sync` engine. After calling, the passed pointer is _no
/// longer valid_.
///
/// # Safety
///
/// Caller is responsible to pass a valid EngineBuilder pointer, and to not use it again afterwards
#[cfg(feature = "sync-engine")]
#[no_mangle]
pub unsafe extern "C" fn build_sync_engine(
builder: *mut EngineBuilder,
) -> ExternResult<Handle<SharedExternEngine>> {
let builder_box = unsafe { Box::from_raw(builder) };
get_sync_engine_impl(builder_box.allocate_fn).into_extern_result(&builder_box.allocate_fn)
}

/// # Safety
///
/// Caller is responsible for passing a valid path pointer.
Expand All @@ -533,6 +552,25 @@ fn get_default_default_engine_impl(
get_default_engine_impl(url?, Default::default(), allocate_error)
}

/// # Safety
///
/// Caller is responsible for passing a valid path pointer.
#[cfg(feature = "sync-engine")]
#[no_mangle]
pub unsafe extern "C" fn get_sync_engine(
allocate_error: AllocateErrorFn,
) -> ExternResult<Handle<SharedExternEngine>> {
get_default_sync_engine_impl(allocate_error).into_extern_result(&allocate_error)
}

// get the default version of the sync engine :^)
#[cfg(feature = "sync-engine")]
fn get_default_sync_engine_impl(
allocate_error: AllocateErrorFn,
) -> DeltaResult<Handle<SharedExternEngine>> {
get_sync_engine_impl(allocate_error)
}

#[cfg(feature = "default-engine")]
fn get_default_engine_impl(
url: Url,
Expand All @@ -553,6 +591,19 @@ fn get_default_engine_impl(
Ok(engine.into())
}

#[cfg(feature = "sync-engine")]
fn get_sync_engine_impl(
allocate_error: AllocateErrorFn,
) -> DeltaResult<Handle<SharedExternEngine>> {
use delta_kernel::engine::sync::SyncEngine;
let engine = SyncEngine::new();
let engine: Arc<dyn ExternEngine> = Arc::new(ExternEngineVtable {
engine: Arc::new(engine),
allocate_error,
});
Ok(engine.into())
}

/// # Safety
///
/// Caller is responsible for passing a valid handle.
Expand Down

0 comments on commit 5ff982e

Please sign in to comment.