Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add total files filtered #123

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate

## Partitioned table with all types we can file skip on
for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]:
query = f"CREATE table test_table as select i::{type} as value, i::{type} as part from range(0,2) tbl(i)"
query = f"CREATE table test_table as select i::{type} as value1, (i)::{type} as value2, (i)::{type} as value3, i::{type} as part from range(0,5) tbl(i)"
generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part")

## Simple table with deletion vector
Expand Down
5 changes: 5 additions & 0 deletions src/delta_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ static void LoadInternal(DatabaseInstance &instance) {
// Register the "single table" delta catalog (to ATTACH a single delta table)
auto &config = DBConfig::GetConfig(instance);
config.storage_extensions["delta"] = make_uniq<DeltaStorageExtension>();

config.AddExtensionOption("delta_scan_explain_files_filtered",
"Adds the filtered files to the explain output. Warning: this may change performance of "
"delta scan during explain analyze queries.",
LogicalType::BOOLEAN, Value(true));
}

void DeltaExtension::Load(DuckDB &db) {
Expand Down
55 changes: 52 additions & 3 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "duckdb/parser/parsed_expression.hpp"
#include "duckdb/planner/binder.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/main/query_profiler.hpp"

#include <duckdb/main/client_data.hpp>
#include <numeric>
Expand Down Expand Up @@ -523,19 +524,67 @@ unique_ptr<MultiFileList> DeltaSnapshot::ComplexFilterPushdown(ClientContext &co
return nullptr;
}

for (const auto &filter : filters) {
combiner.AddFilter(filter->Copy());
for (auto riter = filters.rbegin(); riter != filters.rend(); ++riter) {
combiner.AddFilter(riter->get()->Copy());
}

auto filterstmp = combiner.GenerateTableScanFilters(info.column_indexes);

// TODO: can/should we figure out if this filtered anything?
auto filtered_list = make_uniq<DeltaSnapshot>(context, paths[0]);
filtered_list->table_filters = std::move(filterstmp);
filtered_list->names = names;

// Copy over the snapshot, this avoids reparsing metadata
filtered_list->snapshot = snapshot;

auto &profiler = QueryProfiler::Get(context);

// Note: this is potentially quite expensive: we are creating 2 scans of the snapshot and fully materializing both
// file lists Therefore this is only done when profile is enabled. This is enable by default in debug mode or for
// EXPLAIN ANALYZE queries
if (profiler.IsEnabled()) {
Value result;
if (!context.TryGetCurrentSetting("delta_scan_explain_files_filtered", result)) {
throw InternalException("Failed to find 'delta_scan_explain_files_filtered' option!");
} else if (result.GetValue<bool>()) {
auto old_total = GetTotalFileCount();
auto new_total = filtered_list->GetTotalFileCount();

if (old_total != new_total) {
string filters_info;
bool first_item = true;
for (auto &f : filtered_list->table_filters.filters) {
auto &column_index = f.first;
auto &filter = f.second;
if (column_index < names.size()) {
if (!first_item) {
filters_info += "\n";
}
first_item = false;
auto &col_name = names[column_index];
filters_info += filter->ToString(col_name);
}
}

info.extra_info.file_filters = filters_info;
}

if (!info.extra_info.total_files.IsValid()) {
info.extra_info.total_files = old_total;
} else if (info.extra_info.total_files.GetIndex() < old_total) {
throw InternalException(
"Error encountered when analyzing filtered out files for delta scan: total_files inconsistent!");
}

if (!info.extra_info.filtered_files.IsValid() || info.extra_info.filtered_files.GetIndex() >= new_total) {
info.extra_info.filtered_files = new_total;
} else {
throw InternalException(
"Error encountered when analyzing filtered out files for delta scan: filtered_files inconsistent!");
}
}
}

return std::move(filtered_list);
}

Expand Down
134 changes: 112 additions & 22 deletions test/sql/generated/file_skipping_all_types.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,127 @@ require delta

require-env GENERATED_DATA_AVAILABLE

# TODO: this doesn't appear to skip files yet
# TODO: add tests once https://github.com/duckdb/duckdb/pull/12488 is available
foreach type float double

query I
select value
from delta_scan('./data/generated/test_file_skipping/bool/delta_lake')
where part != false
order by value
# using <type> column to skip files
query II
EXPLAIN ANALYZE SELECT value1, value2, value3
FROM delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
WHERE
value1 > 0.5 and
value2 > 2.5 and
value3 < 3.5
----
true
analyzed_plan <REGEX>:.*File Filters:.*value1>0.5.*value2>2.5.*value3<3.5.*Scanning Files: 1/5.*

foreach type bool int tinyint smallint bigint varchar
query III
SELECT value1, value2, value3
FROM delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
WHERE
value1 > 0.5 and
value2 > 2.5 and
value3 < 3.5
----
3.0 3.0 3.0

query I
select value
from delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
where part != 0
order by value
# FIXME: Partition columns currently don't cause file skipping yet
query II
EXPLAIN ANALYZE SELECT part
FROM delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
WHERE part > 0.5
----
1
analyzed_plan <!REGEX>:.*File Filters:.*

endloop

foreach type float double
# use bool column to skip files
query II
EXPLAIN ANALYZE SELECT *
FROM delta_scan('./data/generated/test_file_skipping/bool/delta_lake')
WHERE value1=false
----
analyzed_plan <REGEX>:.*File Filters:.*value1=false.*Scanning Files: 1/2.*

# FIXME: Partition columns currently don't cause file skipping yet
query II
EXPLAIN ANALYZE SELECT part
FROM delta_scan('./data/generated/test_file_skipping/bool/delta_lake')
WHERE part=false
----
analyzed_plan <!REGEX>:.*File Filters:.*

foreach type int tinyint smallint bigint

# using <type> column to skip files
query II
EXPLAIN ANALYZE SELECT value1, value2, value3
FROM delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
WHERE
value1 > 1 and
value2 > 2 and
value3 < 4
----
analyzed_plan <REGEX>:.*File Filters:.*value1>1.*value2>2.*value3<4.*Scanning Files: 1/5.*

query III
SELECT value1, value2, value3
FROM delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
WHERE
value1 > 1 and
value2 > 2 and
value3 < 4
----
3 3 3

query I
select value
from delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
where part > 0.5
order by value
# FIXME: Partition columns currently don't cause file skipping yet
query II
EXPLAIN ANALYZE SELECT part
FROM delta_scan('./data/generated/test_file_skipping/${type}/delta_lake')
WHERE part = 0
----
1.0
analyzed_plan <!REGEX>:.*File Filters:.*

endloop

# using <type> column to skip files
query II
EXPLAIN ANALYZE SELECT value1, value2, value3
FROM delta_scan('./data/generated/test_file_skipping/varchar/delta_lake')
WHERE
value1 = '2' and
value2 = '2' and
value3 = '2'
----
analyzed_plan <REGEX>:.*File Filters:.*value1='2'.*value2='2'.*value3='2'.*Scanning Files: 1/5.*

query III
SELECT value1, value2, value3
FROM delta_scan('./data/generated/test_file_skipping/varchar/delta_lake')
WHERE
value1 = '2' and
value2 = '2' and
value3 = '2'
----
2 2 2

# FIXME: Partition columns currently don't cause file skipping yet
query II
EXPLAIN ANALYZE SELECT part
FROM delta_scan('./data/generated/test_file_skipping/varchar/delta_lake')
WHERE part = '0'
----
analyzed_plan <!REGEX>:.*File Filters:.*

# We can remove this from output if precise operator timing is crucial
statement ok
set delta_scan_explain_files_filtered = false;

query II
EXPLAIN ANALYZE SELECT value1, value2, value3
FROM delta_scan('./data/generated/test_file_skipping/varchar/delta_lake')
WHERE
value1 = '2' and
value2 = '2' and
value3 = '2'
----
analyzed_plan <!REGEX>:.*File Filters:.*
Loading