Skip to content

Commit

Permalink
fix blob type
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Aug 1, 2024
1 parent d320bb2 commit 6de941e
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 38 deletions.
23 changes: 14 additions & 9 deletions scripts/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
def delete_old_files():
if (os.path.isdir(BASE_PATH)):
shutil.rmtree(BASE_PATH)
def generate_test_data_delta_rs(path, query, part_column=False):
def generate_test_data_delta_rs(path, query, part_column=False, add_golden_table=True):
"""
generate_test_data_delta_rs generates some test data using delta-rs and duckdb
Expand All @@ -38,12 +38,13 @@ def generate_test_data_delta_rs(path, query, part_column=False):
else:
write_deltalake(f"{generated_path}/delta_lake", test_table_df)

# Write DuckDB's reference data
os.mkdir(f'{generated_path}/duckdb')
if (part_column):
con.sql(f"COPY test_table to '{generated_path}/duckdb' (FORMAT parquet, PARTITION_BY {part_column})")
else:
con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)")
if add_golden_table:
# Write DuckDB's reference data
os.mkdir(f'{generated_path}/duckdb')
if (part_column):
con.sql(f"COPY test_table to '{generated_path}/duckdb' (FORMAT parquet, PARTITION_BY {part_column})")
else:
con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)")

def generate_test_data_pyspark(name, current_path, input_path, delete_predicate = False):
"""
Expand Down Expand Up @@ -112,14 +113,18 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
query += "CREATE table test_table AS SELECT *, l_orderkey%10 as part from lineitem;"
generate_test_data_delta_rs("lineitem_sf1_10part", query, "part")

## Simple table with a blob as a value
query = "create table test_table as SELECT encode('ABCDE') as blob, encode('ABCDE') as blob_part, 'ABCDE' as string UNION ALL SELECT encode('😈') as blob, encode('😈') as blob_part, '😈' as string"
generate_test_data_delta_rs("simple_blob_table", query, "blob_part", add_golden_table=False)

## Simple partitioned table with structs
query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);"
generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part");
generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part")

## Partitioned table with all types we can file skip on
for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]:
query = f"CREATE table test_table as select i::{type} as value, i::{type} as part from range(0,2) tbl(i)"
generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part");
generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part")

## Simple table with deletion vector
con = duckdb.connect()
Expand Down
26 changes: 1 addition & 25 deletions src/delta_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ unique_ptr<SchemaVisitor::FieldList> SchemaVisitor::VisitSnapshotSchema(ffi::Sha
visitor.visit_float = VisitSimpleType<LogicalType::FLOAT>();
visitor.visit_double = VisitSimpleType<LogicalType::DOUBLE>();
visitor.visit_boolean = VisitSimpleType<LogicalType::BOOLEAN>();
visitor.visit_binary = VisitSimpleType<LogicalType::VARCHAR>();
visitor.visit_binary = VisitSimpleType<LogicalType::BLOB>();
visitor.visit_date = VisitSimpleType<LogicalType::DATE>();
visitor.visit_timestamp = VisitSimpleType<LogicalType::TIMESTAMP_TZ>();
visitor.visit_timestamp_ntz = VisitSimpleType<LogicalType::TIMESTAMP>();
Expand Down Expand Up @@ -194,30 +194,6 @@ ffi::EngineIterator EngineIteratorFromCallable(Callable& callable) {
return {&callable, (const void *(*)(void*)) get_next};
};

// Helper function to prevent pushing down filters kernel cant handle
// TODO: remove once kernel handles this properly?
static bool CanHandleFilter(TableFilter *filter) {
switch (filter->filter_type) {
case TableFilterType::CONSTANT_COMPARISON:
return true;
case TableFilterType::IS_NULL:
return true;
case TableFilterType::IS_NOT_NULL:
return true;
case TableFilterType::CONJUNCTION_AND: {
auto &conjunction = static_cast<const ConjunctionAndFilter&>(*filter);
bool can_handle = true;
for (const auto& child : conjunction.child_filters) {
can_handle = can_handle && CanHandleFilter(child.get());
}
return can_handle;
}

default:
return false;
}
}

uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state) {
auto &filters = predicate->column_filters;

Expand Down
10 changes: 7 additions & 3 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,13 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio
}
auto col_partition_entry = file_metadata->partition_map.find(global_names[col_id]);
if (col_partition_entry != file_metadata->partition_map.end()) {
// Todo: use https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(global_types[col_id]);
reader_data.constant_map.emplace_back(i, maybe_value);
auto &current_type = global_types[col_id];
if (current_type == LogicalType::BLOB) {
reader_data.constant_map.emplace_back(i, Value::BLOB_RAW(col_partition_entry->second));
} else {
auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(current_type);
reader_data.constant_map.emplace_back(i, maybe_value);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/sql/dat/all.test
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/delta'
----

query I rowsort multi_partitioned
SELECT letter, date, decode(data) as data, number
SELECT *
FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/expected/latest/**/*.parquet')
----

Expand Down
23 changes: 23 additions & 0 deletions test/sql/generated/blob.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# name: test/sql/generated/blob.test
# description: Test the BLOB type
# group: [delta_generated]

require parquet

require delta

require-env GENERATED_DATA_AVAILABLE

query IIIIII
describe select *
from delta_scan('./data/generated/simple_blob_table/delta_lake');
----
blob BLOB YES NULL NULL NULL
blob_part BLOB YES NULL NULL NULL
string VARCHAR YES NULL NULL NULL

query III
from delta_scan('./data/generated/simple_blob_table/delta_lake') order by string;
----
ABCDE ABCDE ABCDE
\xF0\x9F\x98\x88 \xF0\x9F\x98\x88 😈

0 comments on commit 6de941e

Please sign in to comment.