Skip to content

Commit

Permalink
Merge pull request #9 from samansmink/improve-type-testing
Browse files Browse the repository at this point in the history
TPCH correctness
  • Loading branch information
samansmink authored May 23, 2024
2 parents d9e5cc1 + 577471d commit 04c61e4
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 171 deletions.
5 changes: 4 additions & 1 deletion extension_config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ duckdb_extension_load(delta
)

# Build the httpfs extension to test with s3/http
duckdb_extension_load(httpfs)
duckdb_extension_load(httpfs)

# Build the tpch extension for testing/benchmarking
duckdb_extension_load(tpch)
65 changes: 17 additions & 48 deletions scripts/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,6 @@
BASE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/../data/generated"
TMP_PATH = '/tmp'

# Query to deal with our currently not-implemented types
modified_lineitem_query = """
SELECT
l_orderkey,
l_partkey,
l_suppkey,
l_linenumber,
(l_quantity*100)::INTEGER as l_quantity,
(l_extendedprice*100)::INTEGER as l_extendedprice,
(l_discount*100)::INTEGER as l_discount,
(l_tax*100)::INTEGER as l_tax,
l_returnflag,
l_linestatus,
l_shipdate::VARCHAR as l_shipdate,
l_commitdate::VARCHAR as l_commitdate,
l_receiptdate::VARCHAR as l_receiptdate,
l_shipinstruct,
l_shipmode,
l_comment
FROM
lineitem
"""

def delete_old_files():
if (os.path.isdir(BASE_PATH)):
shutil.rmtree(BASE_PATH)
Expand Down Expand Up @@ -68,7 +45,7 @@ def generate_test_data_delta_rs(path, query, part_column=False):
else:
con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)")

