Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Column selection / Filtering / Projections #15

Merged
merged 5 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ MODULE_big = quack
EXTENSION = quack
DATA = quack.control $(wildcard quack--*.sql)

SRCS = src/quack_heap_seq_scan.cpp \
SRCS = src/quack_detoast.cpp \
src/quack_filter.cpp \
src/quack_heap_scan.cpp \
src/quack_heap_seq_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
Expand Down Expand Up @@ -34,7 +36,7 @@ else
QUACK_BUILD_DUCKDB = release
endif

override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 ${QUACK_BUILD_CXX_FLAGS}
override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -Wno-sign-compare ${QUACK_BUILD_CXX_FLAGS}

SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -L$(PG_LIB) -lduckdb -Lthird_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src -lstdc++

Expand Down
15 changes: 15 additions & 0 deletions include/quack/quack_detoast.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
}

#include <mutex>

namespace quack {

/*
 * Produce a detoasted form of the varlena `value` and return it as a Datum (a pointer to the varlena).
 *
 * value      - varlena to detoast; may be external (on-disk, indirect, expanded), compressed, short-header or plain.
 * lock       - mutex taken by the implementation while the toast relation is opened and read.
 * shouldFree - output flag. The implementation sets it to false when the input pointer is returned unchanged, and to
 *              true when a new buffer was obtained from duckdb_malloc.
 *              NOTE(review): presumably the caller releases such buffers with duckdb_free - confirm at call sites.
 */
Datum DetoastPostgresDatum(struct varlena *value, std::mutex &lock, bool *shouldFree);

} // namespace quack
11 changes: 11 additions & 0 deletions include/quack/quack_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
}

