diff --git a/extension_config.cmake b/extension_config.cmake index 7dc689e..942a16f 100644 --- a/extension_config.cmake +++ b/extension_config.cmake @@ -7,4 +7,7 @@ duckdb_extension_load(delta ) # Build the httpfs extension to test with s3/http -duckdb_extension_load(httpfs) \ No newline at end of file +duckdb_extension_load(httpfs) + +# Build the tpch extension for testing/benchmarking +duckdb_extension_load(tpch) \ No newline at end of file diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py index e96019f..fc27f6e 100644 --- a/scripts/generate_test_data.py +++ b/scripts/generate_test_data.py @@ -10,29 +10,6 @@ BASE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/../data/generated" TMP_PATH = '/tmp' -# Query to deal with our currently not-implemented types -modified_lineitem_query = """ -SELECT - l_orderkey, - l_partkey, - l_suppkey, - l_linenumber, - (l_quantity*100)::INTEGER as l_quantity, - (l_extendedprice*100)::INTEGER as l_extendedprice, - (l_discount*100)::INTEGER as l_discount, - (l_tax*100)::INTEGER as l_tax, - l_returnflag, - l_linestatus, - l_shipdate::VARCHAR as l_shipdate, - l_commitdate::VARCHAR as l_commitdate, - l_receiptdate::VARCHAR as l_receiptdate, - l_shipinstruct, - l_shipmode, - l_comment -FROM - lineitem -""" - def delete_old_files(): if (os.path.isdir(BASE_PATH)): shutil.rmtree(BASE_PATH) @@ -68,7 +45,7 @@ def generate_test_data_delta_rs(path, query, part_column=False): else: con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)") -def generate_test_data_pyspark(current_path, input_path, delete_predicate): +def generate_test_data_pyspark(name, current_path, input_path, delete_predicate = False): """ generate_test_data_pyspark generates some test data using pyspark and duckdb @@ -97,18 +74,19 @@ def generate_test_data_pyspark(current_path, input_path, delete_predicate): ## DATA GENERATION # df = spark.read.parquet(input_path) # df.write.format("delta").mode("overwrite").save(delta_table_path) - spark.sql(f"CREATE TABLE test_table USING delta LOCATION '{delta_table_path}' AS SELECT * FROM parquet.`{input_path}`") + spark.sql(f"CREATE TABLE test_table_{name} USING delta LOCATION '{delta_table_path}' AS SELECT * FROM parquet.`{input_path}`") ## CREATE ## CONFIGURE USAGE OF DELETION VECTORS - spark.sql(f"ALTER TABLE test_table SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);") + spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);") ## ADDING DELETES deltaTable = DeltaTable.forPath(spark, delta_table_path) - deltaTable.delete(delete_predicate) + if delete_predicate: + deltaTable.delete(delete_predicate) ## Writing the - df = spark.table('test_table') + df = spark.table(f'test_table_{name}') df.write.parquet(parquet_reference_path, mode='overwrite') # TO CLEAN, uncomment @@ -133,21 +111,6 @@ def generate_test_data_pyspark(current_path, input_path, delete_predicate): query += "CREATE table test_table AS SELECT *, l_orderkey%10 as part from lineitem;" generate_test_data_delta_rs("lineitem_sf1_10part", query, "part") -### Lineitem_modified SF0.01 -query = "call dbgen(sf=0.01);" -query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});" -generate_test_data_delta_rs("lineitem_modified_sf0.01", query, "part") - -### Lineitem_modified SF1 -query = "call dbgen(sf=1);" -query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});" -generate_test_data_delta_rs("lineitem_modified_sf1", query, "part") - -### Lineitem_modified SF10 -query = "call dbgen(sf=10);" -query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});" -generate_test_data_delta_rs("lineitem_modified_sf10", query, "part") - ## Simple partitioned table with structs query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);" generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part"); @@ -155,14 +118,20 @@ def generate_test_data_pyspark(current_path, input_path, delete_predicate): ## Simple table with deletion vector con = duckdb.connect() con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'") -generate_test_data_pyspark('simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0") +generate_test_data_pyspark('simple_sf1_with_dv', 'simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0") ## Lineitem SF0.01 with deletion vector con = duckdb.connect() -con.query(f"call dbgen(sf=0.01); COPY ({modified_lineitem_query}) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'") -generate_test_data_pyspark('lineitem_sf0_01_with_dv', f'{TMP_PATH}/modified_lineitem_sf0_01.parquet', "l_shipdate = '1994-01-01'") +con.query(f"call dbgen(sf=0.01); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'") +generate_test_data_pyspark('lineitem_sf0_01_with_dv', 'lineitem_sf0_01_with_dv', f'{TMP_PATH}/modified_lineitem_sf0_01.parquet', "l_shipdate = '1994-01-01'") ## Lineitem SF1 with deletion vector con = duckdb.connect() -con.query(f"call dbgen(sf=1); COPY ({modified_lineitem_query}) TO '{TMP_PATH}/modified_lineitem_sf1.parquet'") -generate_test_data_pyspark('lineitem_sf1_with_dv', f'{TMP_PATH}/modified_lineitem_sf1.parquet', "l_shipdate = '1994-01-01'") \ No newline at end of file +con.query(f"call dbgen(sf=1); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf1.parquet'") +generate_test_data_pyspark('lineitem_sf1_with_dv', 'lineitem_sf1_with_dv', f'{TMP_PATH}/modified_lineitem_sf1.parquet', "l_shipdate = '1994-01-01'") + +## TPCH SF0.01 full dataset +con = duckdb.connect() +con.query(f"call dbgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpch_sf0_01_export' (FORMAT parquet)") +for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]: + generate_test_data_pyspark(f"tpch_sf0_01_{table}", f'tpch_sf0_01/{table}', f'{TMP_PATH}/tpch_sf0_01_export/{table}.parquet') \ No newline at end of file diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index 77bec7b..8545da5 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -246,7 +246,6 @@ unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &co auto filterstmp = combiner.GenerateTableScanFilters(get.column_ids); // TODO: can/should we figure out if this filtered anything? - // TODO2: make this copy more efficient? can we move-copy this thing leaving the old one uninitialized? auto filtered_list = make_uniq(context, paths[0]); filtered_list->table_filters = std::move(filterstmp); filtered_list->names = names; diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index f0cf962..c7f2274 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -265,7 +265,13 @@ class PredicateVisitor : public ffi::EnginePredicate { return predicate->VisitFilter(filter.first, *filter.second, state); }; auto eit = EngineIteratorFromCallable(get_next); - return visit_expression_and(state, &eit); + + // TODO: this should be fixed upstream? + try { + return visit_expression_and(state, &eit); + } catch (...) { + return ~0; + } } uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state) { @@ -291,6 +297,7 @@ class PredicateVisitor : public ffi::EnginePredicate { break; // unsupported type } + // TODO support other comparison types? switch (filter.comparison_type) { case ExpressionType::COMPARE_LESSTHAN: return visit_expression_lt(state, left, right); @@ -327,12 +334,10 @@ class PredicateVisitor : public ffi::EnginePredicate { switch (filter.filter_type) { case TableFilterType::CONSTANT_COMPARISON: return VisitConstantFilter(col_name, static_cast(filter), state); - case TableFilterType::CONJUNCTION_AND: return VisitAndFilter(col_name, static_cast(filter), state); - default: - return ~0; // Unsupported filter + throw NotImplementedException("Attempted to push down unimplemented filter type: '%s'", EnumUtil::ToString(filter.filter_type)); } } }; diff --git a/test/sql/generated/lineitem_modified.test_slow b/test/sql/generated/lineitem_modified.test_slow deleted file mode 100644 index 31b515a..0000000 --- a/test/sql/generated/lineitem_modified.test_slow +++ /dev/null @@ -1,35 +0,0 @@ -# name: test/sql/generated/lineitem_modified.test_slow -# description: Test on some medium sized data -# group: [delta_generated] - -require parquet - -require delta - -require-env GENERATED_DATA_AVAILABLE - -query I rowsort q1 -SELECT - part, sum(l_extendedprice * l_discount) AS revenue -FROM - delta_scan('./data/generated/lineitem_modified_sf10/delta_lake') -WHERE - l_shipdate::date >= CAST('1994-01-01' AS date) - AND l_shipdate::date < CAST('1995-01-01' AS date) - AND l_discount BETWEEN 5 AND 7 - AND l_quantity < 2400 -GROUP BY part; ----- - -query I rowsort q1 -SELECT - part, sum(l_extendedprice * l_discount) AS revenue -FROM - parquet_scan('./data/generated/lineitem_modified_sf10/duckdb/**/*.parquet') -WHERE - l_shipdate::date >= CAST('1994-01-01' AS date) - AND l_shipdate::date < CAST('1995-01-01' AS date) - AND l_discount BETWEEN 5 AND 7 - AND l_quantity < 2400 -GROUP BY part; ----- diff --git a/test/sql/generated/partitioned_with_structs.test b/test/sql/generated/partitioned_with_structs.test index f1dc5af..bd683fd 100644 --- a/test/sql/generated/partitioned_with_structs.test +++ b/test/sql/generated/partitioned_with_structs.test @@ -8,16 +8,16 @@ require delta require-env GENERATED_DATA_AVAILABLE -query II +query II rowsort from delta_scan('./data/generated/simple_partitioned_with_structs/delta_lake'); ---- -{'i': 1, 'j': 2} 1 -{'i': 3, 'j': 4} 1 -{'i': 5, 'j': 6} 1 -{'i': 7, 'j': 8} 1 -{'i': 9, 'j': 10} 1 {'i': 0, 'j': 1} 0 +{'i': 1, 'j': 2} 1 {'i': 2, 'j': 3} 0 +{'i': 3, 'j': 4} 1 {'i': 4, 'j': 5} 0 +{'i': 5, 'j': 6} 1 {'i': 6, 'j': 7} 0 +{'i': 7, 'j': 8} 1 {'i': 8, 'j': 9} 0 +{'i': 9, 'j': 10} 1 diff --git a/test/sql/generated/simple_partitioned.test b/test/sql/generated/simple_partitioned.test index e4c60ec..ba9f806 100644 --- a/test/sql/generated/simple_partitioned.test +++ b/test/sql/generated/simple_partitioned.test @@ -11,126 +11,134 @@ require-env GENERATED_DATA_AVAILABLE # With a projection and delta constant column query III SELECT delta_file_number, part, i -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1); +FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1) +ORDER BY i ---- -0 0 0 -0 0 2 -0 0 4 -0 0 6 -0 0 8 -1 1 1 -1 1 3 -1 1 5 -1 1 7 -1 1 9 +1 0 0 +0 1 1 +1 0 2 +0 1 3 +1 0 4 +0 1 5 +1 0 6 +0 1 7 +1 0 8 +0 1 9 # Simplest case query II -FROM delta_scan('./data/generated/simple_partitioned/delta_lake/'); +FROM delta_scan('./data/generated/simple_partitioned/delta_lake/') +ORDER BY i ---- 0 0 -2 0 -4 0 -6 0 -8 0 1 1 +2 0 3 1 +4 0 5 1 +6 0 7 1 +8 0 9 1 # With a projection query II SELECT part, i -FROM delta_scan('./data/generated/simple_partitioned/delta_lake'); +FROM delta_scan('./data/generated/simple_partitioned/delta_lake') +ORDER BY i ---- 0 0 -0 2 -0 4 -0 6 -0 8 1 1 +0 2 1 3 +0 4 1 5 +0 6 1 7 +0 8 1 9 # With a projection and delta constant column query III SELECT delta_file_number, part, i -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1); +FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1) +ORDER BY i ---- -0 0 0 -0 0 2 -0 0 4 -0 0 6 -0 0 8 -1 1 1 -1 1 3 -1 1 5 -1 1 7 -1 1 9 +1 0 0 +0 1 1 +1 0 2 +0 1 3 +1 0 4 +0 1 5 +1 0 6 +0 1 7 +1 0 8 +0 1 9 # different permutation query III SELECT part, delta_file_number, i -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1); +FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1) +ORDER BY i ---- -0 0 0 -0 0 2 -0 0 4 -0 0 6 -0 0 8 -1 1 1 -1 1 3 -1 1 5 -1 1 7 -1 1 9 +0 1 0 +1 0 1 +0 1 2 +1 0 3 +0 1 4 +1 0 5 +0 1 6 +1 0 7 +0 1 8 +1 0 9 # different permutation again query III SELECT part, i, delta_file_number -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1); +FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1) +ORDER BY i ---- -0 0 0 -0 2 0 -0 4 0 -0 6 0 -0 8 0 -1 1 1 -1 3 1 -1 5 1 -1 7 1 -1 9 1 +0 0 1 +1 1 0 +0 2 1 +1 3 0 +0 4 1 +1 5 0 +0 6 1 +1 7 0 +0 8 1 +1 9 0 # With a projection and both a base multifilereader column and the file_row_number option query IIII SELECT parse_filename(filename), part, i, file_row_number -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', file_row_number=1, filename=1); +FROM delta_scan('./data/generated/simple_partitioned/delta_lake', file_row_number=1, filename=1) +ORDER BY i ---- -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 0 0 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 2 1 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 4 2 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 6 3 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 8 4 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 1 0 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 3 1 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 5 2 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 7 3 -0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 9 4 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 0 0 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 1 0 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 2 1 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 3 1 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 4 2 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 5 2 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 6 3 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 7 3 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 8 4 +0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 9 4 # Final boss: add the delta_file_number to the mix query IIIII SELECT delta_file_number, parse_filename(filename), part, i, file_row_number -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', file_row_number=1, filename=1, delta_file_number=1); +FROM delta_scan('./data/generated/simple_partitioned/delta_lake', file_row_number=1, filename=1, delta_file_number=1) +ORDER BY i ---- -0 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 0 0 -0 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 2 1 -0 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 4 2 -0 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 6 3 -0 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 0 8 4 -1 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 1 0 -1 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 3 1 -1 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 5 2 -1 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 7 3 -1 0-acbdd600-ea69-4180-81c7-530d09bcfcfe-0.parquet 1 9 4 +1 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 0 0 +0 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 1 0 +1 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 2 1 +0 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 3 1 +1 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 4 2 +0 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 5 2 +1 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 6 3 +0 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 7 3 +1 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 0 8 4 +0 0-5d866d26-492b-4edb-9c65-414a26b51b52-0.parquet 1 9 4 diff --git a/test/sql/generated/tpch.test_slow b/test/sql/generated/tpch.test_slow new file mode 100644 index 0000000..195dd32 --- /dev/null +++ b/test/sql/generated/tpch.test_slow @@ -0,0 +1,44 @@ +# name: test/sql/generated/lineitem_modified.test_slow +# description: Test on some medium sized data +# group: [delta_generated] + +require parquet + +require delta + +require tpch + +require-env GENERATED_DATA_AVAILABLE + +# Register delta views +foreach table customer lineitem nation orders part partsupp region supplier + +statement ok +create view ${table}_delta as from delta_scan('./data/generated/tpch_sf0_01/${table}/delta_lake'); + +statement ok +create view ${table}_parquet as from parquet_scan('./data/generated/tpch_sf0_01/${table}/parquet/**/*.parquet'); + +# NOTE: switch this to _parquet to easily compare plans while debugging +statement ok +create view ${table} as from ${table}_delta + +endloop + +loop i 1 9 + +query I +PRAGMA tpch(${i}) +---- +:duckdb/extension/tpch/dbgen/answers/sf0.01/q0${i}.csv + +endloop + +loop i 10 23 + +query I +PRAGMA tpch(${i}) +---- +:duckdb/extension/tpch/dbgen/answers/sf0.01/q${i}.csv + +endloop