Skip to content

Commit

Permalink
Merge pull request #129 from samansmink/bump-kernel-to-0.6.0
Browse files Browse the repository at this point in the history
Bump kernel to 0.6.0
  • Loading branch information
samansmink authored Dec 19, 2024
2 parents 7140762 + 829103e commit fd9dc3e
Show file tree
Hide file tree
Showing 7 changed files with 562 additions and 377 deletions.
26 changes: 8 additions & 18 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ elseif(UNIX)
elseif(WIN32)
set(PLATFORM_LIBS
ntdll
crypt32
ncrypt
secur32
ws2_32
Expand Down Expand Up @@ -118,11 +119,8 @@ set(RUST_UNSET_ENV_VARS --unset=CC --unset=CXX --unset=LD)
set(DELTA_KERNEL_LIBNAME
"${CMAKE_STATIC_LIBRARY_PREFIX}delta_kernel_ffi${CMAKE_STATIC_LIBRARY_SUFFIX}"
)
set(DELTA_KERNEL_LIBPATH_DEBUG
"${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/debug/${DELTA_KERNEL_LIBNAME}"
)
set(DELTA_KERNEL_LIBPATH_RELEASE
"${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/release/${DELTA_KERNEL_LIBNAME}"
set(DELTA_KERNEL_LIBPATH
"${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/${RUST_PLATFORM_TARGET}/$<IF:$<CONFIG:Debug>,debug,release>/${DELTA_KERNEL_LIBNAME}"
)
set(DELTA_KERNEL_FFI_HEADER_PATH
"${CMAKE_BINARY_DIR}/rust/src/delta_kernel/target/ffi-headers")
Expand All @@ -141,7 +139,7 @@ ExternalProject_Add(
# the c++ headers. Currently, when bumping the kernel version, the produced
# header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying
# the fix
GIT_TAG v0.5.0
GIT_TAG v0.6.0
# Prints the env variables passed to the cargo build to the terminal, useful
# in debugging because passing them through CMake is an error-prone mess
CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS}
Expand All @@ -151,19 +149,13 @@ ExternalProject_Add(
# Build debug build
BUILD_COMMAND
${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build
--package delta_kernel_ffi --workspace --all-features ${RUST_PLATFORM_PARAM}
# Build release build
COMMAND
${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build
--package delta_kernel_ffi --workspace --all-features --release
${RUST_PLATFORM_PARAM}
--package delta_kernel_ffi --workspace $<$<CONFIG:Release>:--release> --all-features ${RUST_PLATFORM_PARAM}
# Build DATs
COMMAND
${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build
--manifest-path=${CMAKE_BINARY_DIR}/rust/src/delta_kernel/acceptance/Cargo.toml
# Define the byproducts, required for building with Ninja
BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH_DEBUG}"
BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH_RELEASE}"
BUILD_BYPRODUCTS "${DELTA_KERNEL_LIBPATH}"
BUILD_BYPRODUCTS "${DELTA_KERNEL_FFI_HEADER_C}"
BUILD_BYPRODUCTS "${DELTA_KERNEL_FFI_HEADER_CXX}"
INSTALL_COMMAND ""
Expand All @@ -185,14 +177,12 @@ add_compile_definitions(DEFINE_DEFAULT_ENGINE)

# Link delta-kernal-rs to static lib
target_link_libraries(
${EXTENSION_NAME} debug ${DELTA_KERNEL_LIBPATH_DEBUG} optimized
${DELTA_KERNEL_LIBPATH_RELEASE} ${PLATFORM_LIBS})
${EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} ${PLATFORM_LIBS})
add_dependencies(${EXTENSION_NAME} delta_kernel)

Check warning on line 181 in CMakeLists.txt

View workflow job for this annotation

GitHub Actions / Build extension binaries / MacOS (osx_amd64, x86_64, x64-osx)

Not disabling vptr sanitizer on M1 Macbook - set DISABLE_VPTR_SANITIZER

Check warning on line 181 in CMakeLists.txt

View workflow job for this annotation

GitHub Actions / Build extension binaries / MacOS (osx_arm64, arm64, arm64-osx)

Not disabling vptr sanitizer on M1 Macbook - set DISABLE_VPTR_SANITIZER

# Link delta-kernal-rs to dynamic lib
target_link_libraries(
${LOADABLE_EXTENSION_NAME} debug ${DELTA_KERNEL_LIBPATH_DEBUG} optimized
${DELTA_KERNEL_LIBPATH_RELEASE} ${PLATFORM_LIBS})
${LOADABLE_EXTENSION_NAME} ${DELTA_KERNEL_LIBPATH} ${PLATFORM_LIBS})
add_dependencies(${LOADABLE_EXTENSION_NAME} delta_kernel)

install(
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test_debug: export DELTA_KERNEL_TESTS_PATH=./build/debug/rust/src/delta_kernel/k
test_debug: export DAT_PATH=./build/debug/rust/src/delta_kernel/acceptance/tests/dat

# Core extensions that we need for testing
CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs'
#CORE_EXTENSIONS='tpcds;tpch;aws;azure;httpfs'

# Set this flag during building to enable the benchmark runner
ifeq (${BUILD_BENCHMARK}, 1)
Expand Down
15 changes: 7 additions & 8 deletions src/delta_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,15 @@ uintptr_t SchemaVisitor::MakeFieldListImpl(uintptr_t capacity_hint) {
void SchemaVisitor::AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType &&child) {
auto it = inflight_lists.find(id);
if (it == inflight_lists.end()) {
// TODO... some error...
throw InternalException("WEIRD SHIT");
} else {
it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child)));
throw InternalException("Unhandled error in SchemaVisitor::AppendToList child");
}
it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child)));
}

