diff --git a/CMakeLists.txt b/CMakeLists.txt index 46a0b95..88d2fe2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,6 +20,7 @@ set(EXTENSION_SOURCES src/inlined_parquet/parquet_extension.cpp src/delta_extension.cpp src/delta_functions.cpp + src/delta_utils.cpp src/functions/delta_scan.cpp) ### Custom config diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp new file mode 100644 index 0000000..b02e898 --- /dev/null +++ b/src/delta_utils.cpp @@ -0,0 +1,319 @@ +#include "delta_utils.hpp" + +#include "duckdb.hpp" +#include "duckdb/main/extension_util.hpp" +#include + +namespace duckdb { + +unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::SharedSnapshot* snapshot) { + SchemaVisitor state; + ffi::EngineSchemaVisitor visitor; + + visitor.data = &state; + visitor.make_field_list = (uintptr_t (*)(void*, uintptr_t)) &MakeFieldList; + visitor.visit_struct = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uintptr_t)) &VisitStruct; + visitor.visit_array = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t)) &VisitArray; + visitor.visit_map = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t)) &VisitMap; + visitor.visit_decimal = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uint8_t , uint8_t)) &VisitDecimal; + visitor.visit_string = VisitSimpleType(); + visitor.visit_long = VisitSimpleType(); + visitor.visit_integer = VisitSimpleType(); + visitor.visit_short = VisitSimpleType(); + visitor.visit_byte = VisitSimpleType(); + visitor.visit_float = VisitSimpleType(); + visitor.visit_double = VisitSimpleType(); + visitor.visit_boolean = VisitSimpleType(); + visitor.visit_binary = VisitSimpleType(); + visitor.visit_date = VisitSimpleType(); + visitor.visit_timestamp = VisitSimpleType(); + visitor.visit_timestamp_ntz = VisitSimpleType(); + + uintptr_t result = visit_schema(snapshot, &visitor); + return state.TakeFieldList(result); +} + +void SchemaVisitor::VisitDecimal(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uint8_t precision, uint8_t scale) { + state->AppendToList(sibling_list_id, name, LogicalType::DECIMAL(precision, scale)); +} + +uintptr_t SchemaVisitor::MakeFieldList(SchemaVisitor* state, uintptr_t capacity_hint) { + return state->MakeFieldListImpl(capacity_hint); +} + +void SchemaVisitor::VisitStruct(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uintptr_t child_list_id) { + auto children = state->TakeFieldList(child_list_id); + state->AppendToList(sibling_list_id, name, LogicalType::STRUCT(std::move(*children))); +} + +void SchemaVisitor::VisitArray(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id) { + auto children = state->TakeFieldList(child_list_id); + + D_ASSERT(children->size() == 1); + state->AppendToList(sibling_list_id, name, LogicalType::LIST(children->front().second)); +} + +void SchemaVisitor::VisitMap(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id) { + auto children = state->TakeFieldList(child_list_id); + + D_ASSERT(children->size() == 2); + state->AppendToList(sibling_list_id, name, LogicalType::MAP(LogicalType::STRUCT(std::move(*children)))); +} + +uintptr_t SchemaVisitor::MakeFieldListImpl(uintptr_t capacity_hint) { + uintptr_t id = next_id++; + auto list = make_uniq(); + if (capacity_hint > 0) { + list->reserve(capacity_hint); + } + inflight_lists.emplace(id, std::move(list)); + return id; +} + +void SchemaVisitor::AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType&& child) { + auto it = inflight_lists.find(id); + if (it == inflight_lists.end()) { + // TODO... some error... + throw InternalException("WEIRD SHIT"); + } else { + it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child))); + } +} + +unique_ptr SchemaVisitor::TakeFieldList(uintptr_t id) { + auto it = inflight_lists.find(id); + if (it == inflight_lists.end()) { + // TODO: Raise some kind of error. + throw InternalException("WEIRD SHIT 2"); + } + auto rval = std::move(it->second); + inflight_lists.erase(it); + return rval; +} + + +ffi::EngineError* DuckDBEngineError::AllocateError(ffi::KernelError etype, ffi::KernelStringSlice msg) { + auto error = new DuckDBEngineError; + error->etype = etype; + error->error_message = string(msg.ptr, msg.len); + return error; +} + +string DuckDBEngineError::KernelErrorEnumToString(ffi::KernelError err) { + const char* KERNEL_ERROR_ENUM_STRINGS[] = { + "UnknownError", + "FFIError", + "ArrowError", + "EngineDataTypeError", + "ExtractError", + "GenericError", + "IOErrorError", + "ParquetError", + "ObjectStoreError", + "ObjectStorePathError", + "Reqwest", + "FileNotFoundError", + "MissingColumnError", + "UnexpectedColumnTypeError", + "MissingDataError", + "MissingVersionError", + "DeletionVectorError", + "InvalidUrlError", + "MalformedJsonError", + "MissingMetadataError", + "MissingProtocolError", + "MissingMetadataAndProtocolError", + "ParseError", + "JoinFailureError", + "Utf8Error", + "ParseIntError", + "InvalidColumnMappingMode", + "InvalidTableLocation", + "InvalidDecimalError", + }; + + static_assert(sizeof(KERNEL_ERROR_ENUM_STRINGS)/sizeof(char*)-1 == (int)ffi::KernelError::InvalidDecimalError, + "KernelErrorEnumStrings mismatched with kernel"); + + if ((int)err < sizeof(KERNEL_ERROR_ENUM_STRINGS)/sizeof(char*)) { + return KERNEL_ERROR_ENUM_STRINGS[(int)err]; + } + + return StringUtil::Format("EnumOutOfRange (enum val out of range: %d)", (int)err); +} + +void DuckDBEngineError::Throw(string from_where) { + // Make copies before calling delete this + auto etype_copy = etype; + auto message_copy = error_message; + + // Consume error by calling delete this (remember this error is created by kernel using AllocateError) + delete this; + throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error: %u (%s) with message (%s)", + from_where.c_str(), etype_copy, KernelErrorEnumToString(etype_copy), message_copy); +} + + + +ffi::KernelStringSlice KernelUtils::ToDeltaString(const string &str) { + return {str.data(), str.size()}; +} + +string KernelUtils::FromDeltaString(const struct ffi::KernelStringSlice slice) { + return {slice.ptr, slice.len}; +} + +vector KernelUtils::FromDeltaBoolSlice(const struct ffi::KernelBoolSlice slice) { + vector result; + result.assign(slice.ptr, slice.ptr + slice.len); + return result; +} + +PredicateVisitor::PredicateVisitor(const vector &column_names, optional_ptr filters) : EnginePredicate { + .predicate = this, + .visitor = (uintptr_t (*)(void*, ffi::KernelExpressionVisitorState*)) &VisitPredicate} +{ + if (filters) { + for (auto& filter : filters->filters) { + column_filters[column_names[filter.first]] = filter.second.get(); + } + } +} + +// Template wrapper function that implements get_next for EngineIteratorFromCallable. +template +static auto GetNextFromCallable(Callable* callable) -> decltype(std::declval()()) { + return callable->operator()(); +} + +// Wraps a callable object (e.g. C++11 lambda) as an EngineIterator. +template +ffi::EngineIterator EngineIteratorFromCallable(Callable& callable) { + auto* get_next = &GetNextFromCallable; + return {.data = &callable, .get_next = (const void *(*)(void*)) get_next}; +}; + +// Helper function to prevent pushing down filters kernel cant handle +// TODO: remove once kernel handles this properly? +static bool CanHandleFilter(TableFilter *filter) { + switch (filter->filter_type) { + case TableFilterType::CONSTANT_COMPARISON: + return true; + case TableFilterType::CONJUNCTION_AND: { + auto &conjunction = static_cast(*filter); + bool can_handle = true; + for (const auto& child : conjunction.child_filters) { + can_handle = can_handle && CanHandleFilter(child.get()); + } + return can_handle; + } + + default: + return false; + } +} + +// Prunes the list of predicates to ones that we can handle +static unordered_map PrunePredicates(unordered_map predicates) { + unordered_map result; + for (const auto &predicate : predicates) { + if (CanHandleFilter(predicate.second)) { + result[predicate.first] = predicate.second; + } + + } + return result; +} + +uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state) { + auto filters = PrunePredicates(predicate->column_filters); + + auto it = filters.begin(); + auto end = filters.end(); + auto get_next = [predicate, state, &it, &end]() -> uintptr_t { + if (it == end) { + return 0; + } + auto &filter = *it++; + return predicate->VisitFilter(filter.first, *filter.second, state); + }; + auto eit = EngineIteratorFromCallable(get_next); + + // TODO: this should be fixed upstream? + try { + return visit_expression_and(state, &eit); + } catch (...) { + return ~0; + } +} + +uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state) { + auto maybe_left = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); + uintptr_t left = KernelUtils::UnpackResult(maybe_left, "VisitConstantFilter failed to visit_expression_column"); + + uintptr_t right = ~0; + auto &value = filter.constant; + switch (value.type().id()) { + case LogicalType::BIGINT: + right = visit_expression_literal_long(state, BigIntValue::Get(value)); + break; + + + case LogicalType::VARCHAR: { + // WARNING: C++ lifetime extension rules don't protect calls of the form foo(std::string(...).c_str()) + auto str = StringValue::Get(value); + auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); + right = KernelUtils::UnpackResult(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string"); + break; + } + + default: + break; // unsupported type + } + + // TODO support other comparison types? + switch (filter.comparison_type) { + case ExpressionType::COMPARE_LESSTHAN: + return visit_expression_lt(state, left, right); + case ExpressionType::COMPARE_LESSTHANOREQUALTO: + return visit_expression_le(state, left, right); + case ExpressionType::COMPARE_GREATERTHAN: + return visit_expression_gt(state, left, right); + case ExpressionType::COMPARE_GREATERTHANOREQUALTO: + return visit_expression_ge(state, left, right); + case ExpressionType::COMPARE_EQUAL: + return visit_expression_eq(state, left, right); + + default: + std::cout << " Unsupported operation: " << (int) filter.comparison_type << std::endl; + return ~0; // Unsupported operation + } +} + + +uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state) { + auto it = filter.child_filters.begin(); + auto end = filter.child_filters.end(); + auto get_next = [this, col_name, state, &it, &end]() -> uintptr_t { + if (it == end) { + return 0; + } + auto &child_filter = *it++; + return VisitFilter(col_name, *child_filter, state); + }; + auto eit = EngineIteratorFromCallable(get_next); + return visit_expression_and(state, &eit); +} + +uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state) { + switch (filter.filter_type) { + case TableFilterType::CONSTANT_COMPARISON: + return VisitConstantFilter(col_name, static_cast(filter), state); + case TableFilterType::CONJUNCTION_AND: + return VisitAndFilter(col_name, static_cast(filter), state); + default: + throw NotImplementedException("Attempted to push down unimplemented filter type: '%s'", EnumUtil::ToString(filter.filter_type)); + } +} + +}; diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index dfe5a62..4ac6826 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -22,12 +22,6 @@ namespace duckdb { -static void print_selection_vector(char* indent, const struct ffi::KernelBoolSlice *selection_vec) { - for (int i = 0; i < selection_vec->len; i++) { - printf("%ssel[%i] = %s\n", indent, i, selection_vec->ptr[i] ? "1" : "0"); - } -} - static void* allocate_string(const struct ffi::KernelStringSlice slice) { return new string(slice.ptr, slice.len); } @@ -36,9 +30,7 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel auto context = (DeltaSnapshot *) engine_context; auto path_string = context->GetPath(); StringUtil::RTrim(path_string, "/"); - path_string += "/" + from_delta_string_slice(path); - -// printf("Fetch metadata for %s\n", path_string.c_str()); + path_string += "/" + KernelUtils::FromDeltaString(path); // First we append the file to our resolved files context->resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string)); @@ -51,8 +43,8 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel context->metadata.back()->file_number = context->resolved_files.size() - 1; // Fetch the deletion vector - auto selection_vector_res = ffi::selection_vector_from_dv(dv_info, context->extern_engine, context->global_state); - auto selection_vector = unpack_result_or_throw(selection_vector_res, "selection_vector_from_dv for path " + context->GetPath()); + auto selection_vector_res = ffi::selection_vector_from_dv(dv_info, context->extern_engine.get(), context->global_state.get()); + auto selection_vector = KernelUtils::UnpackResult(selection_vector_res, "selection_vector_from_dv for path " + context->GetPath()); if (selection_vector.ptr) { context->metadata.back()->selection_vector = selection_vector; } @@ -60,10 +52,9 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel // Lookup all columns for potential hits in the constant map case_insensitive_map_t constant_map; for (const auto &col: context->names) { - auto key = to_delta_string_slice(col); + auto key = KernelUtils::ToDeltaString(col); auto *partition_val = (string *) ffi::get_from_map(partition_values, key, allocate_string); if (partition_val) { -// printf("- %s = %s\n", col.c_str(), (*partition_val).c_str()); constant_map[col] = *partition_val; delete partition_val; } @@ -72,9 +63,6 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel } static void visit_data(void *engine_context, ffi::EngineData* engine_data, const struct ffi::KernelBoolSlice selection_vec) { -// printf("Got some data\n"); -// printf(" Of this data, here is a selection vector\n"); -// print_selection_vector(" ", &selection_vec); ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback); } @@ -83,8 +71,8 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p // For "regular" paths we early out with the default builder config if (!StringUtil::StartsWith(path, "s3://")) { - auto interface_builder_res = ffi::get_engine_builder(to_delta_string_slice(path), error_allocator); - return unpack_result_or_throw(interface_builder_res, "get_engine_interface_builder for path " + path); + auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError); + return KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path); } auto end_of_container = path.find('/',5); @@ -95,10 +83,8 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p auto bucket = path.substr(5, end_of_container-5); auto path_in_bucket = path.substr(end_of_container); - auto interface_builder_res = ffi::get_engine_builder(to_delta_string_slice(path), error_allocator); - builder = unpack_result_or_throw(interface_builder_res, "get_engine_interface_builder for path " + path); - -// ffi::set_builder_option(builder, to_delta_string_slice("aws_bucket"), to_delta_string_slice(bucket)); + auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError); + builder = KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path); // For S3 paths we need to trim the url, set the container, and fetch a potential secret auto &secret_manager = SecretManager::Get(context); @@ -117,16 +103,16 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p auto region = kv_secret.TryGetValue("region").ToString(); if (key_id.empty() && secret.empty()) { - ffi::set_builder_option(builder, to_delta_string_slice("skip_signature"), to_delta_string_slice("true")); + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"), KernelUtils::ToDeltaString("true")); } if (!key_id.empty()) { - ffi::set_builder_option(builder, to_delta_string_slice("aws_access_key_id"), to_delta_string_slice(key_id)); + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"), KernelUtils::ToDeltaString(key_id)); } if (!secret.empty()) { - ffi::set_builder_option(builder, to_delta_string_slice("aws_secret_access_key"), to_delta_string_slice(secret)); + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"), KernelUtils::ToDeltaString(secret)); } - ffi::set_builder_option(builder, to_delta_string_slice("aws_region"), to_delta_string_slice(region)); + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_region"), KernelUtils::ToDeltaString(region)); return builder; } @@ -167,7 +153,7 @@ void DeltaSnapshot::Bind(vector &return_types, vector &name if (!initialized) { InitializeFiles(); } - auto schema = SchemaVisitor::VisitSnapshotSchema(snapshot); + auto schema = SchemaVisitor::VisitSnapshotSchema(snapshot.get()); for (const auto &field: *schema) { names.push_back(field.first); return_types.push_back(field.second); @@ -194,7 +180,7 @@ string DeltaSnapshot::GetFile(idx_t i) { auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, visit_data); - auto have_scan_data = unpack_result_or_throw(have_scan_data_res, "kernel_scan_data_next in DeltaSnapshot GetFile"); + auto have_scan_data = TryUnpackKernelResult(have_scan_data_res); // TODO: shouldn't the kernel always return false here? if (!have_scan_data || resolved_files.size() == size_before) { @@ -212,35 +198,27 @@ string DeltaSnapshot::GetFile(idx_t i) { } void DeltaSnapshot::InitializeFiles() { - auto path_slice = to_delta_string_slice(paths[0]); + auto path_slice = KernelUtils::ToDeltaString(paths[0]); + // Register engine auto interface_builder = CreateBuilder(context, paths[0]); - auto engine_interface_res = ffi::builder_build(interface_builder); - extern_engine = unpack_result_or_throw(engine_interface_res, "get_default_client in DeltaScanScanBind"); - - // Alternatively we can do the default client like so: -// auto extern_engine_res = ffi::get_default_client(path_slice, error_allocator); -// extern_engine = unpack_result_or_throw(extern_engine_res, "get_default_client in DeltaScanScanBind"); + extern_engine = TryUnpackKernelResult( ffi::builder_build(interface_builder)); // Initialize Snapshot - auto snapshot_res = ffi::snapshot(path_slice, extern_engine); - snapshot = unpack_result_or_throw(snapshot_res, "snapshot in DeltaScanScanBind"); + snapshot = TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get())); + // Create Scan PredicateVisitor visitor(names, &table_filters); + scan = TryUnpackKernelResult(ffi::scan(snapshot.get(), extern_engine.get(), &visitor)); - auto scan_res = ffi::scan(snapshot, extern_engine, &visitor); - scan = unpack_result_or_throw(scan_res, "scan in DeltaScanScanBind"); - - global_state = ffi::get_global_scan_state(scan); + // Create GlobalState + global_state = ffi::get_global_scan_state(scan.get()); // Set version - this->version = ffi::version(snapshot); + this->version = ffi::version(snapshot.get()); - auto scan_iterator_res = ffi::kernel_scan_data_init(extern_engine, scan); - scan_data_iterator = { - unpack_result_or_throw(scan_iterator_res, "kernel_scan_data_init in InitFiles"), - ffi::kernel_scan_data_free - }; + // Create scan data iterator + scan_data_iterator = TryUnpackKernelResult(ffi::kernel_scan_data_init(extern_engine.get(), scan.get())); initialized = true; } diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index fe298fd..bcb5f74 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -3,45 +3,22 @@ #include "delta_kernel_ffi.hpp" #include "duckdb/planner/filter/constant_filter.hpp" #include "duckdb/planner/filter/conjunction_filter.hpp" +#include "duckdb/common/enum_util.hpp" #include // TODO: clean up this file as we go namespace duckdb { +// SchemaVisitor is used to parse the schema of a Delta table from the Kernel class SchemaVisitor { public: using FieldList = child_list_t; - static unique_ptr VisitSnapshotSchema(ffi::SharedSnapshot* snapshot) { - SchemaVisitor state; - ffi::EngineSchemaVisitor visitor; - - visitor.data = &state; - visitor.make_field_list = (uintptr_t (*)(void*, uintptr_t)) &MakeFieldList; - visitor.visit_struct = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uintptr_t)) &VisitStruct; - visitor.visit_array = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t)) &VisitArray; - visitor.visit_map = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t)) &VisitMap; - visitor.visit_decimal = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uint8_t , uint8_t)) &VisitDecimal; - visitor.visit_string = VisitSimpleType(); - visitor.visit_long = VisitSimpleType(); - visitor.visit_integer = VisitSimpleType(); - visitor.visit_short = VisitSimpleType(); - visitor.visit_byte = VisitSimpleType(); - visitor.visit_float = VisitSimpleType(); - visitor.visit_double = VisitSimpleType(); - visitor.visit_boolean = VisitSimpleType(); - visitor.visit_binary = VisitSimpleType(); - visitor.visit_date = VisitSimpleType(); - visitor.visit_timestamp = VisitSimpleType(); - visitor.visit_timestamp_ntz = VisitSimpleType(); - - uintptr_t result = visit_schema(snapshot, &visitor); - return state.TakeFieldList(result); - } + static unique_ptr VisitSnapshotSchema(ffi::SharedSnapshot* snapshot); private: - std::map > inflight_lists; + unordered_map> inflight_lists; uintptr_t next_id = 1; typedef void (SimpleTypeVisitorFunction)(void*, uintptr_t, ffi::KernelStringSlice); @@ -55,74 +32,29 @@ class SchemaVisitor { state->AppendToList(sibling_list_id, name, TypeId); } - static void VisitDecimal(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uint8_t precision, uint8_t scale) { - state->AppendToList(sibling_list_id, name, LogicalType::DECIMAL(precision, scale)); - } - - static uintptr_t MakeFieldList(SchemaVisitor* state, uintptr_t capacity_hint) { - return state->MakeFieldListImpl(capacity_hint); - } - - static void VisitStruct(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uintptr_t child_list_id) { - auto children = state->TakeFieldList(child_list_id); - state->AppendToList(sibling_list_id, name, LogicalType::STRUCT(std::move(*children))); - } - - static void VisitArray(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id) { - auto children = state->TakeFieldList(child_list_id); - - D_ASSERT(children->size() == 1); - state->AppendToList(sibling_list_id, name, LogicalType::LIST(children->front().second)); - } - - static void VisitMap(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id) { - auto children = state->TakeFieldList(child_list_id); - - D_ASSERT(children->size() == 2); - state->AppendToList(sibling_list_id, name, LogicalType::MAP(LogicalType::STRUCT(std::move(*children)))); - } - - uintptr_t MakeFieldListImpl(uintptr_t capacity_hint) { - uintptr_t id = next_id++; - auto list = make_uniq(); - if (capacity_hint > 0) { - list->reserve(capacity_hint); - } - inflight_lists.emplace(id, std::move(list)); - return id; - } - - void AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType&& child) { - auto it = inflight_lists.find(id); - if (it == inflight_lists.end()) { - // TODO... some error... - throw InternalException("WEIRD SHIT"); - } else { - it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child))); - } - } + static void VisitDecimal(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uint8_t precision, uint8_t scale); + static uintptr_t MakeFieldList(SchemaVisitor* state, uintptr_t capacity_hint); + static void VisitStruct(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uintptr_t child_list_id); + static void VisitArray(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id); + static void VisitMap(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id); - unique_ptr TakeFieldList(uintptr_t id) { - auto it = inflight_lists.find(id); - if (it == inflight_lists.end()) { - // TODO: Raise some kind of error. - throw InternalException("WEIRD SHIT 2"); - } - auto rval = std::move(it->second); - inflight_lists.erase(it); - return rval; - } + uintptr_t MakeFieldListImpl(uintptr_t capacity_hint); + void AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType&& child); + unique_ptr TakeFieldList(uintptr_t id); }; +// Allocator for errors that the kernel might throw struct DuckDBEngineError : ffi::EngineError { - string error_message; -}; + // Allocate a DuckDBEngineError, function ptr passed to kernel for error allocation + static ffi::EngineError* AllocateError(ffi::KernelError etype, ffi::KernelStringSlice msg); + // Convert a kernel error enum to a string + static string KernelErrorEnumToString(ffi::KernelError err); -ffi::EngineError* error_allocator(ffi::KernelError etype, ffi::KernelStringSlice msg) { - auto error = new DuckDBEngineError; - error->etype = etype; - error->error_message = string(msg.ptr, msg.len); - return error; + // Throw the error as an IOException + [[noreturn]] void Throw(string from_info); + + // The error message from Kernel + string error_message; }; // RAII wrapper that returns ownership of a kernel pointer to kernel when it goes out of @@ -160,236 +92,55 @@ struct UniqueKernelPointer { void (*free)(KernelType*) = nullptr; }; -// TODO make less hacky -static const char* KernelErrorEnumStrings[] = { - "UnknownError", - "FFIError", - "ArrowError", - "EngineDataTypeError", - "ExtractError", - "GenericError", - "IOErrorError", - "ParquetError", - "ObjectStoreError", - "ObjectStorePathError", - "Reqwest", - "FileNotFoundError", - "MissingColumnError", - "UnexpectedColumnTypeError", - "MissingDataError", - "MissingVersionError", - "DeletionVectorError", - "InvalidUrlError", - "MalformedJsonError", - "MissingMetadataError", - "MissingProtocolError", - "MissingMetadataAndProtocolError", - "ParseError", - "JoinFailureError", - "Utf8Error", - "ParseIntError" +// Syntactic sugar around the different kernel types +template +struct TemplatedUniqueKernelPointer : public UniqueKernelPointer { + TemplatedUniqueKernelPointer() : UniqueKernelPointer() { + }; + TemplatedUniqueKernelPointer(KernelType* ptr) : UniqueKernelPointer(ptr, DeleteFunction) { + }; }; -static_assert(sizeof(KernelErrorEnumStrings)/sizeof(char*)-1 == (int)ffi::KernelError::ParseIntError, - "KernelErrorEnumStrings failin"); - -static string kernel_error_to_string(ffi::KernelError err) { - return KernelErrorEnumStrings[(int)err]; -} - -// TODO: not unpacking an ExternResult with an error will now lead to memory leak -template -static T unpack_result_or_throw(ffi::ExternResult result, const string &from_where) { - if (result.tag == ffi::ExternResult::Tag::Err) { - if (result.err._0){ - auto error_cast = static_cast(result.err._0); - auto etype = error_cast->etype; - auto message = error_cast->error_message; - delete error_cast; - - throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error: %u (%s) with message (%s)", - from_where.c_str(), etype, kernel_error_to_string(etype), message); - } else { - throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error, but error was nullptr", from_where.c_str()); - } - } else if (result.tag == ffi::ExternResult::Tag::Ok) { - return result.ok._0; - } - throw IOException("Invalid error ExternResult tag found!"); -} - -template -bool result_is_ok(ffi::ExternResult result) { - if (result.tag == ffi::ExternResult::Tag::Ok) { - return true; - } else if (result.tag == ffi::ExternResult::Tag::Err) { - return false; - } - throw IOException("Invalid error ExternResult tag found!"); -} - -ffi::KernelStringSlice to_delta_string_slice(const string &str) { - return {str.data(), str.size()}; -} -string from_delta_string_slice(const struct ffi::KernelStringSlice slice) { - return {slice.ptr, slice.len}; -} - -vector from_delta_bool_slice(const struct ffi::KernelBoolSlice slice) { - vector result; - result.assign(slice.ptr, slice.ptr + slice.len); - return result; -} - -// Template wrapper function that implements get_next for EngineIteratorFromCallable. -template -static auto GetNextFromCallable(Callable* callable) -> decltype(std::declval()()) { - return callable->operator()(); -} - -// Wraps a callable object (e.g. C++11 lambda) as an EngineIterator. -template -ffi::EngineIterator EngineIteratorFromCallable(Callable& callable) { - auto* get_next = &GetNextFromCallable; - return {.data = &callable, .get_next = (const void *(*)(void*)) get_next}; -}; - -// Helper function to prevent pushing down filters kernel cant handle -// TODO: remove once kernel handles this properly? -static bool CanHandleFilter(TableFilter *filter) { - switch (filter->filter_type) { - case TableFilterType::CONSTANT_COMPARISON: - return true; - case TableFilterType::CONJUNCTION_AND: { - auto &conjunction = static_cast(*filter); - bool can_handle = true; - for (const auto& child : conjunction.child_filters) { - can_handle = can_handle && CanHandleFilter(child.get()); - } - return can_handle; +typedef TemplatedUniqueKernelPointer KernelSnapshot; +typedef TemplatedUniqueKernelPointer KernelExternEngine; +typedef TemplatedUniqueKernelPointer KernelScan; +typedef TemplatedUniqueKernelPointer KernelGlobalScanState; +typedef TemplatedUniqueKernelPointer KernelScanDataIterator; + +struct KernelUtils { + static ffi::KernelStringSlice ToDeltaString(const string &str); + static string FromDeltaString(const struct ffi::KernelStringSlice slice); + static vector FromDeltaBoolSlice(const struct ffi::KernelBoolSlice slice); + + // TODO: all kernel results need to be unpacked, not doing so will result in an error. This should be cleaned up + template + static T UnpackResult(ffi::ExternResult result, const string &from_where) { + if (result.tag == ffi::ExternResult::Tag::Err) { + if (result.err._0){ + auto error_cast = static_cast(result.err._0); + error_cast->Throw(from_where); + } else { + throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error, but error was nullptr", from_where.c_str()); } - - default: - return false; - } -} - -// Prunes the list of predicates to ones that we can handle -static std::map PrunePredicates(std::map predicates) { - std::map result; - for (const auto &predicate : predicates) { - if (CanHandleFilter(predicate.second)) { - result[predicate.first] = predicate.second; + } else if (result.tag == ffi::ExternResult::Tag::Ok) { + return result.ok._0; } - + throw IOException("Invalid error ExternResult tag found!"); } - return result; -} +}; class PredicateVisitor : public ffi::EnginePredicate { public: - PredicateVisitor(const vector &column_names, optional_ptr filters) : EnginePredicate { - .predicate = this, - .visitor = (uintptr_t (*)(void*, ffi::KernelExpressionVisitorState*)) &VisitPredicate} - { - if (filters) { - for (auto& filter : filters->filters) { - column_filters[column_names[filter.first]] = filter.second.get(); - } - } - } + PredicateVisitor(const vector &column_names, optional_ptr filters); private: - std::map column_filters; + unordered_map column_filters; - static uintptr_t VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state) { - auto filters = PrunePredicates(predicate->column_filters); + static uintptr_t VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state); - auto it = filters.begin(); - auto end = filters.end(); - auto get_next = [predicate, state, &it, &end]() -> uintptr_t { - if (it == end) { - return 0; - } - auto &filter = *it++; - return predicate->VisitFilter(filter.first, *filter.second, state); - }; - auto eit = EngineIteratorFromCallable(get_next); - - // TODO: this should be fixed upstream? - try { - return visit_expression_and(state, &eit); - } catch (...) { - return ~0; - } - } - - uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state) { - auto maybe_left = ffi::visit_expression_column(state, to_delta_string_slice(col_name), error_allocator); - uintptr_t left = unpack_result_or_throw(maybe_left, "VisitConstantFilter failed to visit_expression_column"); - - uintptr_t right = ~0; - auto &value = filter.constant; - switch (value.type().id()) { - case LogicalType::BIGINT: - right = visit_expression_literal_long(state, BigIntValue::Get(value)); - break; - - case LogicalType::VARCHAR: { - // WARNING: C++ lifetime extension rules don't protect calls of the form foo(std::string(...).c_str()) - auto str = StringValue::Get(value); - auto maybe_right = ffi::visit_expression_literal_string(state, to_delta_string_slice(col_name), error_allocator); - right = unpack_result_or_throw(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string"); - break; - } - - default: - break; // unsupported type - } - - // TODO support other comparison types? - switch (filter.comparison_type) { - case ExpressionType::COMPARE_LESSTHAN: - return visit_expression_lt(state, left, right); - case ExpressionType::COMPARE_LESSTHANOREQUALTO: - return visit_expression_le(state, left, right); - case ExpressionType::COMPARE_GREATERTHAN: - return visit_expression_gt(state, left, right); - case ExpressionType::COMPARE_GREATERTHANOREQUALTO: - return visit_expression_ge(state, left, right); - case ExpressionType::COMPARE_EQUAL: - return visit_expression_eq(state, left, right); - - default: - std::cout << " Unsupported operation: " << (int) filter.comparison_type << std::endl; - return ~0; // Unsupported operation - } - } - - uintptr_t VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state) { - auto it = filter.child_filters.begin(); - auto end = filter.child_filters.end(); - auto get_next = [this, col_name, state, &it, &end]() -> uintptr_t { - if (it == end) { - return 0; - } - auto &child_filter = *it++; - return VisitFilter(col_name, *child_filter, state); - }; - auto eit = EngineIteratorFromCallable(get_next); - return visit_expression_and(state, &eit); - } - - uintptr_t VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state) { - switch (filter.filter_type) { - case TableFilterType::CONSTANT_COMPARISON: - return VisitConstantFilter(col_name, static_cast(filter), state); - case TableFilterType::CONJUNCTION_AND: - return VisitAndFilter(col_name, static_cast(filter), state); - default: - throw NotImplementedException("Attempted to push down unimplemented filter type: '%s'", EnumUtil::ToString(filter.filter_type)); - } - } + uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state); + uintptr_t VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state); + uintptr_t VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state); }; } // namespace duckdb diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index 980f0fa..07c782b 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -57,16 +57,21 @@ struct DeltaSnapshot : public MultiFileList { // TODO: How to guarantee we only call this after the filter pushdown? void InitializeFiles(); + template + T TryUnpackKernelResult(ffi::ExternResult result) { + return KernelUtils::UnpackResult(result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0])); + } + // TODO: change back to protected public: idx_t version; //! Delta Kernel Structures - ffi::SharedSnapshot* snapshot; - ffi::SharedExternEngine* extern_engine; - ffi::SharedScan* scan; - ffi::SharedGlobalScanState* global_state; - UniqueKernelPointer scan_data_iterator; + KernelSnapshot snapshot; + KernelExternEngine extern_engine; + KernelScan scan; + KernelGlobalScanState global_state; + KernelScanDataIterator scan_data_iterator; //! Names vector names; @@ -113,7 +118,7 @@ struct DeltaMultiFileReader : public MultiFileReader { const vector &local_names, const vector &global_types, const vector &global_names, const vector &global_column_ids, MultiFileReaderData &reader_data, const string &initial_file, - optional_ptr global_state); + optional_ptr global_state) override; unique_ptr InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &bind_data, const MultiFileList &file_list, diff --git a/test/sql/generated/tpcds.test_slow b/test/sql/generated/tpcds.test_slow index b3d7e00..25ae8ca 100644 --- a/test/sql/generated/tpcds.test_slow +++ b/test/sql/generated/tpcds.test_slow @@ -25,11 +25,6 @@ create view ${table} as from ${table}_delta endloop -# FIXME: for now this sporadically hits too many open files -mode skip - -mode output_result - loop i 1 9 query I