Skip to content

Commit

Permalink
WIP IndexOnlyScan
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaruza committed Oct 2, 2024
1 parent e889d9a commit e4ac9e9
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 110 deletions.
10 changes: 8 additions & 2 deletions include/pgduckdb/catalog/pgduckdb_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ class PostgresTable : public TableCatalogEntry {
}

public:
static bool PopulateColumns(CreateTableInfo &info, Oid relid, Snapshot snapshot);
static bool PopulateHeapColumns(CreateTableInfo &info, Oid relid, Snapshot snapshot);
static bool PopulateIndexColumns(CreateTableInfo &info, Oid relid, Path *path, bool indexonly);

protected:
PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality,
Snapshot snapshot);

private:
static TupleDesc ExecTypeFromTLWithNames(List *target_list, TupleDesc tuple_desc);

protected:
Cardinality cardinality;
Snapshot snapshot;
Expand All @@ -55,7 +59,7 @@ class PostgresHeapTable : public PostgresTable {
class PostgresIndexTable : public PostgresTable {
public:
PostgresIndexTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality,
Snapshot snapshot, Path *path, PlannerInfo *planner_info);
Snapshot snapshot, bool is_indexonly_scan, Path *path, PlannerInfo *planner_info, Oid oid);

public:
// -- Table API --
Expand All @@ -66,6 +70,8 @@ class PostgresIndexTable : public PostgresTable {
private:
Path *path;
PlannerInfo *planner_info;
bool indexonly_scan;
Oid oid;
};

} // namespace duckdb
4 changes: 4 additions & 0 deletions include/pgduckdb/pgduckdb_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,9 @@ void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offse
bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanGlobalState> scan_global_state,
duckdb::shared_ptr<PostgresScanLocalState> scan_local_state, HeapTupleData *tuple);
/*
 * Append one tuple, supplied as parallel `values` / `nulls` arrays rather than
 * as a HeapTupleData, to `output` at the row position tracked by
 * `scan_local_state->m_output_vector_size`.
 *
 * When `scan_global_state->m_count_tuples_only` is set no column data is
 * written and only the local row counter is advanced. Tuples rejected by the
 * filters in `scan_global_state->m_filters` are not written and not counted.
 */
