Skip to content

Commit

Permalink
Projection & column selection
Browse files Browse the repository at this point in the history
* Read columns that are needed for query execution
  • Loading branch information
mkaruza committed Apr 22, 2024
1 parent d4ffc83 commit 111f266
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 38 deletions.
7 changes: 3 additions & 4 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ extern "C" {

// Postgres Relation


namespace quack {

struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
Expand All @@ -23,15 +22,15 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
~PostgresHeapScanLocalState() override;

public:
PostgresHeapSeqScan & m_rel;
PostgresHeapSeqScan &m_rel;
PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info;
bool m_exhausted_scan = false;
};

// Global State

struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation);
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, duckdb::TableFunctionInitInput &input);
~PostgresHeapScanGlobalState();
idx_t
MaxThreads() const override {
Expand Down Expand Up @@ -67,7 +66,7 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction {
// LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext
// &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate);
static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
duckdb::DataChunk &output);
// static unique_ptr<NodeStatistics> PostgresCardinality(ClientContext &context, const FunctionData *bind_data);
// static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void
Expand Down
6 changes: 5 additions & 1 deletion include/quack/quack_heap_seq_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class PostgresHeapSeqScan {
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
duckdb::vector<duckdb::column_t> m_columns;
duckdb::vector<duckdb::idx_t> m_projections;
duckdb::TableFilterSet *m_filters = nullptr;
};

public:
Expand All @@ -52,7 +55,8 @@ class PostgresHeapSeqScan {
PostgresHeapSeqScan(PostgresHeapSeqScan &&other);

public:
void InitParallelScanState();
void InitParallelScanState(const duckdb::vector<duckdb::column_t> &columns,
const duckdb::vector<duckdb::idx_t> &projections, duckdb::TableFilterSet *filters);
void
SetSnapshot(Snapshot snapshot) {
m_snapshot = snapshot;
Expand Down
4 changes: 3 additions & 1 deletion include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ namespace quack {
duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset);
void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset,
duckdb::vector<duckdb::column_t> &columns, duckdb::vector<duckdb::idx_t> &projections,
duckdb::TableFilterSet *filters);
} // namespace quack
18 changes: 11 additions & 7 deletions src/quack_heap_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() {
// PostgresHeapScanGlobalState
//

/*
 * Builds the global scan state for a heap scan and hands the column selection
 * requested by DuckDB over to the shared sequential-scan object.
 *
 * relation - sequential scan whose parallel scan state is initialised here.
 * input    - DuckDB init input; its column_ids, projection_ids and filters
 *            are forwarded to PostgresHeapSeqScan::InitParallelScanState().
 */
PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation,
                                                         duckdb::TableFunctionInitInput &input) {
	/* MaxThreads() returns idx_t; cast so the vararg really matches the %lu conversion. */
	elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ",
	     static_cast<unsigned long>(MaxThreads()));
	/*
	 * filters.get() hands over a raw, non-owning pointer.
	 * NOTE(review): presumably `input.filters` outlives the scan - confirm against the caller.
	 */
	relation.InitParallelScanState(input.column_ids, input.projection_ids, input.filters.get());
}

PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() {
Expand Down Expand Up @@ -58,7 +59,9 @@ PostgresHeapScanFunction::PostgresHeapScanFunction()
PostgresHeapInitLocal) {
named_parameters["table"] = duckdb::LogicalType::POINTER;
named_parameters["snapshot"] = duckdb::LogicalType::POINTER;
// projection_pushdown = true;
projection_pushdown = true;
//filter_pushdown = true;
//filter_prune = true;
}

duckdb::unique_ptr<duckdb::FunctionData>
Expand Down Expand Up @@ -94,7 +97,7 @@ duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresHeapScanFunction::PostgresHeapInitGlobal(duckdb::ClientContext &context,
duckdb::TableFunctionInitInput &input) {
auto &bind_data = input.bind_data->CastNoConst<PostgresHeapScanFunctionData>();
return duckdb::make_uniq<PostgresHeapScanGlobalState>(bind_data.m_relation);
return duckdb::make_uniq<PostgresHeapScanGlobalState>(bind_data.m_relation, input);
}

duckdb::unique_ptr<duckdb::LocalTableFunctionState>
Expand Down Expand Up @@ -153,10 +156,11 @@ FindMatchingHeapRelation(List *tables, const duckdb::string &to_find) {
if (!rel->rd_amhandler || (GetTableAmRoutine(rel->rd_amhandler) != GetHeapamTableAmRoutine())) {
/* No access method handler, or not the heap access method: we can't read from this */
RelationClose(rel);
return nullptr;
continue;
} else {
RelationClose(rel);
return table;
}
RelationClose(rel);
return table;
}
RelationClose(rel);
}
Expand Down
10 changes: 8 additions & 2 deletions src/quack_heap_seq_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,13 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn
}

void
PostgresHeapSeqScan::InitParallelScanState() {
PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector<duckdb::column_t> &columns,
const duckdb::vector<duckdb::idx_t> &projections,
duckdb::TableFilterSet *filters) {
m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);
m_parallel_scan_state.m_columns = columns;
m_parallel_scan_state.m_projections = projections;
m_parallel_scan_state.m_filters = filters;
}

bool
Expand Down Expand Up @@ -115,7 +120,8 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
pgstat_count_heap_getnext(m_rel);

InsertTupleIntoChunk(output, threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple,
threadScanInfo.m_output_vector_size);
threadScanInfo.m_output_vector_size, m_parallel_scan_state.m_columns,
m_parallel_scan_state.m_projections, m_parallel_scan_state.m_filters);
}