namespace quack {
/*
 * Evaluate a DuckDB table filter against a single Postgres value.
 *
 * filter  - DuckDB filter to apply.
 * value   - Postgres Datum being tested.
 * isNull  - whether the value is SQL NULL.
 * typeOid - Postgres type OID of the value.
 *
 * NOTE(review): presumably returns true when the value satisfies the filter; the implementation is not visible
 * here - confirm against src/quack_filter.cpp.
 */
bool ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid);
} // namespace quack
7 changes: 3 additions & 4 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ extern "C" {

// Postgres Relation


namespace quack {

struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
Expand All @@ -23,15 +22,15 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
~PostgresHeapScanLocalState() override;

public:
PostgresHeapSeqScan & m_rel;
PostgresHeapSeqScan &m_rel;
PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info;
bool m_exhausted_scan = false;
};

// Global State

struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation);
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, duckdb::TableFunctionInitInput &input);
~PostgresHeapScanGlobalState();
idx_t
MaxThreads() const override {
Expand Down Expand Up @@ -67,7 +66,7 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction {
// LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext
// &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate);
static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
duckdb::DataChunk &output);
// static unique_ptr<NodeStatistics> PostgresCardinality(ClientContext &context, const FunctionData *bind_data);
// static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void
Expand Down
43 changes: 31 additions & 12 deletions include/quack/quack_heap_seq_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ extern "C" {
#include "postgres.h"
#include "access/tableam.h"
#include "access/heapam.h"
#include "storage/bufmgr.h"
}

#include <mutex>
#include <atomic>

namespace quack {

Expand All @@ -31,18 +33,35 @@ class PostgresHeapSeqScanThreadInfo {
HeapTupleData m_tuple;
};

class PostgresHeapSeqScan {
class PostgresHeapSeqParallelScanState {
private:
class ParallelScanState {
public:
ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) {
}
BlockNumber AssignNextBlockNumber();
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
};
static int const k_max_prefetch_block_number = 32;

public:
PostgresHeapSeqParallelScanState()
: m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuples_only(false),
m_total_row_count(0), m_last_prefetch_block(0), m_strategy(nullptr) {
}
~PostgresHeapSeqParallelScanState() {
if (m_strategy)
pfree(m_strategy);
}
BlockNumber AssignNextBlockNumber();
void PrefetchNextRelationPages(Relation rel);
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
bool m_count_tuples_only;
duckdb::map<duckdb::idx_t, duckdb::idx_t> m_columns;
duckdb::map<duckdb::idx_t, duckdb::idx_t> m_projections;
duckdb::TableFilterSet *m_filters = nullptr;
std::atomic<std::uint32_t> m_total_row_count;
BlockNumber m_last_prefetch_block;
BufferAccessStrategy m_strategy;
};

class PostgresHeapSeqScan {
private:
public:
PostgresHeapSeqScan(RangeTblEntry *table);
~PostgresHeapSeqScan();
Expand All @@ -52,7 +71,7 @@ class PostgresHeapSeqScan {
PostgresHeapSeqScan(PostgresHeapSeqScan &&other);

public:
void InitParallelScanState();
void InitParallelScanState( duckdb::TableFunctionInitInput &input);
void
SetSnapshot(Snapshot snapshot) {
m_snapshot = snapshot;
Expand All @@ -70,7 +89,7 @@ class PostgresHeapSeqScan {
private:
Relation m_rel = nullptr;
Snapshot m_snapshot = nullptr;
ParallelScanState m_parallel_scan_state;
PostgresHeapSeqParallelScanState m_parallel_scan_state;
};

} // namespace quack
1 change: 0 additions & 1 deletion include/quack/quack_select.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

extern "C" {
#include "postgres.h"

#include "executor/executor.h"
}

Expand Down
5 changes: 4 additions & 1 deletion include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ extern "C" {
#include "executor/tuptable.h"
}

#include "quack/quack_heap_seq_scan.hpp"

namespace quack {
duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo,
PostgresHeapSeqParallelScanState &parallelScanState);
} // namespace quack
165 changes: 165 additions & 0 deletions src/quack_detoast.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
#include "pg_config.h"
#include "varatt.h"

#ifdef USE_LZ4
#include <lz4.h>
#endif

#include "access/detoast.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/toast_internals.h"
#include "common/pg_lzcompress.h"
#include "utils/expandeddatum.h"
}

#include "quack/quack_types.hpp"
#include "quack/quack_detoast.hpp"

/*
* The following functions mirror logic found in the Postgres source, but for DuckDB execution they need to be thread
* safe. Functions such as palloc/pfree are replaced with duckdb_malloc/duckdb_free. Access to the toast table is
* protected with a lock, also for thread-safety reasons. This is an initial implementation and should be revisited in
* the future for better performance.
*/

namespace quack {

/*
 * Decompress a pglz-compressed varlena.
 *
 * value - compressed varlena; its header supplies the uncompressed (external) size.
 *
 * Returns a buffer obtained from duckdb_malloc holding a regular varlena whose size header is set to the
 * decompressed length plus VARHDRSZ. Reports ERROR if pglz_decompress returns a negative value.
 */
struct varlena *
_pglz_decompress_datum(const struct varlena *value) {
struct varlena *result;
int32 rawsize;

result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ);
Copy link
Collaborator

@Tishj Tishj Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duckdb_malloc/free are just wrappers around malloc and free, they don't serve an added benefit
Also these methods are part of the C api, which we're not using here

void *duckdb_malloc(size_t size) {
	return malloc(size);
}

void duckdb_free(void *ptr) {
	free(ptr);
}


/* Source starts after the compressed header; destination is the data area of the new varlena. */
rawsize = pglz_decompress((char *)value + VARHDRSZ_COMPRESSED, VARSIZE(value) - VARHDRSZ_COMPRESSED,
VARDATA(result), VARDATA_COMPRESSED_GET_EXTSIZE(value), true);
/* A negative return value is treated as corrupt input. */
if (rawsize < 0)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed pglz data is corrupt")));

SET_VARSIZE(result, rawsize + VARHDRSZ);

return result;
}

struct varlena *
_lz4_decompress_datum(const struct varlena *value) {
#ifndef USE_LZ4
return NULL; /* keep compiler quiet */
#else
int32 rawsize;
struct varlena *result;

result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ);

rawsize = LZ4_decompress_safe((char *)value + VARHDRSZ_COMPRESSED, VARDATA(result),
VARSIZE(value) - VARHDRSZ_COMPRESSED, VARDATA_COMPRESSED_GET_EXTSIZE(value));
if (rawsize < 0)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed lz4 data is corrupt")));

SET_VARSIZE(result, rawsize + VARHDRSZ);

return result;
#endif
}

static struct varlena *
_toast_decompress_datum(struct varlena *attr) {
ToastCompressionId cmid;
cmid = (ToastCompressionId)TOAST_COMPRESS_METHOD(attr);
switch (cmid) {
case TOAST_PGLZ_COMPRESSION_ID:
return _pglz_decompress_datum(attr);
case TOAST_LZ4_COMPRESSION_ID:
return _lz4_decompress_datum(attr);
default:
elog(ERROR, "invalid compression method id %d", TOAST_COMPRESS_METHOD(attr));
return NULL; /* keep compiler quiet */
}
}

static struct varlena *
_toast_fetch_datum(struct varlena *attr, std::mutex &lock) {
Relation toastrel;
struct varlena *result;
struct varatt_external toast_pointer;
int32 attrsize;

if (!VARATT_IS_EXTERNAL_ONDISK(attr))
elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");

/* Must copy to access aligned fields */
VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);

attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);