unique_ptr<SchemaVisitor::FieldList> SchemaVisitor::TakeFieldList(uintptr_t id) {
auto it = inflight_lists.find(id);
if (it == inflight_lists.end()) {
// TODO: Raise some kind of error.
throw InternalException("WEIRD SHIT 2");
throw InternalException("Unhandled error in SchemaVisitor::TakeFieldList");
}
auto rval = std::move(it->second);
inflight_lists.erase(it);
Expand Down Expand Up @@ -145,10 +142,12 @@ string DuckDBEngineError::KernelErrorEnumToString(ffi::KernelError err) {
"MissingCommitInfo",
"UnsupportedError",
"ParseIntervalError",
"ChangeDataFeedUnsupported"
"ChangeDataFeedUnsupported",
"ChangeDataFeedIncompatibleSchema",
"InvalidCheckpoint"
};

static_assert(sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *) - 1 == (int)ffi::KernelError::ChangeDataFeedUnsupported,
static_assert(sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *) - 1 == (int)ffi::KernelError::InvalidCheckpoint,
"KernelErrorEnumStrings mismatched with kernel");

if ((int)err < sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *)) {
Expand Down
103 changes: 48 additions & 55 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "functions/delta_scan.hpp"
#include "storage/delta_catalog.hpp"

#include "delta_functions.hpp"
#include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp"
Expand All @@ -16,13 +17,9 @@
#include "duckdb/planner/binder.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/main/query_profiler.hpp"
#include "duckdb/main/client_data.hpp"

#include <duckdb/main/client_data.hpp>
#include <numeric>
#include <regex>
#include <duckdb/main/attached_database.hpp>
#include <duckdb/main/client_data.hpp>
#include <storage/delta_catalog.hpp>

namespace duckdb {

Expand Down Expand Up @@ -241,50 +238,52 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p
// Here you would need to add the logic for setting the builder options for Azure
// This is just a placeholder and will need to be replaced with the actual logic
if (secret_type == "s3" || secret_type == "gcs" || secret_type == "r2") {

string key_id, secret, session_token, region, endpoint, url_style;
bool use_ssl = true;
secret_reader.TryGetSecretKey("key_id", key_id);
secret_reader.TryGetSecretKey("secret", secret);
secret_reader.TryGetSecretKey("session_token", session_token);
secret_reader.TryGetSecretKey("region", region);
secret_reader.TryGetSecretKey("endpoint", endpoint);
secret_reader.TryGetSecretKey("url_style", url_style);
secret_reader.TryGetSecretKey("use_ssl", use_ssl);

if (key_id.empty() && secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"),
KernelUtils::ToDeltaString("true"));
}

if (!key_id.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"),
KernelUtils::ToDeltaString(key_id));
}
if (!secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"),
KernelUtils::ToDeltaString(secret));
}
if (!session_token.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"),
KernelUtils::ToDeltaString(session_token));
}
if (!endpoint.empty() && endpoint != "s3.amazonaws.com") {
if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) {
if (use_ssl) {
endpoint = "https://" + endpoint;
} else {
endpoint = "http://" + endpoint;
}
}

if (StringUtil::StartsWith(endpoint, "http://")) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"),
KernelUtils::ToDeltaString("true"));
}
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"),
KernelUtils::ToDeltaString(endpoint));
}
string key_id, secret, session_token, region, endpoint, url_style;
bool use_ssl = true;
secret_reader.TryGetSecretKey("key_id", key_id);
secret_reader.TryGetSecretKey("secret", secret);
secret_reader.TryGetSecretKey("session_token", session_token);
secret_reader.TryGetSecretKey("region", region);
secret_reader.TryGetSecretKey("endpoint", endpoint);
secret_reader.TryGetSecretKey("url_style", url_style);
secret_reader.TryGetSecretKey("use_ssl", use_ssl);

if (key_id.empty() && secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"),
KernelUtils::ToDeltaString("true"));
}

if (!key_id.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"),
KernelUtils::ToDeltaString(key_id));
}
if (!secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"),
KernelUtils::ToDeltaString(secret));
}
if (!session_token.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"),
KernelUtils::ToDeltaString(session_token));
}
if (!endpoint.empty() && endpoint != "s3.amazonaws.com") {
if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) {
if (use_ssl) {
endpoint = "https://" + endpoint;
} else {
endpoint = "http://" + endpoint;
}
}

if (StringUtil::StartsWith(endpoint, "http://")) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"),
KernelUtils::ToDeltaString("true"));
}
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"),
KernelUtils::ToDeltaString(endpoint));
} else if (StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"),
KernelUtils::ToDeltaString("https://storage.googleapis.com"));
}

ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_region"), KernelUtils::ToDeltaString(region));

Expand Down Expand Up @@ -475,11 +474,6 @@ string DeltaSnapshot::GetFileInternal(idx_t i) {
}
}

// The kernel scan visitor should have resolved a file OR returned
if (i >= resolved_files.size()) {
throw IOException("Delta Kernel seems to have failed to resolve a new file");
}

return resolved_files[i];
}

Expand Down Expand Up @@ -797,7 +791,6 @@ static SelectionVector DuckSVFromDeltaSV(const ffi::KernelBoolSlice &dv, Vector
for (idx_t i = 0; i < count; i++) {
auto row_id = row_ids[data.sel->get_index(i)];

// TODO: why are deletion vectors not spanning whole data?
if (row_id >= dv.len || dv.ptr[row_id]) {
result.data()[current_select] = i;
current_select++;
Expand Down
Loading

0 comments on commit fd9dc3e

Please sign in to comment.