Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TPCH correctness #9

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion extension_config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ duckdb_extension_load(delta
)

# Build the httpfs extension to test with s3/http
duckdb_extension_load(httpfs)
duckdb_extension_load(httpfs)

# Build the tpch extension for testing/benchmarking
duckdb_extension_load(tpch)
65 changes: 17 additions & 48 deletions scripts/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,6 @@
BASE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/../data/generated"
TMP_PATH = '/tmp'

# Query to deal with our currently not-implemented types
modified_lineitem_query = """
SELECT
l_orderkey,
l_partkey,
l_suppkey,
l_linenumber,
(l_quantity*100)::INTEGER as l_quantity,
(l_extendedprice*100)::INTEGER as l_extendedprice,
(l_discount*100)::INTEGER as l_discount,
(l_tax*100)::INTEGER as l_tax,
l_returnflag,
l_linestatus,
l_shipdate::VARCHAR as l_shipdate,
l_commitdate::VARCHAR as l_commitdate,
l_receiptdate::VARCHAR as l_receiptdate,
l_shipinstruct,
l_shipmode,
l_comment
FROM
lineitem
"""

def delete_old_files():
if (os.path.isdir(BASE_PATH)):
shutil.rmtree(BASE_PATH)
Expand Down Expand Up @@ -68,7 +45,7 @@ def generate_test_data_delta_rs(path, query, part_column=False):
else:
con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)")