void InsertTupleValuesIntoChunk(duckdb::DataChunk &output,
duckdb::shared_ptr<PostgresScanGlobalState> scan_global_state,
duckdb::shared_ptr<PostgresScanLocalState> scan_local_state, Datum *values,
bool *nulls);

} // namespace pgduckdb
18 changes: 9 additions & 9 deletions include/pgduckdb/scan/postgres_index_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ namespace pgduckdb {
// Global State

struct PostgresIndexScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresIndexScanGlobalState(IndexScanState *index_scan_state, Relation relation,
duckdb::TableFunctionInitInput &input);
explicit PostgresIndexScanGlobalState(IndexOptInfo *index, IndexScanState *index_scan_state, bool indexonly,
Relation relation, duckdb::TableFunctionInitInput &input);
~PostgresIndexScanGlobalState();
idx_t
MaxThreads() const override {
Expand All @@ -27,34 +27,38 @@ struct PostgresIndexScanGlobalState : public duckdb::GlobalTableFunctionState {

public:
duckdb::shared_ptr<PostgresScanGlobalState> m_global_state;
IndexOptInfo *m_index;
IndexScanState *m_index_scan;
bool m_indexonly;
Relation m_relation;
};

// Local State

struct PostgresIndexScanLocalState : public duckdb::LocalTableFunctionState {
public:
PostgresIndexScanLocalState(IndexScanDesc index_scan_desc, Relation relation);
PostgresIndexScanLocalState(IndexScanDesc index_scan_desc, TupleDesc desc, Relation relation);
~PostgresIndexScanLocalState() override;

public:
duckdb::shared_ptr<PostgresScanLocalState> m_local_state;
IndexScanDesc m_index_scan_desc;
Relation m_relation;
TupleTableSlot *m_slot;
TupleTableSlot *m_index_only_slot;
};

// PostgresIndexScanFunctionData

struct PostgresIndexScanFunctionData : public duckdb::TableFunctionData {
public:
PostgresIndexScanFunctionData(uint64_t cardinality, Path *path, PlannerInfo *planner_info, Oid relation_oid,
Snapshot snapshot);
PostgresIndexScanFunctionData(uint64_t cardinality, bool indexonly, Path *path, PlannerInfo *planner_info,
Oid relation_oid, Snapshot Snapshot);
~PostgresIndexScanFunctionData() override;

public:
uint64_t m_cardinality;
bool m_indexonly;
Path *m_path;
PlannerInfo *m_planner_info;
Snapshot m_snapshot;
Expand All @@ -68,10 +72,6 @@ struct PostgresIndexScanFunction : public duckdb::TableFunction {
PostgresIndexScanFunction();

public:
static duckdb::unique_ptr<duckdb::FunctionData>
PostgresIndexScanBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input,
duckdb::vector<duckdb::LogicalType> &return_types, duckdb::vector<duckdb::string> &names);

static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresIndexScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);

Expand Down
95 changes: 82 additions & 13 deletions src/catalog/pgduckdb_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ extern "C" {
#include "utils/syscache.h"
#include "access/htup_details.h"
#include "parser/parsetree.h"
#include "postgres.h"
#include "executor/nodeIndexscan.h"
#include "nodes/pathnodes.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "parser/parsetree.h"
#include "utils/rel.h"
}

namespace duckdb {
Expand All @@ -31,23 +39,84 @@ PostgresTable::PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, Creat
}

bool
PostgresTable::PopulateColumns(CreateTableInfo &info, Oid relid, Snapshot snapshot) {
PostgresTable::PopulateHeapColumns(CreateTableInfo &info, Oid relid, Snapshot snapshot) {
auto rel = RelationIdGetRelation(relid);
auto tupleDesc = RelationGetDescr(rel);
auto tuple_desc = RelationGetDescr(rel);

if (!tuple_desc) {
elog(WARNING, "Failed to get tuple descriptor for relation with OID %u", relid);
RelationClose(rel);
return false;
}

for (int i = 0; i < tuple_desc->natts; i++) {
Form_pg_attribute attr = &tuple_desc->attrs[i];
auto col_name = duckdb::string(NameStr(attr->attname));
auto duck_type = pgduckdb::ConvertPostgresToDuckColumnType(attr);
info.columns.AddColumn(duckdb::ColumnDefinition(col_name, duck_type));
/* Log column name and type */
elog(DEBUG2, "(DuckDB/PopulateHeapColumns) Column name: %s, Type: %s --", col_name.c_str(),
duck_type.ToString().c_str());
}

RelationClose(rel);
return true;
}

/*
 * Generate a tuple descriptor from target_list, naming each column after the
 * matching attribute of the relation's original tuple descriptor.
 *
 * target_list - list of TargetEntry nodes (e.g. an index target list).
 * tuple_desc  - descriptor of the underlying relation, used only to look up
 *               attribute names.
 *
 * Returns a newly created descriptor with one entry per target list element,
 * carrying the type, typmod and collation of the entry's expression.
 *
 * Only entries whose expression is a plain Var with an attribute number inside
 * tuple_desc take their name from the relation. Anything else (expression
 * columns, system or whole-row references) falls back to the entry's own
 * resname, which may be NULL; TupleDescInitEntry accepts a NULL name. This
 * avoids reinterpreting an arbitrary expression node as a Var and indexing
 * tuple_desc->attrs out of bounds.
 */
TupleDesc
PostgresTable::ExecTypeFromTLWithNames(List *target_list, TupleDesc tuple_desc) {
	TupleDesc type_info = CreateTemplateTupleDesc(ExecTargetListLength(target_list));
	ListCell *l;
	int cur_resno = 1;

	foreach (l, target_list) {
		TargetEntry *tle = (TargetEntry *)lfirst(l);
		Node *expr = (Node *)tle->expr;
		const char *att_name = tle->resname;

		if (expr != nullptr && IsA(expr, Var)) {
			const Var *var = (const Var *)expr;
			/* varattno is 1-based; zero or negative values denote whole-row/system columns. */
			if (var->varattno >= 1 && var->varattno <= tuple_desc->natts) {
				att_name = NameStr(tuple_desc->attrs[var->varattno - 1].attname);
			}
		}

		TupleDescInitEntry(type_info, cur_resno, att_name, exprType(expr), exprTypmod(expr), 0);
		TupleDescInitEntryCollation(type_info, cur_resno, exprCollation(expr));
		cur_resno++;
	}

	return type_info;
}

bool
PostgresTable::PopulateIndexColumns(CreateTableInfo &info, Oid relid, Path *path, bool indexonly) {
auto rel = RelationIdGetRelation(relid);
auto tuple_desc = RelationGetDescr(rel);

if (indexonly) {
tuple_desc = ExecTypeFromTLWithNames(((IndexPath *)path)->indexinfo->indextlist, RelationGetDescr(rel));
elog(DEBUG2, "(PostgresTable::PopulateIndexColumns) IndexOnlyScan selected");
} else {
tuple_desc = RelationGetDescr(rel);
elog(DEBUG2, "(PostgresTable::PopulateIndexColumns) IndexScan selected");
}

if (!tupleDesc) {
elog(ERROR, "Failed to get tuple descriptor for relation with OID %u", relid);
if (!tuple_desc) {
elog(WARNING, "Failed to get tuple descriptor for relation with OID %u", relid);
RelationClose(rel);
return false;
}

for (int i = 0; i < tupleDesc->natts; i++) {
Form_pg_attribute attr = &tupleDesc->attrs[i];
for (int i = 0; i < tuple_desc->natts; i++) {
Form_pg_attribute attr = &tuple_desc->attrs[i];
auto col_name = duckdb::string(NameStr(attr->attname));
auto duck_type = pgduckdb::ConvertPostgresToDuckColumnType(attr);
info.columns.AddColumn(duckdb::ColumnDefinition(col_name, duck_type));
/* Log column name and type */
elog(DEBUG3, "-- (DuckDB/PostgresHeapBind) Column name: %s, Type: %s --", col_name.c_str(),
elog(DEBUG2, "-- (DuckDB/PopulateIndexColumns) Column name: %s, Type: %s --", col_name.c_str(),
duck_type.ToString().c_str());
}

Expand Down Expand Up @@ -85,9 +154,10 @@ PostgresHeapTable::GetStorageInfo(ClientContext &context) {
//===--------------------------------------------------------------------===//

PostgresIndexTable::PostgresIndexTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info,
Cardinality cardinality, Snapshot snapshot, Path *path,
PlannerInfo *planner_info)
: PostgresTable(catalog, schema, info, cardinality, snapshot), path(path), planner_info(planner_info) {
Cardinality cardinality, Snapshot snapshot, bool is_indexonly_scan, Path *path,
PlannerInfo *planner_info, Oid oid)
: PostgresTable(catalog, schema, info, cardinality, snapshot), path(path), planner_info(planner_info),
indexonly_scan(is_indexonly_scan), oid(oid) {
}

unique_ptr<BaseStatistics>
Expand All @@ -97,9 +167,8 @@ PostgresIndexTable::GetStatistics(ClientContext &context, column_t column_id) {

TableFunction
PostgresIndexTable::GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) {
RangeTblEntry *rte = planner_rt_fetch(path->parent->relid, planner_info);
bind_data = duckdb::make_uniq<pgduckdb::PostgresIndexScanFunctionData>(cardinality, path, planner_info, rte->relid,
snapshot);
bind_data = duckdb::make_uniq<pgduckdb::PostgresIndexScanFunctionData>(cardinality, indexonly_scan, path,
planner_info, oid, snapshot);
return pgduckdb::PostgresIndexScanFunction();
}

Expand Down
30 changes: 19 additions & 11 deletions src/catalog/pgduckdb_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ FindMatchingRelEntry(Oid relid, PlannerInfo *planner_info) {
int i = 1;
RelOptInfo *node = nullptr;
for (; i < planner_info->simple_rel_array_size; i++) {
if (planner_info->simple_rte_array[i]->rtekind == RTE_SUBQUERY && planner_info->simple_rel_array[i]) {
if (planner_info->simple_rte_array[i]->rtekind == RTE_SUBQUERY && planner_info->simple_rel_array[i]
&& planner_info->simple_rel_array[i]->subroot) {
node = FindMatchingRelEntry(relid, planner_info->simple_rel_array[i]->subroot);
//node = FindMatchingRelEntry(relid, planner_info->simple_rte_array[i]->subquery);
if (node) {
return node;
}
Expand Down Expand Up @@ -106,27 +108,35 @@ SchemaItems::GetTable(const string &entry_name, PlannerInfo *planner_info) {
ReleaseSysCache(tuple);

Path *node_path = nullptr;
RelOptInfo *node = nullptr;

if (planner_info) {
auto node = FindMatchingRelEntry(rel_oid, planner_info);
if (node) {
node_path = get_cheapest_fractional_path(node, 0.0);
node = FindMatchingRelEntry(rel_oid, planner_info);
ListCell *lc;
/* We should prefer IndexOnlyScan */
foreach (lc, node->pathlist) {
Path *p = (Path *)lfirst(lc);
if (p->pathtype == T_IndexOnlyScan) {
node_path = p;
}
}
if (node && (node_path == nullptr))
node_path = get_cheapest_fractional_path(node, 0.0);
}

unique_ptr<PostgresTable> table;
CreateTableInfo info;
info.table = entry_name;
Cardinality cardinality = node_path ? node_path->rows : 1;
if (IsIndexScan(node_path)) {
RangeTblEntry *rte = planner_rt_fetch(node_path->parent->relid, planner_info);
rel_oid = rte->relid;
if (!PostgresTable::PopulateColumns(info, rel_oid, snapshot)) {
auto is_indexonly_scan = node_path->pathtype == T_IndexOnlyScan;
if (!PostgresTable::PopulateIndexColumns(info, rel_oid, node_path, is_indexonly_scan)) {
return nullptr;
}
table = make_uniq<PostgresIndexTable>(catalog, *schema, info, cardinality, snapshot, node_path, planner_info);
table = make_uniq<PostgresIndexTable>(catalog, *schema, info, cardinality, snapshot, is_indexonly_scan,
node_path, planner_info, rel_oid);
} else {
if (!PostgresTable::PopulateColumns(info, rel_oid, snapshot)) {
if (!PostgresTable::PopulateHeapColumns(info, rel_oid, snapshot)) {
return nullptr;
}
table = make_uniq<PostgresHeapTable>(catalog, *schema, info, cardinality, snapshot, rel_oid);
Expand Down Expand Up @@ -175,5 +185,3 @@ PostgresTransaction::GetCatalogEntry(CatalogType type, const string &schema, con
}

} // namespace duckdb

// namespace duckdb
63 changes: 63 additions & 0 deletions src/pgduckdb_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1084,4 +1084,67 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanG
duckdb_free(nulls);
}

/*
 * Append one tuple, given as parallel `values` / `nulls` arrays, to `output`.
 *
 * output            - chunk being filled; the row is written at
 *                     scan_local_state->m_output_vector_size.
 * scan_global_state - supplies the read/output column mappings, the optional
 *                     filters and the shared total row counter.
 * scan_local_state  - holds the current output row position, advanced on success.
 * values, nulls     - column data for the tuple, both indexed by the value index
 *                     stored in scan_global_state->m_read_columns_ids. The arrays
 *                     are owned by the caller and are not modified here.
 *
 * If m_count_tuples_only is set, only the local row counter is advanced.
 * A tuple rejected by any filter is neither written nor counted.
 */
void
InsertTupleValuesIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanGlobalState> scan_global_state,
                           duckdb::shared_ptr<PostgresScanLocalState> scan_local_state, Datum *values, bool *nulls) {

	/* Counting only: no column data is materialized. */
	if (scan_global_state->m_count_tuples_only) {
		scan_local_state->m_output_vector_size++;
		return;
	}

	/*
	 * Values are laid out in read-column order, while the output column list may
	 * be in a different order. First evaluate the filters against the read
	 * columns, then map every output column back to its value slot.
	 */
	if (scan_global_state->m_filters) {
		auto &filters = scan_global_state->m_filters->filters;
		for (auto const &[columnIdx, valueIdx] : scan_global_state->m_read_columns_ids) {
			auto filter_it = filters.find(valueIdx);
			if (filter_it == filters.end()) {
				continue;
			}
			if (!ApplyValueFilter(*filter_it->second, values[valueIdx], nulls[valueIdx],
			                      scan_global_state->m_tuple_desc->attrs[columnIdx].atttypid)) {
				/* Tuple filtered out: nothing is written and no counter changes. */
				return;
			}
		}
	}

	/* Write tuple columns in output vector. */
	for (idx_t idx = 0; idx < scan_global_state->m_output_columns_ids.size(); idx++) {
		auto &result = output.data[idx];
		const auto column_id = scan_global_state->m_output_columns_ids[idx];
		const idx_t value_idx = scan_global_state->m_read_columns_ids[column_id];

		/* The null flag must come from the same slot as the value, not from the output position. */
		if (nulls[value_idx]) {
			auto &array_mask = duckdb::FlatVector::Validity(result);
			array_mask.SetInvalid(scan_local_state->m_output_vector_size);
			continue;
		}

		if (scan_global_state->m_tuple_desc->attrs[column_id].attlen == -1) {
			/*
			 * Variable-length value: detoast into a local so the caller's array
			 * never ends up holding a pointer that is freed below.
			 */
			bool should_free = false;
			Datum detoasted = DetoastPostgresDatum(reinterpret_cast<varlena *>(values[value_idx]), &should_free);
			ConvertPostgresToDuckValue(detoasted, result, scan_local_state->m_output_vector_size);
			if (should_free) {
				duckdb_free(reinterpret_cast<void *>(detoasted));
			}
		} else {
			ConvertPostgresToDuckValue(values[value_idx], result, scan_local_state->m_output_vector_size);
		}
	}

	scan_local_state->m_output_vector_size++;
	const auto total_rows = ++scan_global_state->m_total_row_count;

	/* Occasional progress trace, kept at debug level so it never reaches clients by default. */
	if (!(total_rows % 100000)) {
		elog(DEBUG2, "(DuckDB/InsertTupleValuesIntoChunk) Rows scanned so far: %llu",
		     static_cast<unsigned long long>(total_rows));
	}
}

} // namespace pgduckdb
Loading

0 comments on commit e4ac9e9

Please sign in to comment.