Skip to content

Commit

Permalink
Update to DuckDB 1.2 and fix regression tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaruza committed Jan 20, 2025
1 parent 41b8c71 commit a2e7ca3
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 57 deletions.
6 changes: 4 additions & 2 deletions include/pgduckdb/scan/postgres_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
}
void ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input);

private:
void ConstructQueryFilter(duckdb::TableFilter *filter, const char *column_name);

public:
Snapshot snapshot;
Relation rel;
Expand Down Expand Up @@ -50,7 +53,6 @@ struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
struct PostgresScanFunctionData : public duckdb::TableFunctionData {
PostgresScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot);
~PostgresScanFunctionData() override;

duckdb::vector<duckdb::string> complex_filters;
Relation rel;
uint64_t cardinality;
Expand All @@ -74,7 +76,7 @@ struct PostgresScanTableFunction : public duckdb::TableFunction {

static duckdb::unique_ptr<duckdb::NodeStatistics> PostgresScanCardinality(duckdb::ClientContext &context,
const duckdb::FunctionData *data);
static std::string ToString(const duckdb::FunctionData *bind_data);
static duckdb::InsertionOrderPreservingMap<duckdb::string> ToString(duckdb::TableFunctionToStringInput &input);
};

} // namespace pgduckdb
8 changes: 4 additions & 4 deletions src/pgduckdb_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col
break;
}
case TIMESTAMPTZOID: {
duckdb::timestamp_t timestamp = value.GetValue<duckdb::timestamp_t>();
duckdb::timestamp_tz_t timestamp = value.GetValue<duckdb::timestamp_tz_t>();
slot->tts_values[col] = timestamp.value - pgduckdb::PGDUCKDB_DUCK_TIMESTAMP_OFFSET;
break;
}
Expand Down Expand Up @@ -1165,7 +1165,7 @@ ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type) {
return duckdb::Value::TIMESTAMP(duckdb::timestamp_t(DatumGetTimestamp(value) + PGDUCKDB_DUCK_TIMESTAMP_OFFSET));
case TIMESTAMPTZOID:
return duckdb::Value::TIMESTAMPTZ(
duckdb::timestamp_t(DatumGetTimestampTz(value) + PGDUCKDB_DUCK_TIMESTAMP_OFFSET));
duckdb::timestamp_tz_t(DatumGetTimestampTz(value) + PGDUCKDB_DUCK_TIMESTAMP_OFFSET));
case FLOAT4OID:
return duckdb::Value::FLOAT(DatumGetFloat4(value));
case FLOAT8OID:
Expand Down Expand Up @@ -1228,8 +1228,8 @@ ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, i
result, duckdb::timestamp_t(static_cast<int64_t>(value + PGDUCKDB_DUCK_TIMESTAMP_OFFSET)), offset);
break;
case duckdb::LogicalTypeId::TIMESTAMP_TZ:
Append<duckdb::timestamp_t>(
result, duckdb::timestamp_t(static_cast<int64_t>(value + PGDUCKDB_DUCK_TIMESTAMP_OFFSET)), offset);
Append<duckdb::timestamp_tz_t>(
result, duckdb::timestamp_tz_t(static_cast<int64_t>(value + PGDUCKDB_DUCK_TIMESTAMP_OFFSET)), offset);
break;
case duckdb::LogicalTypeId::FLOAT:
Append<float>(result, DatumGetFloat4(value), offset);
Expand Down
39 changes: 32 additions & 7 deletions src/scan/postgres_scan.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "duckdb/planner/filter/optional_filter.hpp"