def generate_test_data_pyspark(current_path, input_path, delete_predicate):
def generate_test_data_pyspark(name, current_path, input_path, delete_predicate = False):
"""
generate_test_data_pyspark generates some test data using pyspark and duckdb
Expand Down Expand Up @@ -97,18 +74,19 @@ def generate_test_data_pyspark(current_path, input_path, delete_predicate):
## DATA GENERATION
# df = spark.read.parquet(input_path)
# df.write.format("delta").mode("overwrite").save(delta_table_path)
spark.sql(f"CREATE TABLE test_table USING delta LOCATION '{delta_table_path}' AS SELECT * FROM parquet.`{input_path}`")
spark.sql(f"CREATE TABLE test_table_{name} USING delta LOCATION '{delta_table_path}' AS SELECT * FROM parquet.`{input_path}`")

## CREATE
## CONFIGURE USAGE OF DELETION VECTORS
spark.sql(f"ALTER TABLE test_table SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")

## ADDING DELETES
deltaTable = DeltaTable.forPath(spark, delta_table_path)
deltaTable.delete(delete_predicate)
if delete_predicate:
deltaTable.delete(delete_predicate)

## Writing the
df = spark.table('test_table')
df = spark.table(f'test_table_{name}')
df.write.parquet(parquet_reference_path, mode='overwrite')

# TO CLEAN, uncomment
Expand All @@ -133,36 +111,27 @@ def generate_test_data_pyspark(current_path, input_path, delete_predicate):
query += "CREATE table test_table AS SELECT *, l_orderkey%10 as part from lineitem;"
generate_test_data_delta_rs("lineitem_sf1_10part", query, "part")

### Lineitem_modified SF0.01
query = "call dbgen(sf=0.01);"
query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});"
generate_test_data_delta_rs("lineitem_modified_sf0.01", query, "part")

### Lineitem_modified SF1
query = "call dbgen(sf=1);"
query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});"
generate_test_data_delta_rs("lineitem_modified_sf1", query, "part")

### Lineitem_modified SF10
query = "call dbgen(sf=10);"
query += f"CREATE table test_table AS SELECT *, l_orderkey%10 as part from ({modified_lineitem_query});"
generate_test_data_delta_rs("lineitem_modified_sf10", query, "part")

## Simple partitioned table with structs
query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);"
generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part");

## Simple table with deletion vector
con = duckdb.connect()
con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'")
generate_test_data_pyspark('simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0")
generate_test_data_pyspark('simple_sf1_with_dv', 'simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0")

## Lineitem SF0.01 with deletion vector
con = duckdb.connect()
con.query(f"call dbgen(sf=0.01); COPY ({modified_lineitem_query}) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'")
generate_test_data_pyspark('lineitem_sf0_01_with_dv', f'{TMP_PATH}/modified_lineitem_sf0_01.parquet', "l_shipdate = '1994-01-01'")
con.query(f"call dbgen(sf=0.01); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'")
generate_test_data_pyspark('lineitem_sf0_01_with_dv', 'lineitem_sf0_01_with_dv', f'{TMP_PATH}/modified_lineitem_sf0_01.parquet', "l_shipdate = '1994-01-01'")

## Lineitem SF1 with deletion vector
con = duckdb.connect()
con.query(f"call dbgen(sf=1); COPY ({modified_lineitem_query}) TO '{TMP_PATH}/modified_lineitem_sf1.parquet'")
generate_test_data_pyspark('lineitem_sf1_with_dv', f'{TMP_PATH}/modified_lineitem_sf1.parquet', "l_shipdate = '1994-01-01'")
con.query(f"call dbgen(sf=1); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf1.parquet'")
generate_test_data_pyspark('lineitem_sf1_with_dv', 'lineitem_sf1_with_dv', f'{TMP_PATH}/modified_lineitem_sf1.parquet', "l_shipdate = '1994-01-01'")

## TPCH SF0.01 full dataset
con = duckdb.connect()
con.query(f"call dbgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpch_sf0_01_export' (FORMAT parquet)")
for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]:
generate_test_data_pyspark(f"tpch_sf0_01_{table}", f'tpch_sf0_01/{table}', f'{TMP_PATH}/tpch_sf0_01_export/{table}.parquet')
1 change: 0 additions & 1 deletion src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
auto filterstmp = combiner.GenerateTableScanFilters(get.column_ids);

// TODO: can/should we figure out if this filtered anything?
// TODO2: make this copy more efficient? can we move-copy this thing leaving the old one uninitialized?
auto filtered_list = make_uniq<DeltaSnapshot>(context, paths[0]);
filtered_list->table_filters = std::move(filterstmp);
filtered_list->names = names;
Expand Down
13 changes: 9 additions & 4 deletions src/include/delta_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ class PredicateVisitor : public ffi::EnginePredicate {
return predicate->VisitFilter(filter.first, *filter.second, state);
};
auto eit = EngineIteratorFromCallable(get_next);
return visit_expression_and(state, &eit);

// TODO: this should be fixed upstream?
try {
return visit_expression_and(state, &eit);
} catch (...) {
return ~0;
}
}

uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state) {
Expand All @@ -291,6 +297,7 @@ class PredicateVisitor : public ffi::EnginePredicate {
break; // unsupported type
}

// TODO support other comparison types?
switch (filter.comparison_type) {
case ExpressionType::COMPARE_LESSTHAN:
return visit_expression_lt(state, left, right);
Expand Down Expand Up @@ -327,12 +334,10 @@ class PredicateVisitor : public ffi::EnginePredicate {
switch (filter.filter_type) {
case TableFilterType::CONSTANT_COMPARISON:
return VisitConstantFilter(col_name, static_cast<const ConstantFilter&>(filter), state);

case TableFilterType::CONJUNCTION_AND:
return VisitAndFilter(col_name, static_cast<const ConjunctionAndFilter&>(filter), state);

default:
return ~0; // Unsupported filter
throw NotImplementedException("Attempted to push down unimplemented filter type: '%s'", EnumUtil::ToString(filter.filter_type));
}
}
};
Expand Down
35 changes: 0 additions & 35 deletions test/sql/generated/lineitem_modified.test_slow

This file was deleted.

12 changes: 6 additions & 6 deletions test/sql/generated/partitioned_with_structs.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ require delta

require-env GENERATED_DATA_AVAILABLE

query II
query II rowsort
from delta_scan('./data/generated/simple_partitioned_with_structs/delta_lake');
----
{'i': 1, 'j': 2} 1
{'i': 3, 'j': 4} 1
{'i': 5, 'j': 6} 1
{'i': 7, 'j': 8} 1
{'i': 9, 'j': 10} 1
{'i': 0, 'j': 1} 0
{'i': 1, 'j': 2} 1
{'i': 2, 'j': 3} 0
{'i': 3, 'j': 4} 1
{'i': 4, 'j': 5} 0
{'i': 5, 'j': 6} 1
{'i': 6, 'j': 7} 0
{'i': 7, 'j': 8} 1
{'i': 8, 'j': 9} 0
{'i': 9, 'j': 10} 1
Loading

0 comments on commit 04c61e4

Please sign in to comment.