diff --git a/CMakeLists.txt b/CMakeLists.txt index 6797b39..4aeaefd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -99,7 +99,7 @@ ExternalProject_Add( GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs" # WARNING: the FFI headers are currently pinned due to the C linkage issue of the c++ headers. Currently, when bumping # the kernel version, the produced header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying the fix - GIT_TAG ed2b80b127984481adba8e59879f39b9e5f871d1 + GIT_TAG v0.2.0 # Prints the env variables passed to the cargo build to the terminal, useful in debugging because passing them # through CMake is an error-prone mess CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} env diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index e07c391..d4a30fd 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -83,7 +83,7 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel context->metadata.back()->partition_map = std::move(constant_map); } - static void visit_data(void *engine_context, ffi::EngineData* engine_data, const struct ffi::KernelBoolSlice selection_vec) { + static void visit_data(void *engine_context, ffi::ExclusiveEngineData* engine_data, const struct ffi::KernelBoolSlice selection_vec) { ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback); } diff --git a/src/include/delta_kernel_ffi.hpp b/src/include/delta_kernel_ffi.hpp index de22390..cf69a8d 100644 --- a/src/include/delta_kernel_ffi.hpp +++ b/src/include/delta_kernel_ffi.hpp @@ -49,15 +49,9 @@ enum class KernelError { InvalidTableLocationError, InvalidDecimalError, InvalidStructDataError, + InternalError, }; -#if defined(DEFINE_DEFAULT_ENGINE) -/// Struct to allow binding to the arrow [C Data -/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and -/// the schema. -struct ArrowFFIData; -#endif - struct CStringMap; /// this struct can be used by an engine to materialize a selection vector @@ -71,7 +65,9 @@ struct EngineBuilder; /// an opaque struct that encapsulates data read by an engine. this handle can be passed back into /// some kernel calls to operate on the data, or can be converted into the raw data as read by the /// [`delta_kernel::Engine`] by calling [`get_raw_engine_data`] -struct EngineData; +struct ExclusiveEngineData; + +struct ExclusiveFileReadResultIterator; struct KernelExpressionVisitorState; @@ -83,16 +79,55 @@ struct SharedScan; struct SharedScanDataIterator; +struct SharedSchema; + struct SharedSnapshot; +struct StringSliceIterator; + /// Represents an owned slice of boolean values allocated by the kernel. Any time the engine /// receives a `KernelBoolSlice` as a return value from a kernel method, engine is responsible -/// to free that slice, by calling [super::drop_bool_slice] exactly once. +/// to free that slice, by calling [super::free_bool_slice] exactly once. struct KernelBoolSlice { bool *ptr; uintptr_t len; }; +/// Represents an object that crosses the FFI boundary and which outlives the scope that created +/// it. It can be passed freely between rust code and external code. The +/// +/// An accompanying [`HandleDescriptor`] trait defines the behavior of each handle type: +/// +/// * The true underlying ("target") type the handle represents. For safety reasons, target type +/// must always be [`Send`]. +/// +/// * Mutable (`Box`-like) vs. shared (`Arc`-like). For safety reasons, the target type of a +/// shared handle must always be [`Send`]+[`Sync`]. +/// +/// * Sized vs. unsized. Sized types allow handle operations to be implemented more efficiently. +/// +/// # Validity +/// +/// A `Handle` is _valid_ if all of the following hold: +/// +/// * It was created by a call to [`Handle::from`] +/// * Not yet dropped by a call to [`Handle::drop_handle`] +/// * Not yet consumed by a call to [`Handle::into_inner`] +/// +/// Additionally, in keeping with the [`Send`] contract, multi-threaded external code must +/// enforce mutual exclusion -- no mutable handle should ever be passed to more than one kernel +/// API call at a time. If thread races are possible, the handle should be protected with a +/// mutex. Due to Rust [reference +/// rules](https://doc.rust-lang.org/book/ch04-02-references-and-borrowing.html#the-rules-of-references), +/// this requirement applies even for API calls that appear to be read-only (because Rust code +/// always receives the handle as mutable). +/// +/// NOTE: Because the underlying type is always [`Sync`], multi-threaded external code can +/// freely access shared (non-mutable) handles. +/// +template +using Handle = H*; + /// An error that can be returned to the engine. Engines that wish to associate additional /// information can define and use any type that is [pointer /// interconvertible](https://en.cppreference.com/w/cpp/language/static_cast#pointer-interconvertible) @@ -155,40 +190,11 @@ struct KernelStringSlice { using AllocateErrorFn = EngineError*(*)(KernelError etype, KernelStringSlice msg); -/// Represents an object that crosses the FFI boundary and which outlives the scope that created -/// it. It can be passed freely between rust code and external code. The -/// -/// An accompanying [`HandleDescriptor`] trait defines the behavior of each handle type: -/// -/// * The true underlying ("target") type the handle represents. For safety reasons, target type -/// must always be [`Send`]. -/// -/// * Mutable (`Box`-like) vs. shared (`Arc`-like). For safety reasons, the target type of a -/// shared handle must always be [`Send`]+[`Sync`]. -/// -/// * Sized vs. unsized. Sized types allow handle operations to be implemented more efficiently. -/// -/// # Validity -/// -/// A `Handle` is _valid_ if all of the following hold: -/// -/// * It was created by a call to [`Handle::from`] -/// * Not yet dropped by a call to [`Handle::drop_handle`] -/// * Not yet consumed by a call to [`Handle::into_inner`] -/// -/// Additionally, in keeping with the [`Send`] contract, multi-threaded external code must -/// enforce mutual exclusion -- no mutable handle should ever be passed to more than one kernel -/// API call at a time. If thread races are possible, the handle should be protected with a -/// mutex. Due to Rust [reference -/// rules](https://doc.rust-lang.org/book/ch04-02-references-and-borrowing.html#the-rules-of-references), -/// this requirement applies even for API calls that appear to be read-only (because Rust code -/// always receives the handle as mutable). -/// -/// NOTE: Because the underlying type is always [`Sync`], multi-threaded external code can -/// freely access shared (non-mutable) handles. -/// -template -using Handle = H*; +using NullableCvoid = void*; + +/// Allow engines to allocate strings of their own type. the contract of calling a passed allocate +/// function is that `kernel_str` is _only_ valid until the return from this function +using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str); /// The `EngineSchemaVisitor` defines a visitor system to allow engines to build their own /// representation of a schema from a particular schema within kernel. @@ -283,6 +289,68 @@ struct EngineIterator { const void *(*get_next)(void *data); }; +struct FileMeta { + KernelStringSlice path; + int64_t last_modified; + uintptr_t size; +}; + +/// ABI-compatible struct for ArrowArray from C Data Interface +/// See +/// +/// ``` +/// # use arrow_data::ArrayData; +/// # use arrow_data::ffi::FFI_ArrowArray; +/// fn export_array(array: &ArrayData) -> FFI_ArrowArray { +/// FFI_ArrowArray::new(array) +/// } +/// ``` +struct FFI_ArrowArray { + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void **buffers; + FFI_ArrowArray **children; + FFI_ArrowArray *dictionary; + void (*release)(FFI_ArrowArray *arg1); + void *private_data; +}; + +/// ABI-compatible struct for `ArrowSchema` from C Data Interface +/// See +/// +/// ``` +/// # use arrow_schema::DataType; +/// # use arrow_schema::ffi::FFI_ArrowSchema; +/// fn array_schema(data_type: &DataType) -> FFI_ArrowSchema { +/// FFI_ArrowSchema::try_from(data_type).unwrap() +/// } +/// ``` +/// +struct FFI_ArrowSchema { + const char *format; + const char *name; + const char *metadata; + int64_t flags; + int64_t n_children; + FFI_ArrowSchema **children; + FFI_ArrowSchema *dictionary; + void (*release)(FFI_ArrowSchema *arg1); + void *private_data; +}; + +#if defined(DEFINE_DEFAULT_ENGINE) +/// Struct to allow binding to the arrow [C Data +/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and +/// the schema. +struct ArrowFFIData { + FFI_ArrowArray array; + FFI_ArrowSchema schema; +}; +#endif + /// A predicate that can be used to skip data when scanning. /// /// When invoking [`scan::scan`], The engine provides a pointer to the (engine's native) predicate, @@ -298,12 +366,6 @@ struct EnginePredicate { uintptr_t (*visitor)(void *predicate, KernelExpressionVisitorState *state); }; -using NullableCvoid = void*; - -/// Allow engines to allocate strings of their own type. the contract of calling a passed allocate -/// function is that `kernel_str` is _only_ valid until the return from this function -using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str); - /// Give engines an easy way to consume stats struct Stats { /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the @@ -331,7 +393,7 @@ struct im_an_unused_struct_that_tricks_msvc_into_compilation { ExternResult field7; ExternResult> field8; ExternResult> field9; - ExternResult> field10; + ExternResult> field10; }; extern "C" { @@ -339,7 +401,14 @@ extern "C" { /// # Safety /// /// Caller is responsible for passing a valid handle. -void drop_bool_slice(KernelBoolSlice slice); +void free_bool_slice(KernelBoolSlice slice); + +/// Drop an `ExclusiveEngineData`. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid handle as engine_data +void free_engine_data(Handle engine_data); #if defined(DEFINE_DEFAULT_ENGINE) /// Get a "builder" that can be used to construct an engine. The function @@ -390,7 +459,7 @@ ExternResult> get_sync_engine(AllocateErrorFn allocat /// # Safety /// /// Caller is responsible for passing a valid handle. -void drop_engine(Handle engine); +void free_engine(Handle engine); /// Get the latest snapshot from the specified table /// @@ -403,7 +472,7 @@ ExternResult> snapshot(KernelStringSlice path, /// # Safety /// /// Caller is responsible for passing a valid handle. -void drop_snapshot(Handle snapshot); +void free_snapshot(Handle snapshot); /// Get the version of the specified snapshot /// @@ -412,6 +481,27 @@ void drop_snapshot(Handle snapshot); /// Caller is responsible for passing a valid handle. uint64_t version(Handle snapshot); +/// Get the resolved root of the table. This should be used in any future calls that require +/// constructing a path +/// +/// # Safety +/// +/// Caller is responsible for passing a valid handle. +NullableCvoid snapshot_table_root(Handle snapshot, AllocateStringFn allocate_fn); + +/// # Safety +/// +/// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by +/// [kernel_scan_data_free]. The visitor function pointer must be non-null. +bool string_slice_next(Handle data, + NullableCvoid engine_context, + void (*engine_visitor)(NullableCvoid engine_context, KernelStringSlice slice)); + +/// # Safety +/// +/// Caller is responsible for (at most once) passing a valid pointer to a [`StringSliceIterator`] +void free_string_slice_data(Handle data); + /// Visit the schema of the passed `SnapshotHandle`, using the provided `visitor`. See the /// documentation of [`EngineSchemaVisitor`] for a description of how this visitor works. /// @@ -464,14 +554,49 @@ uintptr_t visit_expression_literal_double(KernelExpressionVisitorState *state, d uintptr_t visit_expression_literal_bool(KernelExpressionVisitorState *state, bool value); -/// Allow an engine to "unwrap" an [`EngineData`] into the raw pointer for the case it wants +/// Call the engine back with the next `EngingeData` batch read by Parquet/Json handler. The +/// _engine_ "owns" the data that is passed into the `engine_visitor`, since it is allocated by the +/// `Engine` being used for log-replay. If the engine wants the kernel to free this data, it _must_ +/// call [`free_engine_data`] on it. +/// +/// # Safety +/// +/// The iterator must be valid (returned by [`read_parquet_file`]) and not yet freed by +/// [`free_read_result_iter`]. The visitor function pointer must be non-null. +ExternResult read_result_next(Handle data, + NullableCvoid engine_context, + void (*engine_visitor)(NullableCvoid engine_context, + Handle engine_data)); + +/// Free the memory from the passed read result iterator +/// # Safety +/// +/// Caller is responsible for (at most once) passing a valid pointer returned by a call to +/// [`read_parquet_file`]. +void free_read_result_iter(Handle data); + +/// Use the specified engine's [`delta_kernel::ParquetHandler`] to read the specified file. +/// +/// # Safety +/// Caller is responsible for calling with a valid `ExternEngineHandle` and `FileMeta` +ExternResult> read_parquet_file(Handle engine, + const FileMeta *file, + Handle physical_schema); + +/// Get the number of rows in an engine data +/// +/// # Safety +/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData` +uintptr_t engine_data_length(Handle *data); + +/// Allow an engine to "unwrap" an [`ExclusiveEngineData`] into the raw pointer for the case it wants /// to use its own engine data format /// /// # Safety /// -/// `data_handle` must be a valid pointer to a kernel allocated `EngineData`. The Engine must +/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData`. The Engine must /// ensure the handle outlives the returned pointer. -void *get_raw_engine_data(Handle data); +void *get_raw_engine_data(Handle data); #if defined(DEFINE_DEFAULT_ENGINE) /// Get an [`ArrowFFIData`] to allow binding to the arrow [C Data @@ -479,16 +604,16 @@ void *get_raw_engine_data(Handle data); /// the schema. /// /// # Safety -/// data_handle must be a valid EngineData as read by the +/// data_handle must be a valid ExclusiveEngineData as read by the /// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`. -ExternResult get_raw_arrow_data(Handle data, +ExternResult get_raw_arrow_data(Handle data, Handle engine); #endif /// Drops a scan. /// # Safety /// Caller is responsible for passing a [valid][Handle#Validity] scan handle. -void drop_scan(Handle scan); +void free_scan(Handle scan); /// Get a [`Scan`] over the table specified by the passed snapshot. /// # Safety @@ -505,10 +630,35 @@ ExternResult> scan(Handle snapshot, /// Engine is responsible for providing a valid scan pointer Handle get_global_scan_state(Handle scan); +/// Get the kernel view of the physical read schema that an engine should read from parquet file in +/// a scan +/// /// # Safety +/// Engine is responsible for providing a valid GlobalScanState pointer +Handle get_global_read_schema(Handle state); + +/// Free a global read schema +/// +/// # Safety +/// Engine is responsible for providing a valid schema obtained via [`get_global_read_schema`] +void free_global_read_schema(Handle schema); + +/// Get a count of the number of partition columns for this scan /// +/// # Safety +/// Caller is responsible for passing a valid global scan pointer. +uintptr_t get_partition_column_count(Handle state); + +/// Get an iterator of the list of partition columns for this scan. +/// +/// # Safety /// Caller is responsible for passing a valid global scan pointer. -void drop_global_scan_state(Handle state); +Handle get_partition_columns(Handle state); + +/// # Safety +/// +/// Caller is responsible for passing a valid global scan state pointer. +void free_global_scan_state(Handle state); /// Get an iterator over the data needed to perform a scan. This will return a /// [`KernelScanDataIterator`] which can be passed to [`kernel_scan_data_next`] to get the actual @@ -523,18 +673,18 @@ ExternResult> kernel_scan_data_init(Handle kernel_scan_data_next(Handle data, NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, - Handle engine_data, + Handle engine_data, KernelBoolSlice selection_vector)); /// # Safety /// /// Caller is responsible for (at most once) passing a valid pointer returned by a call to /// [`kernel_scan_data_init`]. -void kernel_scan_data_free(Handle data); +void free_kernel_scan_data(Handle data); /// allow probing into a CStringMap. If the specified key is in the map, kernel will call /// allocate_fn with the value associated with the key and return the value returned from that @@ -559,8 +709,8 @@ ExternResult selection_vector_from_dv(const DvInfo *dv_info, /// data which provides the data handle and selection vector as each element in the iterator. /// /// # Safety -/// engine is responsbile for passing a valid [`EngineData`] and selection vector. -void visit_scan_data(Handle data, +/// engine is responsbile for passing a valid [`ExclusiveEngineData`] and selection vector. +void visit_scan_data(Handle data, KernelBoolSlice selection_vec, NullableCvoid engine_context, CScanCallback callback); diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 37dc289..9b33c5c 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -102,11 +102,11 @@ struct TemplatedUniqueKernelPointer : public UniqueKernelPointer { }; }; -typedef TemplatedUniqueKernelPointer KernelSnapshot; -typedef TemplatedUniqueKernelPointer KernelExternEngine; -typedef TemplatedUniqueKernelPointer KernelScan; -typedef TemplatedUniqueKernelPointer KernelGlobalScanState; -typedef TemplatedUniqueKernelPointer KernelScanDataIterator; +typedef TemplatedUniqueKernelPointer KernelSnapshot; +typedef TemplatedUniqueKernelPointer KernelExternEngine; +typedef TemplatedUniqueKernelPointer KernelScan; +typedef TemplatedUniqueKernelPointer KernelGlobalScanState; +typedef TemplatedUniqueKernelPointer KernelScanDataIterator; struct KernelUtils { static ffi::KernelStringSlice ToDeltaString(const string &str); diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index 07c782b..b4c3c76 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -22,7 +22,7 @@ struct DeltaFileMetaData { ~DeltaFileMetaData() { if (selection_vector.ptr) { - ffi::drop_bool_slice(selection_vector); + ffi::free_bool_slice(selection_vector); } } diff --git a/test/sql/dat/all.test b/test/sql/dat/all.test index f6332da..fc7c6c5 100644 --- a/test/sql/dat/all.test +++ b/test/sql/dat/all.test @@ -53,22 +53,22 @@ FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/expected/ ---- # with_schema_change -query I rowsort with_checkpoint +query I rowsort with_schema_change SELECT * FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/delta') ---- -query I rowsort with_checkpoint +query I rowsort with_schema_change SELECT * FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/expected/latest/**/*.parquet') ---- -query I rowsort with_checkpoint_count +query I rowsort with_schema_change_count SELECT count(*) FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/delta') ---- -query I rowsort with_checkpoint_count +query I rowsort with_schema_change_count SELECT count(*) FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/expected/latest/**/*.parquet') ---- @@ -129,11 +129,6 @@ SELECT * FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned_2/expected/latest/**/*.parquet') ---- -### FAILING DAT TESTS - -# TODO fix all of these -mode skip - # no_replay query I rowsort no_replay SELECT *