result = (struct varlena *)duckdb_malloc(attrsize + VARHDRSZ);

if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) {
SET_VARSIZE_COMPRESSED(result, attrsize + VARHDRSZ);
} else {
SET_VARSIZE(result, attrsize + VARHDRSZ);
}

if (attrsize == 0)
return result;

lock.lock();
toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid, attrsize, 0, attrsize, result);
table_close(toastrel, AccessShareLock);
lock.unlock();

return result;
}

/*
 * Produce a detoasted form of `attr` and return it as a Datum (a pointer to the varlena).
 *
 * attr       - varlena to detoast.
 * lock       - mutex passed down to the on-disk toast fetch.
 * shouldFree - output flag: false when `attr` itself is returned (already a plain varlena), true when the
 *              result is a new buffer obtained from duckdb_malloc.
 *
 * Cases handled, in order: on-disk external (fetched, then decompressed if needed), indirect pointer,
 * expanded object (flattened), inline compressed, short header (re-headed with a 4-byte header), plain.
 */
Datum
DetoastPostgresDatum(struct varlena *attr, std::mutex &lock, bool *shouldFree) {
	struct varlena *toastedValue = nullptr;
	*shouldFree = true;
	if (VARATT_IS_EXTERNAL_ONDISK(attr)) {
		toastedValue = _toast_fetch_datum(attr, lock);
		if (VARATT_IS_COMPRESSED(toastedValue)) {
			/* The fetched copy is only an intermediate; release it once decompressed. */
			struct varlena *tmp = toastedValue;
			toastedValue = _toast_decompress_datum(tmp);
			duckdb_free(tmp);
		}
	} else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) {
		struct varatt_indirect redirect;
		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
		struct varlena *target = (struct varlena *)redirect.pointer;

		/*
		 * Recurse on the redirect target, not on `attr`: recursing on `attr` re-enters this same branch
		 * forever. The target may itself still be compressed, short, etc.
		 */
		toastedValue = reinterpret_cast<struct varlena *>(DetoastPostgresDatum(target, lock, shouldFree));

		/* If the target needed no conversion, hand back a private copy rather than the referenced memory. */
		if (toastedValue == target) {
			const Size targetSize = VARSIZE_ANY(target);
			struct varlena *copy = (struct varlena *)duckdb_malloc(targetSize);
			memcpy(copy, target, targetSize);
			toastedValue = copy;
			/* The recursive call cleared the flag for the plain case; the copy is a new buffer. */
			*shouldFree = true;
		}
	} else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) {
		ExpandedObjectHeader *eoh = DatumGetEOHP(PointerGetDatum(attr));
		Size resultsize = EOH_get_flat_size(eoh);
		toastedValue = (struct varlena *)duckdb_malloc(resultsize);
		EOH_flatten_into(eoh, (void *)toastedValue, resultsize);
	} else if (VARATT_IS_COMPRESSED(attr)) {
		toastedValue = _toast_decompress_datum(attr);
	} else if (VARATT_IS_SHORT(attr)) {
		/* Convert the 1-byte header form into a regular 4-byte header varlena. */
		Size data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
		Size new_size = data_size + VARHDRSZ;
		toastedValue = (struct varlena *)duckdb_malloc(new_size);
		SET_VARSIZE(toastedValue, new_size);
		memcpy(VARDATA(toastedValue), VARDATA_SHORT(attr), data_size);
	} else {
		/* Already a plain varlena: return the input itself, which the caller must not free. */
		toastedValue = attr;
		*shouldFree = false;
	}

	return reinterpret_cast<Datum>(toastedValue);
}

} // namespace quack
Loading