
Merge pull request #32 from samansmink/bupm-delta
Bump delta extension
samansmink authored Jun 17, 2024
2 parents a2d6601 + 7291aa5 commit 6e72a72
Showing 10 changed files with 108 additions and 25 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -59,8 +59,8 @@ endif()
 # Add rust_example as a CMake target
 ExternalProject_Add(
     ${KERNEL_NAME}
-    GIT_REPOSITORY "https://github.com/delta-incubator/delta-kernel-rs"
-    GIT_TAG 08f0764a00e89f42136fd478823d28278adc7ee8
+    GIT_REPOSITORY "https://github.com/nicklan/delta-kernel-rs"
+    GIT_TAG 181232a45562ca78be763c2f5fb46b88a2463b5c
     CONFIGURE_COMMAND ""
     UPDATE_COMMAND ""
     BUILD_IN_SOURCE 1
5 changes: 0 additions & 5 deletions Makefile
@@ -14,11 +14,6 @@ test_debug: export DAT_PATH=./build/debug/rust/src/delta_kernel/acceptance/tests
 # Include the Makefile from extension-ci-tools
 include extension-ci-tools/makefiles/duckdb_extension.Makefile
 
-reldebug:
-	mkdir -p build/reldebug && \
-	cmake $(GENERATOR) $(BUILD_FLAGS) $(EXT_RELEASE_FLAGS) -DCMAKE_BUILD_TYPE=RelWithDebInfo -S ./duckdb/ -B build/reldebug && \
-	cmake --build build/reldebug --config RelWithDebInfo
-
 # Generate some test data to test with
 generate-data:
 	python3 -m pip install delta-spark duckdb pandas deltalake pyspark delta
16 changes: 14 additions & 2 deletions scripts/generate_test_data.py
@@ -78,7 +78,8 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
 
     ## CREATE
     ## CONFIGURE USAGE OF DELETION VECTORS
-    spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
+    if (delete_predicate):
+        spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
 
     ## ADDING DELETES
     deltaTable = DeltaTable.forPath(spark, delta_table_path)
@@ -115,6 +116,11 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
 query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);"
 generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part");
 
+## Partitioned table with all types we can file skip on
+for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]:
+    query = f"CREATE table test_table as select i::{type} as value, i::{type} as part from range(0,2) tbl(i)"
+    generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part");
+
 ## Simple table with deletion vector
 con = duckdb.connect()
 con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'")
@@ -136,8 +142,14 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
 for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]:
     generate_test_data_pyspark(f"tpch_sf0_01_{table}", f'tpch_sf0_01/{table}', f'{TMP_PATH}/tpch_sf0_01_export/{table}.parquet')
 
+## TPCH SF1 full dataset
+con = duckdb.connect()
+con.query(f"call dbgen(sf=1); EXPORT DATABASE '{TMP_PATH}/tpch_sf1_export' (FORMAT parquet)")
+for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]:
+    generate_test_data_pyspark(f"tpch_sf1_{table}", f'tpch_sf1/{table}', f'{TMP_PATH}/tpch_sf1_export/{table}.parquet')
+
 ## TPCDS SF0.01 full dataset
 con = duckdb.connect()
 con.query(f"call dsdgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpcds_sf0_01_export' (FORMAT parquet)")
 for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]:
-    generate_test_data_pyspark(f"tpcds_sf0_01_{table}", f'tpcds_sf0_01/{table}', f'{TMP_PATH}/tpcds_sf0_01_export/{table}.parquet')
+    generate_test_data_pyspark(f"tpcds_sf0_01_{table}", f'tpcds_sf0_01/{table}', f'{TMP_PATH}/tpcds_sf0_01_export/{table}.parquet')
47 changes: 41 additions & 6 deletions src/delta_utils.cpp
@@ -3,6 +3,7 @@
 #include "duckdb.hpp"
 #include "duckdb/main/extension_util.hpp"
 #include <duckdb/parser/parsed_data/create_scalar_function_info.hpp>
+#include <duckdb/planner/filter/null_filter.hpp>
 
 namespace duckdb {
 
@@ -199,6 +200,10 @@ static bool CanHandleFilter(TableFilter *filter) {
     switch (filter->filter_type) {
     case TableFilterType::CONSTANT_COMPARISON:
         return true;
+    case TableFilterType::IS_NULL:
+        return true;
+    case TableFilterType::IS_NOT_NULL:
+        return true;
     case TableFilterType::CONJUNCTION_AND: {
         auto &conjunction = static_cast<const ConjunctionAndFilter&>(*filter);
         bool can_handle = true;
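
Reviewer note: the CONJUNCTION_AND branch is truncated in this view, but its intent is a recursive check: a conjunction is pushable only if every child filter is. A minimal self-contained sketch of that recursion (the Filter struct below is a simplified stand-in, not DuckDB's real TableFilter hierarchy):

#include <memory>
#include <vector>

enum class FilterType { CONSTANT_COMPARISON, IS_NULL, IS_NOT_NULL, CONJUNCTION_AND, OTHER };

struct Filter {
    FilterType type;
    std::vector<std::unique_ptr<Filter>> children; // populated only for conjunctions
};

static bool CanHandle(const Filter &f) {
    switch (f.type) {
    case FilterType::CONSTANT_COMPARISON:
    case FilterType::IS_NULL:
    case FilterType::IS_NOT_NULL:
        return true;
    case FilterType::CONJUNCTION_AND: {
        // A conjunction can be pushed down only if every child can.
        bool can_handle = true;
        for (const auto &child : f.children) {
            can_handle = can_handle && CanHandle(*child);
        }
        return can_handle;
    }
    default:
        return false;
    }
}

int main() {
    Filter conj{FilterType::CONJUNCTION_AND, {}};
    conj.children.emplace_back(new Filter{FilterType::IS_NULL, {}});
    conj.children.emplace_back(new Filter{FilterType::OTHER, {}});
    return CanHandle(conj) ? 0 : 1; // exits 1: one child is unsupported
}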
@@ -226,7 +231,7 @@ static unordered_map<string, TableFilter*> PrunePredicates(unordered_map<string,
 }
 
 uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state) {
-    auto filters = PrunePredicates(predicate->column_filters);
+    auto filters = predicate->column_filters;
 
     auto it = filters.begin();
     auto end = filters.end();
@@ -257,16 +262,31 @@ uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const Co
     case LogicalType::BIGINT:
         right = visit_expression_literal_long(state, BigIntValue::Get(value));
         break;
-
-
+    case LogicalType::INTEGER:
+        right = visit_expression_literal_int(state, IntegerValue::Get(value));
+        break;
+    case LogicalType::SMALLINT:
+        right = visit_expression_literal_short(state, SmallIntValue::Get(value));
+        break;
+    case LogicalType::TINYINT:
+        right = visit_expression_literal_byte(state, TinyIntValue::Get(value));
+        break;
+    case LogicalType::FLOAT:
+        right = visit_expression_literal_float(state, FloatValue::Get(value));
+        break;
+    case LogicalType::DOUBLE:
+        right = visit_expression_literal_double(state, DoubleValue::Get(value));
+        break;
+    case LogicalType::BOOLEAN:
+        right = visit_expression_literal_bool(state, BooleanValue::Get(value));
+        break;
     case LogicalType::VARCHAR: {
         // WARNING: C++ lifetime extension rules don't protect calls of the form foo(std::string(...).c_str())
         auto str = StringValue::Get(value);
-        auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError);
+        auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(str), DuckDBEngineError::AllocateError);
         right = KernelUtils::UnpackResult(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string");
         break;
     }
-
     default:
         break; // unsupported type
     }
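
Reviewer note: the WARNING comment above the VARCHAR case is why this change binds StringValue::Get(value) to a local before the FFI call (the old code also passed col_name where the literal str was intended). A minimal sketch of the lifetime pitfall, with a hypothetical non-owning View/ToView pair standing in for the kernel's DeltaString conversion:

#include <cstddef>
#include <iostream>
#include <string>

// Hypothetical stand-in for KernelUtils::ToDeltaString: a non-owning
// view into a caller-owned string, like the kernel's DeltaString.
struct View {
    const char *ptr;
    std::size_t len;
};

static View ToView(const std::string &s) {
    return View{s.data(), s.size()};
}

int main() {
    // BAD: the temporary string is destroyed at the end of this statement,
    // so the view dangles as soon as the next line runs.
    View dangling = ToView(std::string("hello") + " world");
    (void)dangling; // dereferencing dangling.ptr here is undefined behavior

    // GOOD (the pattern used in VisitConstantFilter): name the string first,
    // so it outlives every use of the view derived from it.
    std::string str = std::string("hello") + " world";
    View ok = ToView(str);
    std::cout << std::string(ok.ptr, ok.len) << "\n";
    return 0;
}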
@@ -299,20 +319,35 @@ uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const Conjunc
             return 0;
         }
         auto &child_filter = *it++;
+
         return VisitFilter(col_name, *child_filter, state);
     };
     auto eit = EngineIteratorFromCallable(get_next);
     return visit_expression_and(state, &eit);
 }
 
+uintptr_t PredicateVisitor::VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState *state) {
+    auto maybe_inner = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError);
+    uintptr_t inner = KernelUtils::UnpackResult(maybe_inner, "VisitIsNull failed to visit_expression_column");
+    return ffi::visit_expression_is_null(state, inner);
+}
+
+uintptr_t PredicateVisitor::VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState *state) {
+    return ffi::visit_expression_not(state, VisitIsNull(col_name, state));
+}
+
 uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state) {
     switch (filter.filter_type) {
     case TableFilterType::CONSTANT_COMPARISON:
         return VisitConstantFilter(col_name, static_cast<const ConstantFilter&>(filter), state);
     case TableFilterType::CONJUNCTION_AND:
         return VisitAndFilter(col_name, static_cast<const ConjunctionAndFilter&>(filter), state);
+    case TableFilterType::IS_NULL:
+        return VisitIsNull(col_name, state);
+    case TableFilterType::IS_NOT_NULL:
+        return VisitIsNotNull(col_name, state);
     default:
-        return ~0;
+        throw NotImplementedException("Attempted to push down unimplemented filter type: '%s'", EnumUtil::ToString(filter.filter_type));
     }
 }
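
Reviewer note: VisitIsNotNull builds IS NOT NULL as NOT(IS NULL(column)) rather than requiring a dedicated kernel primitive. A toy sketch of the same composition over a plain expression tree (the constructors below model, but are not, the delta-kernel ffi calls):

#include <iostream>
#include <memory>
#include <string>
#include <utility>

// Toy expression nodes mirroring the visitor's composition.
struct Expr {
    std::string op;              // "column", "is_null", or "not"
    std::string name;            // set for "column" nodes
    std::shared_ptr<Expr> child; // set for unary nodes
};

static std::shared_ptr<Expr> Column(const std::string &name) {
    return std::make_shared<Expr>(Expr{"column", name, nullptr});
}
static std::shared_ptr<Expr> IsNull(std::shared_ptr<Expr> e) {
    return std::make_shared<Expr>(Expr{"is_null", "", std::move(e)});
}
static std::shared_ptr<Expr> Not(std::shared_ptr<Expr> e) {
    return std::make_shared<Expr>(Expr{"not", "", std::move(e)});
}

// Same shape as VisitIsNull / VisitIsNotNull above.
static std::shared_ptr<Expr> MakeIsNull(const std::string &col) {
    return IsNull(Column(col));
}
static std::shared_ptr<Expr> MakeIsNotNull(const std::string &col) {
    return Not(MakeIsNull(col));
}

static void Print(const std::shared_ptr<Expr> &e) {
    if (e->op == "column") {
        std::cout << e->name;
        return;
    }
    std::cout << e->op << "(";
    Print(e->child);
    std::cout << ")";
}

int main() {
    Print(MakeIsNotNull("part")); // prints: not(is_null(part))
    std::cout << "\n";
    return 0;
}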

1 change: 0 additions & 1 deletion src/functions/delta_scan.cpp
@@ -587,7 +587,6 @@ TableFunctionSet DeltaFunctions::GetDeltaScanFunction(DatabaseInstance &instance
     function.deserialize = nullptr;
     function.statistics = nullptr;
     function.table_scan_progress = nullptr;
-    function.cardinality = nullptr;
     function.get_bind_info = nullptr;
 
     // Schema param is just confusing here
5 changes: 5 additions & 0 deletions src/include/delta_utils.hpp
@@ -5,6 +5,7 @@
 #include "duckdb/planner/filter/conjunction_filter.hpp"
 #include "duckdb/common/enum_util.hpp"
 #include <iostream>
+#include <duckdb/planner/filter/null_filter.hpp>
 
 // TODO: clean up this file as we go
 
@@ -140,6 +141,10 @@ class PredicateVisitor : public ffi::EnginePredicate {
 
     uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state);
     uintptr_t VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state);
+
+    uintptr_t VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState* state);
+    uintptr_t VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState* state);
+
     uintptr_t VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state);
 };

7 changes: 0 additions & 7 deletions test/sql/dat/basic_append.test
@@ -56,9 +56,6 @@ FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/delta')
 2
 3
 
-# TODO: Figure out what's wrong here
-mode skip
-
 # Now we add a filter that filters out one of the files
 query II
 SELECT letter, number
@@ -67,8 +64,6 @@ WHERE number < 2
 ----
 a 1
 
-mode unskip
-
 # Now we add a filter that filters out the other file
 query III
 SELECT a_float, letter, number,
@@ -77,8 +72,6 @@ WHERE number > 4
 ----
 5.5 e 5
 
-mode skip
-
 # Now we add a filter that filters out all columns
 query III
 SELECT a_float, number, letter
44 changes: 44 additions & 0 deletions test/sql/generated/file_skipping_all_types.test
@@ -0,0 +1,44 @@
+# name: test/sql/generated/file_skipping_all_types.test
+# description: Test filter pushdown succeeds on all file types we can push down
+# group: [delta_generated]
+
+require parquet
+
+require delta
+
+require-env GENERATED_DATA_AVAILABLE
+
+# TODO: this doesn't appear to skip files yet
+# TODO: add tests once https://github.com/duckdb/duckdb/pull/12488 is available
+
+query I
+select value
+from delta_scan('./data/generated/test_file_skipping/bool/delta_lake')
+where part != false
+order by value
+----
+true
+
+foreach type bool int tinyint smallint bigint varchar
+
+query I
+select value
+from delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
+where part != 0
+order by value
+----
+1
+
+endloop
+
+foreach type float double
+
+query I
+select value
+from delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
+where part > 0.5
+order by value
+----
+1.0
+
+endloop