def generate_test_data_pyspark(current_path, input_path, delete_predicate):
def generate_test_data_pyspark(name, current_path, input_path, delete_predicate = False):
"""
generate_test_data_pyspark generates some test data using pyspark and duckdb

Expand Down Expand Up @@ -97,18 +74,19 @@ def generate_test_data_pyspark(current_path, input_path, delete_predicate):
## DATA GENERATION
# df = spark.read.parquet(input_path)
# df.write.format("delta").mode("overwrite").save(delta_table_path)
spark.sql(f"CREATE TABLE test_table USING delta LOCATION '{delta_table_path}' AS SELECT * FROM parquet.`{input_path}`")
spark.sql(f"CREATE TABLE test_table_{name} USING delta LOCATION '{delta_table_path}' AS SELECT * FROM parquet.`{input_path}`")

## CREATE
## CONFIGURE USAGE OF DELETION VECTORS
spark.sql(f"ALTER TABLE test_table SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")

## ADDING DELETES
deltaTable = DeltaTable.forPath(spark, delta_table_path)
deltaTable.delete(delete_predicate)
if delete_predicate:
deltaTable.delete(delete_predicate)

## Writing the reference parquet file
df = spark.table('test_table')
df = spark.table(f'test_table_{name}')
df.write.parquet(parquet_reference_path, mode='overwrite')

# TO CLEAN, uncomment
Expand All @@ -133,36 +111,27 @@ def generate_test_data_pyspark(current_path, input_path, delete_predicate):
query += "CREATE table test_table AS SELECT *, l_orderkey%10 as part from lineitem;"
generate_test_data_delta_rs("lineitem_sf1_10part", query, "part")

### Lineitem_modified SF0.01
query = "call dbgen(sf=0.01);"
query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});"
generate_test_data_delta_rs("lineitem_modified_sf0.01", query, "part")

### Lineitem_modified SF1
query = "call dbgen(sf=1);"
query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});"
generate_test_data_delta_rs("lineitem_modified_sf1", query, "part")

### Lineitem_modified SF10
query = "call dbgen(sf=10);"
query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});"
generate_test_data_delta_rs("lineitem_modified_sf10", query, "part")

## Simple partitioned table with structs
query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);"
generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part");

## Simple table with deletion vector
con = duckdb.connect()
con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'")
generate_test_data_pyspark('simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0")
generate_test_data_pyspark('simple_sf1_with_dv', 'simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0")

## Lineitem SF0.01 with deletion vector
con = duckdb.connect()
con.query(f"call dbgen(sf=0.01); COPY ({modified_lineitem_query}) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'")
generate_test_data_pyspark('lineitem_sf0_01_with_dv', f'{TMP_PATH}/modified_lineitem_sf0_01.parquet', "l_shipdate = '1994-01-01'")
con.query(f"call dbgen(sf=0.01); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'")
generate_test_data_pyspark('lineitem_sf0_01_with_dv', 'lineitem_sf0_01_with_dv', f'{TMP_PATH}/modified_lineitem_sf0_01.parquet', "l_shipdate = '1994-01-01'")

## Lineitem SF1 with deletion vector
con = duckdb.connect()
con.query(f"call dbgen(sf=1); COPY ({modified_lineitem_query}) TO '{TMP_PATH}/modified_lineitem_sf1.parquet'")
generate_test_data_pyspark('lineitem_sf1_with_dv', f'{TMP_PATH}/modified_lineitem_sf1.parquet', "l_shipdate = '1994-01-01'")
con.query(f"call dbgen(sf=1); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf1.parquet'")
generate_test_data_pyspark('lineitem_sf1_with_dv', 'lineitem_sf1_with_dv', f'{TMP_PATH}/modified_lineitem_sf1.parquet', "l_shipdate = '1994-01-01'")

## TPCH SF0.01 full dataset
con = duckdb.connect()
con.query(f"call dbgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpch_sf0_01_export' (FORMAT parquet)")
for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]:
generate_test_data_pyspark(f"tpch_sf0_01_{table}", f'tpch_sf0_01/{table}', f'{TMP_PATH}/tpch_sf0_01_export/{table}.parquet')
1 change: 0 additions & 1 deletion src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
auto filterstmp = combiner.GenerateTableScanFilters(get.column_ids);

// TODO: can/should we figure out if this filtered anything?
// TODO2: make this copy more efficient? can we move-copy this thing leaving the old one uninitialized?
auto filtered_list = make_uniq<DeltaSnapshot>(context, paths[0]);
filtered_list->table_filters = std::move(filterstmp);
filtered_list->names = names;
Expand Down
13 changes: 9 additions & 4 deletions src/include/delta_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ class PredicateVisitor : public ffi::EnginePredicate {
return predicate->VisitFilter(filter.first, *filter.second, state);
};
auto eit = EngineIteratorFromCallable(get_next);
return visit_expression_and(state, &eit);

// TODO: this should be fixed upstream?
try {
return visit_expression_and(state, &eit);
} catch (...) {
return ~0;
}
}

uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state) {
Expand All @@ -291,6 +297,7 @@ class PredicateVisitor : public ffi::EnginePredicate {
break; // unsupported type
}

// TODO support other comparison types?
switch (filter.comparison_type) {
case ExpressionType::COMPARE_LESSTHAN:
return visit_expression_lt(state, left, right);
Expand Down Expand Up @@ -327,12 +334,10 @@ class PredicateVisitor : public ffi::EnginePredicate {
switch (filter.filter_type) {
case TableFilterType::CONSTANT_COMPARISON:
return VisitConstantFilter(col_name, static_cast<const ConstantFilter&>(filter), state);

case TableFilterType::CONJUNCTION_AND:
return VisitAndFilter(col_name, static_cast<const ConjunctionAndFilter&>(filter), state);

default:
return ~0; // Unsupported filter
throw NotImplementedException("Attempted to push down unimplemented filter type: '%s'", EnumUtil::ToString(filter.filter_type));
}
}
};
Expand Down
35 changes: 0 additions & 35 deletions test/sql/generated/lineitem_modified.test_slow

This file was deleted.

12 changes: 6 additions & 6 deletions test/sql/generated/partitioned_with_structs.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ require delta

require-env GENERATED_DATA_AVAILABLE

query II
query II rowsort
from delta_scan('./data/generated/simple_partitioned_with_structs/delta_lake');
----
{'i': 1, 'j': 2} 1
{'i': 3, 'j': 4} 1
{'i': 5, 'j': 6} 1
{'i': 7, 'j': 8} 1
{'i': 9, 'j': 10} 1
{'i': 0, 'j': 1} 0
{'i': 1, 'j': 2} 1
{'i': 2, 'j': 3} 0
{'i': 3, 'j': 4} 1
{'i': 4, 'j': 5} 0
{'i': 5, 'j': 6} 1
{'i': 6, 'j': 7} 0
{'i': 7, 'j': 8} 1
{'i': 8, 'j': 9} 0
{'i': 9, 'j': 10} 1
Loading
Loading