diff --git a/CMakeLists.txt b/CMakeLists.txt index 6797b39..4aeaefd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -99,7 +99,7 @@ ExternalProject_Add( GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs" # WARNING: the FFI headers are currently pinned due to the C linkage issue of the c++ headers. Currently, when bumping # the kernel version, the produced header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying the fix - GIT_TAG ed2b80b127984481adba8e59879f39b9e5f871d1 + GIT_TAG v0.2.0 # Prints the env variables passed to the cargo build to the terminal, useful in debugging because passing them # through CMake is an error-prone mess CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} env diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index e07c391..d4a30fd 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -83,7 +83,7 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel context->metadata.back()->partition_map = std::move(constant_map); } - static void visit_data(void *engine_context, ffi::EngineData* engine_data, const struct ffi::KernelBoolSlice selection_vec) { + static void visit_data(void *engine_context, ffi::ExclusiveEngineData* engine_data, const struct ffi::KernelBoolSlice selection_vec) { ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback); } diff --git a/src/include/delta_kernel_ffi.hpp b/src/include/delta_kernel_ffi.hpp index de22390..15db00d 100644 --- a/src/include/delta_kernel_ffi.hpp +++ b/src/include/delta_kernel_ffi.hpp @@ -49,15 +49,9 @@ enum class KernelError { InvalidTableLocationError, InvalidDecimalError, InvalidStructDataError, + InternalError, }; -#if defined(DEFINE_DEFAULT_ENGINE) -/// Struct to allow binding to the arrow [C Data -/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and -/// the schema. 
-struct ArrowFFIData; -#endif - struct CStringMap; /// this struct can be used by an engine to materialize a selection vector @@ -71,7 +65,9 @@ struct EngineBuilder; /// an opaque struct that encapsulates data read by an engine. this handle can be passed back into /// some kernel calls to operate on the data, or can be converted into the raw data as read by the /// [`delta_kernel::Engine`] by calling [`get_raw_engine_data`] -struct EngineData; +struct ExclusiveEngineData; + +struct ExclusiveFileReadResultIterator; struct KernelExpressionVisitorState; @@ -83,16 +79,55 @@ struct SharedScan; struct SharedScanDataIterator; +struct SharedSchema; + struct SharedSnapshot; +struct StringSliceIterator; + /// Represents an owned slice of boolean values allocated by the kernel. Any time the engine /// receives a `KernelBoolSlice` as a return value from a kernel method, engine is responsible -/// to free that slice, by calling [super::drop_bool_slice] exactly once. +/// to free that slice, by calling [super::free_bool_slice] exactly once. struct KernelBoolSlice { bool *ptr; uintptr_t len; }; +/// Represents an object that crosses the FFI boundary and which outlives the scope that created +/// it. It can be passed freely between rust code and external code. The +/// +/// An accompanying [`HandleDescriptor`] trait defines the behavior of each handle type: +/// +/// * The true underlying ("target") type the handle represents. For safety reasons, target type +/// must always be [`Send`]. +/// +/// * Mutable (`Box`-like) vs. shared (`Arc`-like). For safety reasons, the target type of a +/// shared handle must always be [`Send`]+[`Sync`]. +/// +/// * Sized vs. unsized. Sized types allow handle operations to be implemented more efficiently. 
+/// +/// # Validity +/// +/// A `Handle` is _valid_ if all of the following hold: +/// +/// * It was created by a call to [`Handle::from`] +/// * Not yet dropped by a call to [`Handle::drop_handle`] +/// * Not yet consumed by a call to [`Handle::into_inner`] +/// +/// Additionally, in keeping with the [`Send`] contract, multi-threaded external code must +/// enforce mutual exclusion -- no mutable handle should ever be passed to more than one kernel +/// API call at a time. If thread races are possible, the handle should be protected with a +/// mutex. Due to Rust [reference +/// rules](https://doc.rust-lang.org/book/ch04-02-references-and-borrowing.html#the-rules-of-references), +/// this requirement applies even for API calls that appear to be read-only (because Rust code +/// always receives the handle as mutable). +/// +/// NOTE: Because the underlying type is always [`Sync`], multi-threaded external code can +/// freely access shared (non-mutable) handles. +/// +template<typename H> +using Handle = H*; + /// An error that can be returned to the engine. Engines that wish to associate additional /// information can define and use any type that is [pointer /// interconvertible](https://en.cppreference.com/w/cpp/language/static_cast#pointer-interconvertible) @@ -155,40 +190,11 @@ struct KernelStringSlice { using AllocateErrorFn = EngineError*(*)(KernelError etype, KernelStringSlice msg); -/// Represents an object that crosses the FFI boundary and which outlives the scope that created -/// it. It can be passed freely between rust code and external code. The -/// -/// An accompanying [`HandleDescriptor`] trait defines the behavior of each handle type: -/// -/// * The true underlying ("target") type the handle represents. For safety reasons, target type -/// must always be [`Send`]. -/// -/// * Mutable (`Box`-like) vs. shared (`Arc`-like). For safety reasons, the target type of a -/// shared handle must always be [`Send`]+[`Sync`]. -/// -/// * Sized vs. unsized. 
Sized types allow handle operations to be implemented more efficiently. -/// -/// # Validity -/// -/// A `Handle` is _valid_ if all of the following hold: -/// -/// * It was created by a call to [`Handle::from`] -/// * Not yet dropped by a call to [`Handle::drop_handle`] -/// * Not yet consumed by a call to [`Handle::into_inner`] -/// -/// Additionally, in keeping with the [`Send`] contract, multi-threaded external code must -/// enforce mutual exclusion -- no mutable handle should ever be passed to more than one kernel -/// API call at a time. If thread races are possible, the handle should be protected with a -/// mutex. Due to Rust [reference -/// rules](https://doc.rust-lang.org/book/ch04-02-references-and-borrowing.html#the-rules-of-references), -/// this requirement applies even for API calls that appear to be read-only (because Rust code -/// always receives the handle as mutable). -/// -/// NOTE: Because the underlying type is always [`Sync`], multi-threaded external code can -/// freely access shared (non-mutable) handles. -/// -template<typename H> -using Handle = H*; +using NullableCvoid = void*; + +/// Allow engines to allocate strings of their own type. the contract of calling a passed allocate +/// function is that `kernel_str` is _only_ valid until the return from this function +using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str); /// The `EngineSchemaVisitor` defines a visitor system to allow engines to build their own /// representation of a schema from a particular schema within kernel. 
@@ -283,6 +289,68 @@ struct EngineIterator { const void *(*get_next)(void *data); }; +struct FileMeta { + KernelStringSlice path; + int64_t last_modified; + uintptr_t size; +}; + +/// ABI-compatible struct for ArrowArray from C Data Interface +/// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions> +/// +/// ``` +/// # use arrow_data::ArrayData; +/// # use arrow_data::ffi::FFI_ArrowArray; +/// fn export_array(array: &ArrayData) -> FFI_ArrowArray { +/// FFI_ArrowArray::new(array) +/// } +/// ``` +struct FFI_ArrowArray { + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void **buffers; + FFI_ArrowArray **children; + FFI_ArrowArray *dictionary; + void (*release)(FFI_ArrowArray *arg1); + void *private_data; +}; + +/// ABI-compatible struct for `ArrowSchema` from C Data Interface +/// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions> +/// +/// ``` +/// # use arrow_schema::DataType; +/// # use arrow_schema::ffi::FFI_ArrowSchema; +/// fn array_schema(data_type: &DataType) -> FFI_ArrowSchema { +/// FFI_ArrowSchema::try_from(data_type).unwrap() +/// } +/// ``` +/// +struct FFI_ArrowSchema { + const char *format; + const char *name; + const char *metadata; + int64_t flags; + int64_t n_children; + FFI_ArrowSchema **children; + FFI_ArrowSchema *dictionary; + void (*release)(FFI_ArrowSchema *arg1); + void *private_data; +}; + +#if defined(DEFINE_DEFAULT_ENGINE) +/// Struct to allow binding to the arrow [C Data +/// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and +/// the schema. +struct ArrowFFIData { + FFI_ArrowArray array; + FFI_ArrowSchema schema; +}; +#endif + /// A predicate that can be used to skip data when scanning. 
/// /// When invoking [`scan::scan`], The engine provides a pointer to the (engine's native) predicate, @@ -298,12 +366,6 @@ struct EnginePredicate { uintptr_t (*visitor)(void *predicate, KernelExpressionVisitorState *state); }; -using NullableCvoid = void*; - -/// Allow engines to allocate strings of their own type. the contract of calling a passed allocate -/// function is that `kernel_str` is _only_ valid until the return from this function -using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str); - /// Give engines an easy way to consume stats struct Stats { /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the @@ -339,7 +401,14 @@ extern "C" { /// # Safety /// /// Caller is responsible for passing a valid handle. -void drop_bool_slice(KernelBoolSlice slice); +void free_bool_slice(KernelBoolSlice slice); + +/// Drop an `ExclusiveEngineData`. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid handle as engine_data +void free_engine_data(Handle<ExclusiveEngineData> engine_data); #if defined(DEFINE_DEFAULT_ENGINE) /// Get a "builder" that can be used to construct an engine. The function @@ -390,7 +459,7 @@ ExternResult<Handle<SharedExternEngine>> get_sync_engine(AllocateErrorFn allocat /// # Safety /// /// Caller is responsible for passing a valid handle. -void drop_engine(Handle<SharedExternEngine> engine); +void free_engine(Handle<SharedExternEngine> engine); /// Get the latest snapshot from the specified table /// @@ -403,7 +472,7 @@ ExternResult<Handle<SharedSnapshot>> snapshot(KernelStringSlice path, /// # Safety /// /// Caller is responsible for passing a valid handle. -void drop_snapshot(Handle<SharedSnapshot> snapshot); +void free_snapshot(Handle<SharedSnapshot> snapshot); /// Get the version of the specified snapshot /// @@ -412,6 +481,27 @@ void drop_snapshot(Handle<SharedSnapshot> snapshot); /// Caller is responsible for passing a valid handle. 
uint64_t version(Handle<SharedSnapshot> snapshot); +/// Get the resolved root of the table. This should be used in any future calls that require +/// constructing a path +/// +/// # Safety +/// +/// Caller is responsible for passing a valid handle. +NullableCvoid snapshot_table_root(Handle<SharedSnapshot> snapshot, AllocateStringFn allocate_fn); + +/// # Safety +/// +/// The iterator must be valid (returned by [`get_partition_columns`]) and not yet freed by +/// [`free_string_slice_data`]. The visitor function pointer must be non-null. +bool string_slice_next(Handle<StringSliceIterator> data, + NullableCvoid engine_context, + void (*engine_visitor)(NullableCvoid engine_context, KernelStringSlice slice)); + +/// # Safety +/// +/// Caller is responsible for (at most once) passing a valid pointer to a [`StringSliceIterator`] +void free_string_slice_data(Handle<StringSliceIterator> data); + /// Visit the schema of the passed `SnapshotHandle`, using the provided `visitor`. See the /// documentation of [`EngineSchemaVisitor`] for a description of how this visitor works. /// @@ -464,14 +554,49 @@ uintptr_t visit_expression_literal_double(KernelExpressionVisitorState *state, d uintptr_t visit_expression_literal_bool(KernelExpressionVisitorState *state, bool value); -/// Allow an engine to "unwrap" an [`EngineData`] into the raw pointer for the case it wants +/// Call the engine back with the next `EngineData` batch read by Parquet/Json handler. The +/// _engine_ "owns" the data that is passed into the `engine_visitor`, since it is allocated by the +/// `Engine` being used for log-replay. If the engine wants the kernel to free this data, it _must_ +/// call [`free_engine_data`] on it. +/// +/// # Safety +/// +/// The iterator must be valid (returned by [`read_parquet_file`]) and not yet freed by +/// [`free_read_result_iter`]. The visitor function pointer must be non-null. 
+ExternResult<bool> read_result_next(Handle<ExclusiveFileReadResultIterator> data, + NullableCvoid engine_context, + void (*engine_visitor)(NullableCvoid engine_context, + Handle<ExclusiveEngineData> engine_data)); + +/// Free the memory from the passed read result iterator +/// # Safety +/// +/// Caller is responsible for (at most once) passing a valid pointer returned by a call to +/// [`read_parquet_file`]. +void free_read_result_iter(Handle<ExclusiveFileReadResultIterator> data); + +/// Use the specified engine's [`delta_kernel::ParquetHandler`] to read the specified file. +/// +/// # Safety +/// Caller is responsible for calling with a valid `ExternEngineHandle` and `FileMeta` +ExternResult<Handle<ExclusiveFileReadResultIterator>> read_parquet_file(Handle<SharedExternEngine> engine, + const FileMeta *file, + Handle<SharedSchema> physical_schema); + +/// Get the number of rows in an engine data +/// +/// # Safety +/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData` +uintptr_t engine_data_length(Handle<ExclusiveEngineData> *data); + +/// Allow an engine to "unwrap" an [`ExclusiveEngineData`] into the raw pointer for the case it wants /// to use its own engine data format /// /// # Safety /// -/// `data_handle` must be a valid pointer to a kernel allocated `EngineData`. The Engine must +/// `data_handle` must be a valid pointer to a kernel allocated `ExclusiveEngineData`. The Engine must /// ensure the handle outlives the returned pointer. -void *get_raw_engine_data(Handle<EngineData> data); +void *get_raw_engine_data(Handle<ExclusiveEngineData> data); #if defined(DEFINE_DEFAULT_ENGINE) /// Get an [`ArrowFFIData`] to allow binding to the arrow [C Data @@ -479,16 +604,16 @@ void *get_raw_engine_data(Handle<EngineData> data); /// the schema. 
/// /// # Safety -/// data_handle must be a valid EngineData as read by the +/// data_handle must be a valid ExclusiveEngineData as read by the /// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`. -ExternResult<ArrowFFIData*> get_raw_arrow_data(Handle<EngineData> data, +ExternResult<ArrowFFIData*> get_raw_arrow_data(Handle<ExclusiveEngineData> data, Handle<SharedExternEngine> engine); #endif /// Drops a scan. /// # Safety /// Caller is responsible for passing a [valid][Handle#Validity] scan handle. -void drop_scan(Handle<SharedScan> scan); +void free_scan(Handle<SharedScan> scan); /// Get a [`Scan`] over the table specified by the passed snapshot. /// # Safety @@ -505,10 +630,35 @@ ExternResult<Handle<SharedScan>> scan(Handle<SharedSnapshot> snapshot, /// Engine is responsible for providing a valid scan pointer Handle<SharedGlobalScanState> get_global_scan_state(Handle<SharedScan> scan); +/// Get the kernel view of the physical read schema that an engine should read from parquet file in +/// a scan +/// /// # Safety +/// Engine is responsible for providing a valid GlobalScanState pointer +Handle<SharedSchema> get_global_read_schema(Handle<SharedGlobalScanState> state); + +/// Free a global read schema +/// +/// # Safety +/// Engine is responsible for providing a valid schema obtained via [`get_global_read_schema`] +void free_global_read_schema(Handle<SharedSchema> schema); + +/// Get a count of the number of partition columns for this scan /// +/// # Safety +/// Caller is responsible for passing a valid global scan pointer. +uintptr_t get_partition_column_count(Handle<SharedGlobalScanState> state); + +/// Get an iterator of the list of partition columns for this scan. +/// +/// # Safety /// Caller is responsible for passing a valid global scan pointer. 
-void drop_global_scan_state(Handle<SharedGlobalScanState> state); +Handle<StringSliceIterator> get_partition_columns(Handle<SharedGlobalScanState> state); + +/// # Safety +/// +/// Caller is responsible for passing a valid global scan state pointer. +void free_global_scan_state(Handle<SharedGlobalScanState> state); /// Get an iterator over the data needed to perform a scan. This will return a /// [`KernelScanDataIterator`] which can be passed to [`kernel_scan_data_next`] to get the actual @@ -523,18 +673,18 @@ ExternResult<Handle<SharedScanDataIterator>> kernel_scan_data_init(Handle<Shared /// # Safety /// /// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by -/// [kernel_scan_data_free]. The visitor function pointer must be non-null. +/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null. ExternResult<bool> kernel_scan_data_next(Handle<SharedScanDataIterator> data, NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, - Handle<EngineData> engine_data, + Handle<ExclusiveEngineData> engine_data, KernelBoolSlice selection_vector)); /// # Safety /// /// Caller is responsible for (at most once) passing a valid pointer returned by a call to /// [`kernel_scan_data_init`]. -void kernel_scan_data_free(Handle<SharedScanDataIterator> data); +void free_kernel_scan_data(Handle<SharedScanDataIterator> data); /// allow probing into a CStringMap. If the specified key is in the map, kernel will call /// allocate_fn with the value associated with the key and return the value returned from that @@ -559,8 +709,8 @@ ExternResult<KernelBoolSlice> selection_vector_from_dv(const DvInfo *dv_info, /// data which provides the data handle and selection vector as each element in the iterator. /// /// # Safety -/// engine is responsbile for passing a valid [`EngineData`] and selection vector. 
-void visit_scan_data(Handle<EngineData> data, +/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector. +void visit_scan_data(Handle<ExclusiveEngineData> data, KernelBoolSlice selection_vec, NullableCvoid engine_context, CScanCallback callback); diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 37dc289..9b33c5c 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -102,11 +102,11 @@ struct TemplatedUniqueKernelPointer : public UniqueKernelPointer<KernelType> { }; }; -typedef TemplatedUniqueKernelPointer<ffi::SharedSnapshot, ffi::drop_snapshot> KernelSnapshot; -typedef TemplatedUniqueKernelPointer<ffi::SharedExternEngine, ffi::drop_engine> KernelExternEngine; -typedef TemplatedUniqueKernelPointer<ffi::SharedScan, ffi::drop_scan> KernelScan; -typedef TemplatedUniqueKernelPointer<ffi::SharedGlobalScanState, ffi::drop_global_scan_state> KernelGlobalScanState; -typedef TemplatedUniqueKernelPointer<ffi::SharedScanDataIterator, ffi::kernel_scan_data_free> KernelScanDataIterator; +typedef TemplatedUniqueKernelPointer<ffi::SharedSnapshot, ffi::free_snapshot> KernelSnapshot; +typedef TemplatedUniqueKernelPointer<ffi::SharedExternEngine, ffi::free_engine> KernelExternEngine; +typedef TemplatedUniqueKernelPointer<ffi::SharedScan, ffi::free_scan> KernelScan; +typedef TemplatedUniqueKernelPointer<ffi::SharedGlobalScanState, ffi::free_global_scan_state> KernelGlobalScanState; +typedef TemplatedUniqueKernelPointer<ffi::SharedScanDataIterator, ffi::free_kernel_scan_data> KernelScanDataIterator; struct KernelUtils { static ffi::KernelStringSlice ToDeltaString(const string &str); diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index 07c782b..b4c3c76 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -22,7 +22,7 @@ struct DeltaFileMetaData { ~DeltaFileMetaData() { if (selection_vector.ptr) { - 
ffi::drop_bool_slice(selection_vector); + ffi::free_bool_slice(selection_vector); } } diff --git a/test/sql/dat/all.test b/test/sql/dat/all.test index f6332da..fc7c6c5 100644 --- a/test/sql/dat/all.test +++ b/test/sql/dat/all.test @@ -53,22 +53,22 @@ FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/expected/ ---- # with_schema_change -query I rowsort with_checkpoint +query I rowsort with_schema_change SELECT * FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/delta') ---- -query I rowsort with_checkpoint +query I rowsort with_schema_change SELECT * FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/expected/latest/**/*.parquet') ---- -query I rowsort with_checkpoint_count +query I rowsort with_schema_change_count SELECT count(*) FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/delta') ---- -query I rowsort with_checkpoint_count +query I rowsort with_schema_change_count SELECT count(*) FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/expected/latest/**/*.parquet') ---- @@ -129,11 +129,6 @@ SELECT * FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned_2/expected/latest/**/*.parquet') ---- -### FAILING DAT TESTS - -# TODO fix all of these -mode skip - # no_replay query I rowsort no_replay SELECT *