#include "pgduckdb/scan/postgres_scan.hpp"
#include "pgduckdb/scan/postgres_table_reader.hpp"
#include "pgduckdb/pgduckdb_types.hpp"
Expand All @@ -13,6 +15,29 @@ namespace pgduckdb {
// PostgresScanGlobalState
//

/*
 * Appends the SQL text of one pushed-down DuckDB table filter for
 * `column_name` to `scan_query`.
 *
 * The caller (ConstructTableScanQuery) emits "(" before and ") " after this
 * call, so this function must ALWAYS write a valid boolean expression;
 * writing nothing would produce "WHERE ()", which Postgres rejects.
 *
 *  - Filter types whose ToString() output is used directly as a predicate
 *    are rendered as-is.
 *  - OPTIONAL_FILTER is unwrapped and its child is rendered recursively.
 *  - Anything that cannot be rendered (STRUCT_EXTRACT, DYNAMIC_FILTER, a
 *    missing filter/child, or a filter type this code does not know about)
 *    degrades to the always-true predicate "1 = 1", i.e. no filtering is
 *    pushed down for it.
 *
 * filter      - filter to render; may be null (rendered as "1 = 1").
 * column_name - already-quoted column identifier the filter applies to.
 */
void
PostgresScanGlobalState::ConstructQueryFilter(duckdb::TableFilter *filter, const char *column_name) {
	/* An OPTIONAL_FILTER may carry no child; never dereference a null filter. */
	if (filter == nullptr) {
		scan_query << "1 = 1";
		return;
	}

	switch (filter->filter_type) {
	case duckdb::TableFilterType::CONSTANT_COMPARISON:
	case duckdb::TableFilterType::IS_NULL:
	case duckdb::TableFilterType::IS_NOT_NULL:
	case duckdb::TableFilterType::CONJUNCTION_OR:
	case duckdb::TableFilterType::CONJUNCTION_AND:
	case duckdb::TableFilterType::IN_FILTER:
		scan_query << filter->ToString(column_name).c_str();
		break;
	case duckdb::TableFilterType::OPTIONAL_FILTER: {
		/* filter_type guarantees the dynamic type, so a static downcast is sufficient. */
		auto *optional_filter = static_cast<duckdb::OptionalFilter *>(filter);
		ConstructQueryFilter(optional_filter->child_filter.get(), column_name);
		break;
	}
	case duckdb::TableFilterType::STRUCT_EXTRACT:
	case duckdb::TableFilterType::DYNAMIC_FILTER:
	default:
		/* Not expressible here (or unknown to this code): keep the query valid. */
		scan_query << "1 = 1";
		break;
	}
}

void
PostgresScanGlobalState::ConstructTableScanQuery(const duckdb::TableFunctionInitInput &input) {
/* SELECT COUNT(*) FROM */
Expand Down Expand Up @@ -101,7 +126,7 @@ PostgresScanGlobalState::ConstructTableScanQuery(const duckdb::TableFunctionInit
scan_query << "(";
auto attr = GetAttr(table_tuple_desc, attr_num - 1);
auto col = pgduckdb::QuoteIdentifier(GetAttName(attr));
scan_query << filter->ToString(col).c_str();
ConstructQueryFilter(filter, col);
scan_query << ") ";
}
}
Expand Down Expand Up @@ -157,12 +182,12 @@ PostgresScanTableFunction::PostgresScanTableFunction()
to_string = ToString;
}