/* No more items on current page */
Expand Down
7 changes: 5 additions & 2 deletions src/quack_select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace quack {
/*
 * Creates the DuckDB instance from a default-constructed DBConfig and a null
 * database path.
 * NOTE(review): a null path presumably selects an in-memory database - confirm
 * against the DuckDB API.
 */
static duckdb::unique_ptr<duckdb::DuckDB>
quack_open_database() {
	duckdb::DBConfig db_config;
	// Routing DuckDB allocations through the Quack allocator is currently disabled:
	// db_config.allocator = duckdb::make_uniq<duckdb::Allocator>(QuackAllocate, QuackFree, QuackReallocate, nullptr);
	auto database = duckdb::make_uniq<duckdb::DuckDB>(nullptr, &db_config);
	return database;
}

Expand Down Expand Up @@ -59,7 +59,10 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co
TupleTableSlot *slot = NULL;

// FIXME: try-catch ?
auto res = connection->Query(query_desc->sourceText);

duckdb::unique_ptr<duckdb::MaterializedQueryResult> res = nullptr;

res = connection->Query(query_desc->sourceText);
if (res->HasError()) {
return false;
}
Expand Down
53 changes: 32 additions & 21 deletions src/quack_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
}
}

/*
 * Cursor carried between successive HeapTupleFetchNextColumnDatum() calls on
 * the same tuple, so each call resumes where the previous one stopped instead
 * of walking the attributes from the start again.
 */
typedef struct HeapTupleReadState {
	/* Restored into the fetcher's `slow` flag; while set, cached attribute offsets are not used. */
	bool m_slow = false;
	/* Attribute number at which the previous fetch stopped; 0 means start from the first attribute. */
	int m_last_tuple_att = 0;
	/* Byte offset (`off`) saved by the previous fetch and restored by the next one. */
	uint32 m_page_tuple_offset = 0;
} HeapTupleReadState;

static Datum
HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePageReadState &heapTupleReadState,
int natts, bool *isNull) {
HeapTupleFetchNextColumnDatum(TupleDesc tupleDesc, HeapTuple tuple, HeapTupleReadState &heapTupleReadState, int attNum,
bool *isNull) {

HeapTupleHeader tup = tuple->t_data;
bool hasnulls = HeapTupleHasNulls(tuple);
Expand All @@ -168,23 +168,21 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage
bool slow = false;
Datum value = (Datum)0;

/* We can only fetch as many attributes as the tuple has. */
natts = Min(HeapTupleHeaderGetNatts(tuple->t_data), natts);
attnum = heapTupleReadState.m_last_tuple_att;

attnum = heapTupleReadState.m_nvalid;
if (attnum == 0) {
/* Start from the first attribute */
off = 0;
heapTupleReadState.m_slow = false;
} else {
/* Restore state from previous execution */
off = heapTupleReadState.m_offset;
off = heapTupleReadState.m_page_tuple_offset;
slow = heapTupleReadState.m_slow;
}

tp = (char *)tup + tup->t_hoff;

for (; attnum < natts; attnum++) {
for (; attnum < attNum; attnum++) {
Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum);

if (hasnulls && att_isnull(attnum, bp)) {
Expand All @@ -199,7 +197,6 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage
if (!slow && thisatt->attcacheoff >= 0) {
off = thisatt->attcacheoff;
} else if (thisatt->attlen == -1) {

if (!slow && off == att_align_nominal(off, thisatt->attalign)) {
thisatt->attcacheoff = off;
} else {
Expand All @@ -208,7 +205,6 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage
}
} else {
off = att_align_nominal(off, thisatt->attalign);

if (!slow) {
thisatt->attcacheoff = off;
}
Expand All @@ -223,8 +219,8 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage
}
}

heapTupleReadState.m_nvalid = attnum;
heapTupleReadState.m_offset = off;
heapTupleReadState.m_last_tuple_att = attNum;
heapTupleReadState.m_page_tuple_offset = off;

if (slow) {
heapTupleReadState.m_slow = true;
Expand All @@ -236,19 +232,34 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage
}

void
InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tupleDesc, HeapTupleData *slot, idx_t offset) {
HeapTuplePageReadState heapTupleReadState = {};
for (int i = 0; i < tupleDesc->natts; i++) {
InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tupleDesc, HeapTupleData *slot, idx_t offset,
duckdb::vector<duckdb::column_t> &columns, duckdb::vector<duckdb::idx_t> &projections,
duckdb::TableFilterSet *filters) {
HeapTupleReadState heapTupleReadState = {};
Datum *values = (Datum *)palloc0(sizeof(Datum) * columns.size());
bool *nulls = (bool *)palloc0(sizeof(bool) * columns.size());
bool skipTuple = false;

for (duckdb::column_t i = 0; i < columns.size(); i++) {
values[i] = HeapTupleFetchNextColumnDatum(tupleDesc, slot, heapTupleReadState, columns[i] + 1, &nulls[i]);
}

duckdb::column_t outputSize = projections.empty() ? columns.size() : projections.size();

/* Append tuple to output vector */
for (duckdb::column_t i = 0; !skipTuple && i < outputSize; i++) {
auto &result = output.data[i];
bool isNull = false;
Datum value = HeapTupleFetchNextDatumValue(tupleDesc, slot, heapTupleReadState, i + 1, &isNull);
if (isNull) {
if (nulls[i]) {
auto &array_mask = duckdb::FlatVector::Validity(result);
array_mask.SetInvalid(offset);
} else {
Datum value = projections.empty() ? values[i] : values[projections[i]];
ConvertPostgresToDuckValue(value, result, offset);
}
}

pfree(values);
pfree(nulls);
}

} // namespace quack

0 comments on commit 111f266

Please sign in to comment.