Skip to content

Commit

Permalink
Column query / thread-safe detoasting
Browse files Browse the repository at this point in the history
* Read columns that are needed for query
* Skip query if any table is from PG_CATALOG_NAMESPACE or PG_TOAST_NAMESPACE
* Thread safe detoasting
  • Loading branch information
mkaruza committed Apr 25, 2024
1 parent d4ffc83 commit 24c6c40
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 56 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ MODULE_big = quack
EXTENSION = quack
DATA = quack.control $(wildcard quack--*.sql)

SRCS = src/quack_heap_seq_scan.cpp \
SRCS = src/quack_detoast.cpp \
src/quack_heap_scan.cpp \
src/quack_heap_seq_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
Expand Down
15 changes: 15 additions & 0 deletions include/quack/quack_detoast.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
}

#include <mutex>

namespace quack {

/*
 * Produce a plain (non-toasted, uncompressed, 4-byte header) version of a varlena value.
 *
 * value      - the varlena to detoast; may be external on-disk, external indirect, external expanded, inline
 *              compressed, or short-header.
 * lock       - mutex that is held while the toast relation is opened and read.
 * shouldFree - out parameter; set to true when the returned datum points at a buffer allocated with duckdb_malloc
 *              (release it with duckdb_free), and to false when the input pointer itself is returned.
 *
 * Returns the resulting varlena pointer as a Datum.
 */
Datum DetoastPostgresDatum(struct varlena *value, std::mutex &lock, bool *shouldFree);

} // namespace quack
7 changes: 3 additions & 4 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ extern "C" {

// Postgres Relation


namespace quack {

struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
Expand All @@ -23,15 +22,15 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
~PostgresHeapScanLocalState() override;

public:
PostgresHeapSeqScan & m_rel;
PostgresHeapSeqScan &m_rel;
PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info;
bool m_exhausted_scan = false;
};

// Global State

struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation);
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, duckdb::TableFunctionInitInput &input);
~PostgresHeapScanGlobalState();
idx_t
MaxThreads() const override {
Expand Down Expand Up @@ -67,7 +66,7 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction {
// LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext
// &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate);
static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
duckdb::DataChunk &output);
// static unique_ptr<NodeStatistics> PostgresCardinality(ClientContext &context, const FunctionData *bind_data);
// static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void
Expand Down
31 changes: 19 additions & 12 deletions include/quack/quack_heap_seq_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern "C" {
}

#include <mutex>
#include <atomic>

namespace quack {

Expand All @@ -31,18 +32,23 @@ class PostgresHeapSeqScanThreadInfo {
HeapTupleData m_tuple;
};

/*
 * State shared by every thread taking part in one heap sequential scan.
 * All members are public; m_lock is taken by AssignNextBlockNumber().
 */
class PostgresHeapSeqParallelScanState {
public:
    /* Every member carries its own default initializer, so there is nothing left to do here. */
    PostgresHeapSeqParallelScanState() {
    }

    /* Hands out the next block number to a scanning thread (defined in quack_heap_seq_scan.cpp). */
    BlockNumber AssignNextBlockNumber();

    std::mutex m_lock;
    /* Number of blocks in the relation; InvalidBlockNumber until InitParallelScanState() fills it in. */
    BlockNumber m_nblocks = InvalidBlockNumber;
    /* Last block handed out; InvalidBlockNumber while no block has been assigned yet. */
    BlockNumber m_last_assigned_block_number = InvalidBlockNumber;
    /* Maps a requested column id to its position in the column list given to InitParallelScanState(). */
    duckdb::map<duckdb::column_t, duckdb::idx_t> m_columns;
    /* Copy of the projection ids given to InitParallelScanState(). */
    duckdb::vector<duckdb::idx_t> m_projections;
    /* Filter set pointer handed to InitParallelScanState(); stored as a raw pointer. */
    duckdb::TableFilterSet *m_filters = nullptr;
    /* NOTE(review): presumably the number of rows produced across all threads - confirm against users. */
    std::atomic<std::uint32_t> m_total_row_count {0};
};

class PostgresHeapSeqScan {
private:
class ParallelScanState {
public:
ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) {
}
BlockNumber AssignNextBlockNumber();
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
};

public:
PostgresHeapSeqScan(RangeTblEntry *table);
~PostgresHeapSeqScan();
Expand All @@ -52,7 +58,8 @@ class PostgresHeapSeqScan {
PostgresHeapSeqScan(PostgresHeapSeqScan &&other);

public:
void InitParallelScanState();
void InitParallelScanState(const duckdb::vector<duckdb::column_t> &columns,
const duckdb::vector<duckdb::idx_t> &projections, duckdb::TableFilterSet *filters);
void
SetSnapshot(Snapshot snapshot) {
m_snapshot = snapshot;
Expand All @@ -70,7 +77,7 @@ class PostgresHeapSeqScan {
private:
Relation m_rel = nullptr;
Snapshot m_snapshot = nullptr;
ParallelScanState m_parallel_scan_state;
PostgresHeapSeqParallelScanState m_parallel_scan_state;
};

} // namespace quack
5 changes: 4 additions & 1 deletion include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ extern "C" {
#include "executor/tuptable.h"
}

#include "quack/quack_heap_seq_scan.hpp"

namespace quack {
duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo,
PostgresHeapSeqParallelScanState &parallelScanState);
} // namespace quack
163 changes: 163 additions & 0 deletions src/quack_detoast.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
#include "varatt.h"

#ifdef USE_LZ4
#include <lz4.h>
#endif

#include "access/detoast.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/toast_internals.h"
#include "common/pg_lzcompress.h"
#include "utils/expandeddatum.h"
}

#include "quack/quack_types.hpp"
#include "quack/quack_detoast.hpp"

/*
 * The following functions mirror logic found in the Postgres source code, but for DuckDB execution they need to be
 * thread safe. Functions such as palloc/pfree are replaced with duckdb_malloc/duckdb_free. Access to the toast table
 * is protected with a lock, also for thread-safety reasons. This is an initial implementation and should be revisited
 * in the future for better performance.
 */

namespace quack {

struct varlena *
_pglz_decompress_datum(const struct varlena *value) {
struct varlena *result;
int32 rawsize;

result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ);

rawsize = pglz_decompress((char *)value + VARHDRSZ_COMPRESSED, VARSIZE(value) - VARHDRSZ_COMPRESSED,
VARDATA(result), VARDATA_COMPRESSED_GET_EXTSIZE(value), true);
if (rawsize < 0)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed pglz data is corrupt")));

SET_VARSIZE(result, rawsize + VARHDRSZ);

return result;
}

/*
 * Decompress an lz4-compressed varlena into a buffer obtained from duckdb_malloc.
 *
 * value  - inline-compressed varlena; compressed bytes are read starting at VARHDRSZ_COMPRESSED.
 * return - newly allocated varlena holding the raw data; the caller releases it with duckdb_free.
 *
 * Without USE_LZ4 this only invokes NO_LZ4_SUPPORT(). Otherwise a Postgres ERROR is raised when the buffer
 * cannot be allocated or when the compressed data is corrupt.
 */
struct varlena *
_lz4_decompress_datum(const struct varlena *value) {
#ifndef USE_LZ4
    NO_LZ4_SUPPORT();
    return NULL; /* keep compiler quiet */
#else
    const auto extsize = VARDATA_COMPRESSED_GET_EXTSIZE(value);

    struct varlena *result = (struct varlena *)duckdb_malloc(extsize + VARHDRSZ);
    /* Unlike palloc, duckdb_malloc can hand back NULL; do not decompress into it. */
    if (result == NULL)
        ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg_internal("out of memory")));

    const int32 rawsize = LZ4_decompress_safe((char *)value + VARHDRSZ_COMPRESSED, VARDATA(result),
                                              VARSIZE(value) - VARHDRSZ_COMPRESSED, extsize);
    if (rawsize < 0) {
        /* The buffer was not allocated with palloc, so release it explicitly before raising the error. */
        duckdb_free(result);
        ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed lz4 data is corrupt")));
    }

    SET_VARSIZE(result, rawsize + VARHDRSZ);

    return result;