std::string
PostgresScanTableFunction::ToString(const duckdb::FunctionData *data) {
auto &bind_data = data->Cast<PostgresScanFunctionData>();
std::ostringstream oss;
oss << "(POSTGRES_SCAN) " << GetRelationName(bind_data.rel);
return oss.str();
/*
 * Builds the key/value entries describing this scan for the table function's
 * `to_string` callback: a single "Table" entry holding the name of the
 * relation stored in the bind data.
 */
duckdb::InsertionOrderPreservingMap<duckdb::string>
PostgresScanTableFunction::ToString(duckdb::TableFunctionToStringInput &input) {
	duckdb::InsertionOrderPreservingMap<duckdb::string> explain_entries;
	const auto &scan_data = input.bind_data->Cast<PostgresScanFunctionData>();
	explain_entries["Table"] = GetRelationName(scan_data.rel);
	return explain_entries;
}

duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
Expand Down
4 changes: 2 additions & 2 deletions test/regression/expected/duckdb_recycle.out
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ EXPLAIN SELECT count(*) FROM ta;
┌─────────────┴─────────────┐
│ POSTGRES_SCAN │
│ ──────────────────── │
(POSTGRES_SCAN) ta
Table: ta
│ │
│ ~2550 Rows │
└───────────────────────────┘
Expand All @@ -39,7 +39,7 @@ EXPLAIN SELECT count(*) FROM ta;
┌─────────────┴─────────────┐
│ POSTGRES_SCAN │
│ ──────────────────── │
(POSTGRES_SCAN) ta
Table: ta
│ │
│ ~2550 Rows │
└───────────────────────────┘
Expand Down
3 changes: 2 additions & 1 deletion test/regression/expected/execution_error.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ INSERT INTO int_as_varchar SELECT * from (
) t(a);
SELECT a::INTEGER FROM int_as_varchar;
ERROR: (PGDuckDB/Duckdb_ExecCustomScan) Conversion Error: Could not convert string 'abc' to INT32
LINE 1: SELECT (a)::integer AS a FROM pgduckdb.public.int...

LINE 1: SELECT (a)::integer AS a FROM pgduckdb.public.int_as_varchar
^
DROP TABLE int_as_varchar;
68 changes: 35 additions & 33 deletions test/regression/expected/scan_postgres_tables.out
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ Parallel Aggregate
SELECT COUNT(a) FROM t1 WHERE a < 10;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a<10 AND a IS NOT NULL)
QUERY: SELECT a FROM public.t1 WHERE (a<10)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a < 10))
Filter: (a < 10)

count
-------
Expand All @@ -39,13 +39,13 @@ SET client_min_messages TO DEBUG1;
SELECT COUNT(a) FROM t1 WHERE a < 10;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a<10 AND a IS NOT NULL)
QUERY: SELECT a FROM public.t1 WHERE (a<10)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Bitmap Heap Scan on t1
Recheck Cond: ((a < 10) AND (a IS NOT NULL))
Recheck Cond: (a < 10)
-> Bitmap Index Scan on t1_a_idx
Index Cond: ((a < 10) AND (a IS NOT NULL))
Index Cond: (a < 10)

count
-------
Expand All @@ -57,11 +57,11 @@ SET enable_bitmapscan TO false;
SELECT COUNT(a) FROM t1 WHERE a = 1;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a=1 AND a IS NOT NULL)
QUERY: SELECT a FROM public.t1 WHERE (a=1)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Index Only Scan using t1_a_idx on t1
Index Cond: ((a IS NOT NULL) AND (a = 1))
Index Cond: (a = 1)

count
-------
Expand All @@ -72,11 +72,11 @@ Parallel Index Only Scan using t1_a_idx on t1
SELECT COUNT(c) FROM t1 WHERE a = 1;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT c FROM public.t1 WHERE (a=1 AND a IS NOT NULL)
QUERY: SELECT c FROM public.t1 WHERE (a=1)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Index Scan using t1_a_idx on t1
Index Cond: ((a IS NOT NULL) AND (a = 1))
Index Cond: (a = 1)

count
-------
Expand All @@ -98,11 +98,11 @@ Seq Scan on t2

DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a>=1 AND a<=3 AND a IS NOT NULL)
QUERY: SELECT a FROM public.t1 WHERE (a>=1 AND a<=3)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Index Only Scan using t1_a_idx on t1
Index Cond: ((a >= 1) AND (a <= 3) AND (a IS NOT NULL))
Index Cond: ((a >= 1) AND (a <= 3))

a | a
---+---
Expand All @@ -115,19 +115,19 @@ Parallel Index Only Scan using t1_a_idx on t1
SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a<2 AND a IS NOT NULL)
QUERY: SELECT a FROM public.t1 WHERE (a>8)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a < 2))
Filter: (a > 8)

DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a>8 AND a IS NOT NULL)
QUERY: SELECT a FROM public.t1 WHERE (a<2)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a > 8))
Filter: (a < 2)

count
-------
Expand All @@ -139,19 +139,19 @@ SET max_parallel_workers TO 0;
SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a<2 AND a IS NOT NULL)
QUERY: SELECT a FROM public.t1 WHERE (a>8)
RUNNING: IN PROCESS THREAD.
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a < 2))
Filter: (a > 8)

DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a>8 AND a IS NOT NULL)
QUERY: SELECT a FROM public.t1 WHERE (a<2)
RUNNING: IN PROCESS THREAD.
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a > 8))
Filter: (a < 2)

count
-------
Expand All @@ -170,11 +170,11 @@ SET client_min_messages TO DEBUG1;
SELECT COUNT(*) FROM partitioned_table WHERE a < 25;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.partitioned_table WHERE (a<25 AND a IS NOT NULL)
QUERY: SELECT a FROM public.partitioned_table WHERE (a<25)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Seq Scan on partition_1 partitioned_table
Filter: ((a IS NOT NULL) AND (a < 25))
Filter: (a < 25)

count
-------
Expand All @@ -184,14 +184,14 @@ Parallel Seq Scan on partition_1 partitioned_table
SELECT COUNT(*) FROM partitioned_table WHERE a < 75;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.partitioned_table WHERE (a<75 AND a IS NOT NULL)
QUERY: SELECT a FROM public.partitioned_table WHERE (a<75)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Append
-> Seq Scan on partition_1 partitioned_table_1
Filter: ((a IS NOT NULL) AND (a < 75))
Filter: (a < 75)
-> Seq Scan on partition_2 partitioned_table_2
Filter: ((a IS NOT NULL) AND (a < 75))
Filter: (a < 75)

count
-------
Expand All @@ -201,12 +201,14 @@ Parallel Append
SELECT COUNT(*) FROM partitioned_table WHERE a < 25 OR a > 75;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.partitioned_table
RUNNING: ON 2 PARALLEL WORKER(S).
QUERY: SELECT a FROM public.partitioned_table WHERE (a<25 OR a>75)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Append
-> Seq Scan on partition_1 partitioned_table_1
Filter: ((a < 25) OR (a > 75))
-> Seq Scan on partition_2 partitioned_table_2
Filter: ((a < 25) OR (a > 75))

count
-------
Expand All @@ -216,12 +218,12 @@ Parallel Append
SELECT COUNT(*) FROM partitioned_table WHERE a < 25 AND b = 1;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a, b FROM public.partitioned_table WHERE (a<25 AND a IS NOT NULL) AND (b=1 AND b IS NOT NULL)
QUERY: SELECT a, b FROM public.partitioned_table WHERE (a<25) AND (b=1)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Index Scan using partition_1_b_idx on partition_1 partitioned_table
Index Cond: ((b IS NOT NULL) AND (b = 1))
Filter: ((a IS NOT NULL) AND (a < 25))
Index Cond: (b = 1)
Filter: (a < 25)

count
-------
Expand All @@ -231,19 +233,19 @@ Parallel Index Scan using partition_1_b_idx on partition_1 partitioned_table
SELECT COUNT(*) FROM partitioned_table, t2 WHERE partitioned_table.a = t2.a AND partitioned_table.a < 2;
DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM public.partitioned_table WHERE (a<2 AND a IS NOT NULL)
QUERY: SELECT a FROM public.partitioned_table WHERE (a<2)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Seq Scan on partition_1 partitioned_table
Filter: ((a IS NOT NULL) AND (a < 2))
Filter: (a < 2)

DEBUG: (PGDuckDB/PostgresTableReader)

QUERY: SELECT a FROM pg_temp.t2 WHERE (a<2 AND a IS NOT NULL AND a>=0 AND a<=1 AND a IS NOT NULL)
QUERY: SELECT a FROM pg_temp.t2 WHERE (a<2 AND a>=0 AND a<=1)
RUNNING: IN PROCESS THREAD.
EXECUTING:
Seq Scan on t2
Filter: ((a IS NOT NULL) AND (a IS NOT NULL) AND (a < 2) AND (a >= 0) AND (a <= 1))
Filter: ((a < 2) AND (a >= 0) AND (a <= 1))

count
-------
Expand Down
Loading

0 comments on commit a2e7ca3

Please sign in to comment.