diff --git a/CMakeLists.txt b/CMakeLists.txt index 88b8ac7..90d6e8b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -65,10 +65,9 @@ endif() ExternalProject_Add( ${KERNEL_NAME} GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs" - GIT_TAG e49bf519fb2a01f1b3eb78ef53d50fc70bda3142 + GIT_TAG 08f0764a00e89f42136fd478823d28278adc7ee8 CONFIGURE_COMMAND "" UPDATE_COMMAND "" - PATCH_COMMAND python3 ${CMAKE_SOURCE_DIR}/../scripts/apply_patches.py ${CMAKE_SOURCE_DIR}/../patches/delta_kernel_rs BUILD_IN_SOURCE 1 # Build debug build BUILD_COMMAND cargo build --package delta_kernel_ffi --all-features --target=${RUST_PLATFORM_TARGET} diff --git a/patches/delta_kernel_rs/enable_s3.patch b/patches/delta_kernel_rs/enable_s3.patch deleted file mode 100644 index 9935eb2..0000000 --- a/patches/delta_kernel_rs/enable_s3.patch +++ /dev/null @@ -1,13 +0,0 @@ -diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml -index bc183f6..5f07527 100644 ---- a/kernel/Cargo.toml -+++ b/kernel/Cargo.toml -@@ -44,7 +44,7 @@ arrow-json = { version = "^49.0", optional = true } - arrow-ord = { version = "^49.0", optional = true } - arrow-schema = { version = "^49.0", optional = true } - futures = { version = "0.3", optional = true } --object_store = { version = "^0.8.0", optional = true } -+object_store = { version = "^0.8.0", optional = true, features = [ "aws" ]} - # Used in default and sync engine - parquet = { version = "^49.0", optional = true } - # Used for fetching direct urls (like pre-signed urls) diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index 5873715..2febd95 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -32,7 +32,7 @@ static void* allocate_string(const struct ffi::KernelStringSlice slice) { return new string(slice.ptr, slice.len); } -static void visit_callback(ffi::NullableCvoid engine_context, const struct ffi::KernelStringSlice path, int64_t size, const ffi::DvInfo *dv_info, struct ffi::CStringMap *partition_values) { +static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, const ffi::DvInfo *dv_info, const struct ffi::CStringMap *partition_values) { auto context = (DeltaSnapshot *) engine_context; auto path_string = context->GetPath() + "/" + from_delta_string_slice(path); @@ -49,7 +49,7 @@ static void visit_callback(ffi::NullableCvoid engine_context, const struct ffi:: context->metadata.back()->file_number = context->resolved_files.size() - 1; // Fetch the deletion vector - auto selection_vector_res = ffi::selection_vector_from_dv(dv_info, context->table_client, context->global_state); + auto selection_vector_res = ffi::selection_vector_from_dv(dv_info, context->extern_engine, context->global_state); auto selection_vector = unpack_result_or_throw(selection_vector_res, "selection_vector_from_dv for path " + context->GetPath()); if (selection_vector.ptr) { context->metadata.back()->selection_vector = selection_vector; @@ -69,7 +69,7 @@ static void visit_callback(ffi::NullableCvoid engine_context, const struct ffi:: context->metadata.back()->partition_map = std::move(constant_map); } -static void visit_data(void *engine_context, struct ffi::EngineDataHandle *engine_data, const struct ffi::KernelBoolSlice selection_vec) { + static void visit_data(void *engine_context, ffi::EngineData* engine_data, const struct ffi::KernelBoolSlice selection_vec) { // printf("Got some data\n"); // printf(" Of this data, here is a selection vector\n"); // print_selection_vector(" ", &selection_vec); @@ -208,19 +208,19 @@ void DeltaSnapshot::InitializeFiles() { auto interface_builder = CreateBuilder(context, paths[0]); auto engine_interface_res = ffi::builder_build(interface_builder); - table_client = unpack_result_or_throw(engine_interface_res, "get_default_client in DeltaScanScanBind"); + extern_engine = unpack_result_or_throw(engine_interface_res, "get_default_client in DeltaScanScanBind"); // Alternatively we can do the default client like so: -// auto table_client_res = ffi::get_default_client(path_slice, error_allocator); -// table_client = unpack_result_or_throw(table_client_res, "get_default_client in DeltaScanScanBind"); +// auto extern_engine_res = ffi::get_default_client(path_slice, error_allocator); +// extern_engine = unpack_result_or_throw(extern_engine_res, "get_default_client in DeltaScanScanBind"); // Initialize Snapshot - auto snapshot_res = ffi::snapshot(path_slice, table_client); + auto snapshot_res = ffi::snapshot(path_slice, extern_engine); snapshot = unpack_result_or_throw(snapshot_res, "snapshot in DeltaScanScanBind"); PredicateVisitor visitor(names, &table_filters); - auto scan_res = ffi::scan(snapshot, table_client, &visitor); + auto scan_res = ffi::scan(snapshot, extern_engine, &visitor); scan = unpack_result_or_throw(scan_res, "scan in DeltaScanScanBind"); global_state = ffi::get_global_scan_state(scan); @@ -228,7 +228,7 @@ void DeltaSnapshot::InitializeFiles() { // Set version this->version = ffi::version(snapshot); - auto scan_iterator_res = ffi::kernel_scan_data_init(table_client, scan); + auto scan_iterator_res = ffi::kernel_scan_data_init(extern_engine, scan); scan_data_iterator = { unpack_result_or_throw(scan_iterator_res, "kernel_scan_data_init in InitFiles"), ffi::kernel_scan_data_free diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index e7b8450..461668a 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -13,14 +13,14 @@ class SchemaVisitor { public: using FieldList = child_list_t; - static unique_ptr VisitSnapshotSchema(const ffi::SnapshotHandle* snapshot) { + static unique_ptr VisitSnapshotSchema(ffi::SharedSnapshot* snapshot) { SchemaVisitor state; ffi::EngineSchemaVisitor visitor; visitor.data = &state; visitor.make_field_list = (uintptr_t (*)(void*, uintptr_t)) &MakeFieldList; visitor.visit_struct = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uintptr_t)) &VisitStruct; - visitor.visit_decimal = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uint8_t , int8_t)) &VisitDecimal; + visitor.visit_decimal = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uint8_t , uint8_t)) &VisitDecimal; visitor.visit_string = VisitSimpleType(); visitor.visit_long = VisitSimpleType(); visitor.visit_integer = VisitSimpleType(); @@ -53,7 +53,7 @@ class SchemaVisitor { state->AppendToList(sibling_list_id, name, TypeId); } - static void VisitDecimal(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uint8_t precision, int8_t scale) { + static void VisitDecimal(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uint8_t precision, uint8_t scale) { state->AppendToList(sibling_list_id, name, LogicalType::DECIMAL(precision, scale)); } diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index 8e40019..980f0fa 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -62,11 +62,11 @@ struct DeltaSnapshot : public MultiFileList { idx_t version; //! Delta Kernel Structures - const ffi::SnapshotHandle *snapshot; - const ffi::ExternEngineHandle *table_client; - ffi::Scan* scan; - ffi::GlobalScanState *global_state; - UniqueKernelPointer scan_data_iterator; + ffi::SharedSnapshot* snapshot; + ffi::SharedExternEngine* extern_engine; + ffi::SharedScan* scan; + ffi::SharedGlobalScanState* global_state; + UniqueKernelPointer scan_data_iterator; //! Names vector names;