Skip to content

Commit

Permalink
Merge pull request #6 from samansmink/fix-count-star-issue
Browse files Browse the repository at this point in the history
Fix count star issue & unauthorized s3 requests
  • Loading branch information
samansmink authored May 23, 2024
2 parents e5a04cc + dc5ca00 commit d9e5cc1
Show file tree
Hide file tree
Showing 9 changed files with 1,027 additions and 15 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ include_directories(duckdb/third_party/parquet)
include_directories(duckdb/third_party/thrift)

set(EXTENSION_SOURCES
src/inlined_parquet/parquet_extension.cpp
src/delta_extension.cpp
src/delta_functions.cpp
src/functions/delta_scan.cpp)
Expand Down
41 changes: 27 additions & 14 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckdb/function/table_function.hpp"

#include "parquet_override.hpp"
#include "delta_functions.hpp"
#include "functions/delta_scan.hpp"
#include "duckdb/optimizer/filter_combiner.hpp"
Expand Down Expand Up @@ -109,19 +110,21 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p
}
const auto &kv_secret = dynamic_cast<const KeyValueSecret &>(*secret_match.secret_entry->secret);

auto key_id = kv_secret.TryGetValue("key_id");
auto secret = kv_secret.TryGetValue("secret");
auto region = kv_secret.TryGetValue("region");
auto endpoint = kv_secret.TryGetValue("endpoint");
auto session_token = kv_secret.TryGetValue("session_token");
auto key_id = kv_secret.TryGetValue("key_id").ToString();
auto secret = kv_secret.TryGetValue("secret").ToString();
auto region = kv_secret.TryGetValue("region").ToString();

if (!key_id.ToString().empty()) {
ffi::set_builder_option(builder, to_delta_string_slice("aws_access_key_id"), to_delta_string_slice(key_id.ToString()));
if (key_id.empty() && secret.empty()) {
ffi::set_builder_option(builder, to_delta_string_slice("skip_signature"), to_delta_string_slice("true"));
}
if (!secret.ToString().empty()) {
ffi::set_builder_option(builder, to_delta_string_slice("aws_secret_access_key"), to_delta_string_slice(secret.ToString()));

if (!key_id.empty()) {
ffi::set_builder_option(builder, to_delta_string_slice("aws_access_key_id"), to_delta_string_slice(key_id));
}
if (!secret.empty()) {
ffi::set_builder_option(builder, to_delta_string_slice("aws_secret_access_key"), to_delta_string_slice(secret));
}
ffi::set_builder_option(builder, to_delta_string_slice("aws_region"), to_delta_string_slice(region.ToString()));
ffi::set_builder_option(builder, to_delta_string_slice("aws_region"), to_delta_string_slice(region));

return builder;
}
Expand Down Expand Up @@ -432,12 +435,19 @@ unique_ptr<MultiFileReaderGlobalState> DeltaMultiFileReader::InitializeGlobalSta
selected_columns.insert({global_name, i});
}

// The hardcoded (for now) columns to be mapped
// TODO: only add file_row_number column if there are deletes
case_insensitive_map_t<LogicalType> columns_to_map = {
{"file_row_number", LogicalType::BIGINT},
{"delta_file_number", LogicalType::UBIGINT}
};

// Add the delta_file_number column to the columns to map
auto demo_gen_col_opt = file_options.custom_options.find("delta_file_number");
if (demo_gen_col_opt != file_options.custom_options.end()) {
if (demo_gen_col_opt->second.GetValue<bool>()) {
columns_to_map.insert({"delta_file_number", LogicalType::UBIGINT});
}
}

// Map every column to either a column in the projection, or add it to the extra columns if it doesn't exist
idx_t col_offset = 0;
for (const auto &required_column : columns_to_map) {
Expand Down Expand Up @@ -578,8 +588,11 @@ TableFunctionSet DeltaFunctions::GetDeltaScanFunction(DatabaseInstance &instance
// The delta_scan function is constructed by grabbing the parquet scan from the Catalog, then injecting the
// DeltaMultiFileReader into it to create a Delta-based multi file read

auto &parquet_scan = ExtensionUtil::GetTableFunction(instance, "parquet_scan");
auto parquet_scan_copy = parquet_scan.functions;
// FIXME revert when all required changes are applied upstream
// auto &parquet_scan = ExtensionUtil::GetTableFunction(instance, "parquet_scan");
// auto parquet_scan_copy = parquet_scan.functions;
auto parquet_scan_copy = ParquetOverrideFunction::GetFunctionSet();

for (auto &function : parquet_scan_copy.functions) {
// Register the MultiFileReader as the driver for reads
function.get_multi_file_reader = DeltaMultiFileReader::CreateInstance;
Expand Down
12 changes: 12 additions & 0 deletions src/include/parquet_override.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include "duckdb.hpp"

namespace duckdb {

// Provides the table functions of the parquet scan that is inlined into this extension.
// delta_scan builds on this function set rather than on the "parquet_scan" entry from the
// catalog, because it needs parquet reader changes that are not available upstream yet.
class ParquetOverrideFunction {
public:
// Returns the set of table functions making up the overridden parquet scan.
// NOTE(review): presumably defined in src/inlined_parquet/parquet_extension.cpp -- confirm.
static TableFunctionSet GetFunctionSet();
};

} // namespace duckdb
24 changes: 24 additions & 0 deletions src/inlined_parquet/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Overridden Parquet Scan
This exists because some upstream changes in the parquet reader are required right now.


## Changes made
### Fix extra columns not allowed for count(*) queries

In `ParquetInitGlobalMethod` in `parquet_extension.cpp`, we changed:
```c++
result->projection_ids = input.projection_ids;
```
into
```c++
if (!input.projection_ids.empty()) {
result->projection_ids = input.projection_ids;
} else {
result->projection_ids.resize(input.column_ids.size());
iota(begin(result->projection_ids), end(result->projection_ids), 0);
}
```
to ensure the projection ids are set even if they are trivial. When the delta
extension has added extra columns, the trivial projection ids are needed so that
the extra columns are removed before leaving the scan.
Loading

0 comments on commit d9e5cc1

Please sign in to comment.