#endif
}

static struct varlena *
_toast_decompress_datum(struct varlena *attr) {
switch (TOAST_COMPRESS_METHOD(attr)) {
case TOAST_PGLZ_COMPRESSION_ID:
return _pglz_decompress_datum(attr);
case TOAST_LZ4_COMPRESSION_ID:
return _lz4_decompress_datum(attr);
default:
elog(ERROR, "invalid compression method id %d", TOAST_COMPRESS_METHOD(attr));
return NULL; /* keep compiler quiet */
}
}

static struct varlena *
_toast_fetch_datum(struct varlena *attr, std::mutex &lock) {
Relation toastrel;
struct varlena *result;
struct varatt_external toast_pointer;
int32 attrsize;

if (!VARATT_IS_EXTERNAL_ONDISK(attr))
elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");

/* Must copy to access aligned fields */
VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);

attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);

result = (struct varlena *)duckdb_malloc(attrsize + VARHDRSZ);

if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) {
SET_VARSIZE_COMPRESSED(result, attrsize + VARHDRSZ);
} else {
SET_VARSIZE(result, attrsize + VARHDRSZ);
}

if (attrsize == 0)
return result;

lock.lock();
toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid, attrsize, 0, attrsize, result);
table_close(toastrel, AccessShareLock);
lock.unlock();

return result;
}

Datum
DetoastPostgresDatum(struct varlena *attr, std::mutex &lock, bool *shouldFree) {
struct varlena *toastedValue = nullptr;
*shouldFree = true;
if (VARATT_IS_EXTERNAL_ONDISK(attr)) {
toastedValue = _toast_fetch_datum(attr, lock);
if (VARATT_IS_COMPRESSED(toastedValue)) {
struct varlena *tmp = toastedValue;
toastedValue = _toast_decompress_datum(tmp);
duckdb_free(tmp);
}
} else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) {
struct varatt_indirect redirect;
VARATT_EXTERNAL_GET_POINTER(redirect, attr);
toastedValue = (struct varlena *)redirect.pointer;
toastedValue = reinterpret_cast<struct varlena *>(DetoastPostgresDatum(attr, lock, shouldFree));
if (attr == (struct varlena *)redirect.pointer) {
struct varlena *result;
result = (struct varlena *)(VARSIZE_ANY(attr));
memcpy(result, attr, VARSIZE_ANY(attr));
toastedValue = result;
}
} else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) {
ExpandedObjectHeader *eoh;
Size resultsize;
eoh = DatumGetEOHP(PointerGetDatum(attr));
resultsize = EOH_get_flat_size(eoh);
toastedValue = (struct varlena *)duckdb_malloc(resultsize);
EOH_flatten_into(eoh, (void *)toastedValue, resultsize);
} else if (VARATT_IS_COMPRESSED(attr)) {
toastedValue = _toast_decompress_datum(attr);
} else if (VARATT_IS_SHORT(attr)) {
Size data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
Size new_size = data_size + VARHDRSZ;
toastedValue = (struct varlena *)duckdb_malloc(new_size);
SET_VARSIZE(toastedValue, new_size);
memcpy(VARDATA(toastedValue), VARDATA_SHORT(attr), data_size);
} else {
toastedValue = attr;
*shouldFree = false;
}

return reinterpret_cast<Datum>(toastedValue);
}

} // namespace quack
16 changes: 10 additions & 6 deletions src/quack_heap_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() {
// PostgresHeapScanGlobalState
//

PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation) {
relation.InitParallelScanState();
PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation,
duckdb::TableFunctionInitInput &input) {
elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ", MaxThreads());
relation.InitParallelScanState(input.column_ids, input.projection_ids, input.filters.get());
}

PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() {
Expand Down Expand Up @@ -58,7 +59,9 @@ PostgresHeapScanFunction::PostgresHeapScanFunction()
PostgresHeapInitLocal) {
named_parameters["table"] = duckdb::LogicalType::POINTER;
named_parameters["snapshot"] = duckdb::LogicalType::POINTER;
// projection_pushdown = true;
projection_pushdown = true;
// filter_pushdown = true;
// filter_prune = true;
}

duckdb::unique_ptr<duckdb::FunctionData>
Expand Down Expand Up @@ -94,7 +97,7 @@ duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresHeapScanFunction::PostgresHeapInitGlobal(duckdb::ClientContext &context,
duckdb::TableFunctionInitInput &input) {
auto &bind_data = input.bind_data->CastNoConst<PostgresHeapScanFunctionData>();
return duckdb::make_uniq<PostgresHeapScanGlobalState>(bind_data.m_relation);
return duckdb::make_uniq<PostgresHeapScanGlobalState>(bind_data.m_relation, input);
}

duckdb::unique_ptr<duckdb::LocalTableFunctionState>
Expand Down Expand Up @@ -154,9 +157,10 @@ FindMatchingHeapRelation(List *tables, const duckdb::string &to_find) {
/* This doesn't have an access method handler, we can't read from this */
RelationClose(rel);
return nullptr;
} else {
RelationClose(rel);
return table;
}
RelationClose(rel);
return table;
}
RelationClose(rel);
}
Expand Down
16 changes: 11 additions & 5 deletions src/quack_heap_seq_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn
}

void
PostgresHeapSeqScan::InitParallelScanState() {
PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector<duckdb::column_t> &columns,
const duckdb::vector<duckdb::idx_t> &projections,
duckdb::TableFilterSet *filters) {
m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);
/* We need ordered columns ids for tuple fetch */
for(duckdb::idx_t i = 0; i < columns.size(); i++) {
m_parallel_scan_state.m_columns[columns[i]] = i;
}
m_parallel_scan_state.m_projections = projections;
m_parallel_scan_state.m_filters = filters;
}

bool
Expand Down Expand Up @@ -113,9 +121,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
}

pgstat_count_heap_getnext(m_rel);

InsertTupleIntoChunk(output, threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple,
threadScanInfo.m_output_vector_size);
InsertTupleIntoChunk(output, threadScanInfo, m_parallel_scan_state);
}

/* No more items on current page */
Expand Down Expand Up @@ -150,7 +156,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
}

BlockNumber
PostgresHeapSeqScan::ParallelScanState::AssignNextBlockNumber() {
PostgresHeapSeqParallelScanState::AssignNextBlockNumber() {
m_lock.lock();
BlockNumber block_number = InvalidBlockNumber;
if (m_last_assigned_block_number == InvalidBlockNumber) {
Expand Down
Loading

0 comments on commit 24c6c40

Please sign in to comment.