From 24c6c4056a89247e2da6debc078d27800258aa6a Mon Sep 17 00:00:00 2001 From: mkaruza Date: Mon, 22 Apr 2024 12:35:20 +0200 Subject: [PATCH] Column query / thread-safe detoasting * Read columns that are needed for query * Skip query if any table is from PG_CATALOG_NAMESPACE or PG_TOAST_NAMESPACE * Thread safe detoasting --- Makefile | 3 +- include/quack/quack_detoast.hpp | 15 +++ include/quack/quack_heap_scan.hpp | 7 +- include/quack/quack_heap_seq_scan.hpp | 31 +++-- include/quack/quack_types.hpp | 5 +- src/quack_detoast.cpp | 163 ++++++++++++++++++++++++++ src/quack_heap_scan.cpp | 16 ++- src/quack_heap_seq_scan.cpp | 16 ++- src/quack_hooks.cpp | 25 +++- src/quack_select.cpp | 7 +- src/quack_types.cpp | 67 +++++++---- 11 files changed, 299 insertions(+), 56 deletions(-) create mode 100644 include/quack/quack_detoast.hpp create mode 100644 src/quack_detoast.cpp diff --git a/Makefile b/Makefile index 72aee371..0d10d2f5 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,9 @@ MODULE_big = quack EXTENSION = quack DATA = quack.control $(wildcard quack--*.sql) -SRCS = src/quack_heap_seq_scan.cpp \ +SRCS = src/quack_detoast.cpp \ src/quack_heap_scan.cpp \ + src/quack_heap_seq_scan.cpp \ src/quack_hooks.cpp \ src/quack_select.cpp \ src/quack_types.cpp \ diff --git a/include/quack/quack_detoast.hpp b/include/quack/quack_detoast.hpp new file mode 100644 index 00000000..792a70e7 --- /dev/null +++ b/include/quack/quack_detoast.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +} + +#include + +namespace quack { + +Datum DetoastPostgresDatum(struct varlena *value, std::mutex &lock, bool *shouldFree); + +} // namespace quack \ No newline at end of file diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp index 5aa4ff32..9d7ecac3 100644 --- a/include/quack/quack_heap_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -14,7 +14,6 @@ extern "C" { // Postgres Relation - namespace quack { struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { @@ -23,7 +22,7 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { ~PostgresHeapScanLocalState() override; public: - PostgresHeapSeqScan & m_rel; + PostgresHeapSeqScan &m_rel; PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info; bool m_exhausted_scan = false; }; @@ -31,7 +30,7 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { // Global State struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState { - explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation); + explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, duckdb::TableFunctionInitInput &input); ~PostgresHeapScanGlobalState(); idx_t MaxThreads() const override { @@ -67,7 +66,7 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction { // LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext // &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate); static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, - duckdb::DataChunk &output); + duckdb::DataChunk &output); // static unique_ptr PostgresCardinality(ClientContext &context, const FunctionData *bind_data); // static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index 4a30a9a8..dfeb9780 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -9,6 +9,7 @@ extern "C" { } #include +#include namespace quack { @@ -31,18 +32,23 @@ class PostgresHeapSeqScanThreadInfo { HeapTupleData m_tuple; }; +class PostgresHeapSeqParallelScanState { +public: + PostgresHeapSeqParallelScanState() + : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_total_row_count(0) { + } + BlockNumber AssignNextBlockNumber(); + std::mutex m_lock; + BlockNumber m_nblocks; + BlockNumber m_last_assigned_block_number; + duckdb::map m_columns; + duckdb::vector m_projections; + duckdb::TableFilterSet *m_filters = nullptr; + std::atomic m_total_row_count; +}; + class PostgresHeapSeqScan { private: - class ParallelScanState { - public: - ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) { - } - BlockNumber AssignNextBlockNumber(); - std::mutex m_lock; - BlockNumber m_nblocks; - BlockNumber m_last_assigned_block_number; - }; - public: PostgresHeapSeqScan(RangeTblEntry *table); ~PostgresHeapSeqScan(); @@ -52,7 +58,8 @@ class PostgresHeapSeqScan { PostgresHeapSeqScan(PostgresHeapSeqScan &&other); public: - void InitParallelScanState(); + void InitParallelScanState(const duckdb::vector &columns, + const duckdb::vector &projections, duckdb::TableFilterSet *filters); void SetSnapshot(Snapshot snapshot) { m_snapshot = snapshot; @@ -70,7 +77,7 @@ class PostgresHeapSeqScan { private: Relation m_rel = nullptr; Snapshot m_snapshot = nullptr; - ParallelScanState m_parallel_scan_state; + PostgresHeapSeqParallelScanState m_parallel_scan_state; }; } // namespace quack \ No newline at end of file diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp index 05f38603..eeb7e4a8 100644 --- a/include/quack/quack_types.hpp +++ b/include/quack/quack_types.hpp @@ -7,9 +7,12 @@ extern "C" { #include "executor/tuptable.h" } +#include "quack/quack_heap_seq_scan.hpp" + namespace quack { duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); -void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset); +void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo, + PostgresHeapSeqParallelScanState ¶llelScanState); } // namespace quack \ No newline at end of file diff --git a/src/quack_detoast.cpp b/src/quack_detoast.cpp new file mode 100644 index 00000000..c3d39663 --- /dev/null +++ b/src/quack_detoast.cpp @@ -0,0 +1,163 @@ +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "varatt.h" + +#ifdef USE_LZ4 +#include +#endif + +#include "access/detoast.h" +#include "access/table.h" +#include "access/tableam.h" +#include "access/toast_internals.h" +#include "common/pg_lzcompress.h" +#include "utils/expandeddatum.h" +} + +#include "quack/quack_types.hpp" +#include "quack/quack_detoast.hpp" + +/* + * Following functions are direct logic found in postgres code but for duckdb execution they are needed to be thread + * safe. Functions as palloc/pfree are exchanged with duckdb_malloc/duckdb_free. Access to toast table is protected with + * lock also for thread safe reasons. This is initial implementation but should be revisisted in future for better + * performances. + */ + +namespace quack { + +struct varlena * +_pglz_decompress_datum(const struct varlena *value) { + struct varlena *result; + int32 rawsize; + + result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ); + + rawsize = pglz_decompress((char *)value + VARHDRSZ_COMPRESSED, VARSIZE(value) - VARHDRSZ_COMPRESSED, + VARDATA(result), VARDATA_COMPRESSED_GET_EXTSIZE(value), true); + if (rawsize < 0) + ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed pglz data is corrupt"))); + + SET_VARSIZE(result, rawsize + VARHDRSZ); + + return result; +} + +struct varlena * +_lz4_decompress_datum(const struct varlena *value) { +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); + return NULL; /* keep compiler quiet */ +#else + int32 rawsize; + struct varlena *result; + + result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ); + + rawsize = LZ4_decompress_safe((char *)value + VARHDRSZ_COMPRESSED, VARDATA(result), + VARSIZE(value) - VARHDRSZ_COMPRESSED, VARDATA_COMPRESSED_GET_EXTSIZE(value)); + if (rawsize < 0) + ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed lz4 data is corrupt"))); + + SET_VARSIZE(result, rawsize + VARHDRSZ); + + return result; +#endif +} + +static struct varlena * +_toast_decompress_datum(struct varlena *attr) { + switch (TOAST_COMPRESS_METHOD(attr)) { + case TOAST_PGLZ_COMPRESSION_ID: + return _pglz_decompress_datum(attr); + case TOAST_LZ4_COMPRESSION_ID: + return _lz4_decompress_datum(attr); + default: + elog(ERROR, "invalid compression method id %d", TOAST_COMPRESS_METHOD(attr)); + return NULL; /* keep compiler quiet */ + } +} + +static struct varlena * +_toast_fetch_datum(struct varlena *attr, std::mutex &lock) { + Relation toastrel; + struct varlena *result; + struct varatt_external toast_pointer; + int32 attrsize; + + if (!VARATT_IS_EXTERNAL_ONDISK(attr)) + elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums"); + + /* Must copy to access aligned fields */ + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer); + + result = (struct varlena *)duckdb_malloc(attrsize + VARHDRSZ); + + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) { + SET_VARSIZE_COMPRESSED(result, attrsize + VARHDRSZ); + } else { + SET_VARSIZE(result, attrsize + VARHDRSZ); + } + + if (attrsize == 0) + return result; + + lock.lock(); + toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock); + table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid, attrsize, 0, attrsize, result); + table_close(toastrel, AccessShareLock); + lock.unlock(); + + return result; +} + +Datum +DetoastPostgresDatum(struct varlena *attr, std::mutex &lock, bool *shouldFree) { + struct varlena *toastedValue = nullptr; + *shouldFree = true; + if (VARATT_IS_EXTERNAL_ONDISK(attr)) { + toastedValue = _toast_fetch_datum(attr, lock); + if (VARATT_IS_COMPRESSED(toastedValue)) { + struct varlena *tmp = toastedValue; + toastedValue = _toast_decompress_datum(tmp); + duckdb_free(tmp); + } + } else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) { + struct varatt_indirect redirect; + VARATT_EXTERNAL_GET_POINTER(redirect, attr); + toastedValue = (struct varlena *)redirect.pointer; + toastedValue = reinterpret_cast(DetoastPostgresDatum(attr, lock, shouldFree)); + if (attr == (struct varlena *)redirect.pointer) { + struct varlena *result; + result = (struct varlena *)(VARSIZE_ANY(attr)); + memcpy(result, attr, VARSIZE_ANY(attr)); + toastedValue = result; + } + } else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) { + ExpandedObjectHeader *eoh; + Size resultsize; + eoh = DatumGetEOHP(PointerGetDatum(attr)); + resultsize = EOH_get_flat_size(eoh); + toastedValue = (struct varlena *)duckdb_malloc(resultsize); + EOH_flatten_into(eoh, (void *)toastedValue, resultsize); + } else if (VARATT_IS_COMPRESSED(attr)) { + toastedValue = _toast_decompress_datum(attr); + } else if (VARATT_IS_SHORT(attr)) { + Size data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT; + Size new_size = data_size + VARHDRSZ; + toastedValue = (struct varlena *)duckdb_malloc(new_size); + SET_VARSIZE(toastedValue, new_size); + memcpy(VARDATA(toastedValue), VARDATA_SHORT(attr), data_size); + } else { + toastedValue = attr; + *shouldFree = false; + } + + return reinterpret_cast(toastedValue); +} + +} // namespace quack \ No newline at end of file diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 09d06065..98ab7d16 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -28,9 +28,10 @@ PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() { // PostgresHeapScanGlobalState // -PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation) { - relation.InitParallelScanState(); +PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, + duckdb::TableFunctionInitInput &input) { elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ", MaxThreads()); + relation.InitParallelScanState(input.column_ids, input.projection_ids, input.filters.get()); } PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() { @@ -58,7 +59,9 @@ PostgresHeapScanFunction::PostgresHeapScanFunction() PostgresHeapInitLocal) { named_parameters["table"] = duckdb::LogicalType::POINTER; named_parameters["snapshot"] = duckdb::LogicalType::POINTER; - // projection_pushdown = true; + projection_pushdown = true; + // filter_pushdown = true; + // filter_prune = true; } duckdb::unique_ptr @@ -94,7 +97,7 @@ duckdb::unique_ptr PostgresHeapScanFunction::PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input) { auto &bind_data = input.bind_data->CastNoConst(); - return duckdb::make_uniq(bind_data.m_relation); + return duckdb::make_uniq(bind_data.m_relation, input); } duckdb::unique_ptr @@ -154,9 +157,10 @@ FindMatchingHeapRelation(List *tables, const duckdb::string &to_find) { /* This doesn't have an access method handler, we cant read from this */ RelationClose(rel); return nullptr; + } else { + RelationClose(rel); + return table; } - RelationClose(rel); - return table; } RelationClose(rel); } diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 3cf90879..38184202 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -55,8 +55,16 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn } void -PostgresHeapSeqScan::InitParallelScanState() { +PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector &columns, + const duckdb::vector &projections, + duckdb::TableFilterSet *filters) { m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); + /* We need ordered columns ids for tuple fetch */ + for(duckdb::idx_t i = 0; i < columns.size(); i++) { + m_parallel_scan_state.m_columns[columns[i]] = i; + } + m_parallel_scan_state.m_projections = projections; + m_parallel_scan_state.m_filters = filters; } bool @@ -113,9 +121,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc } pgstat_count_heap_getnext(m_rel); - - InsertTupleIntoChunk(output, threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple, - threadScanInfo.m_output_vector_size); + InsertTupleIntoChunk(output, threadScanInfo, m_parallel_scan_state); } /* No more items on current page */ @@ -150,7 +156,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc } BlockNumber -PostgresHeapSeqScan::ParallelScanState::AssignNextBlockNumber() { +PostgresHeapSeqParallelScanState::AssignNextBlockNumber() { m_lock.lock(); BlockNumber block_number = InvalidBlockNumber; if (m_last_assigned_block_number == InvalidBlockNumber) { diff --git a/src/quack_hooks.cpp b/src/quack_hooks.cpp index 93706a58..d280a157 100644 --- a/src/quack_hooks.cpp +++ b/src/quack_hooks.cpp @@ -1,6 +1,8 @@ extern "C" { #include "postgres.h" +#include "catalog/pg_namespace.h" #include "commands/extension.h" +#include "utils/rel.h" } #include "quack/quack.h" @@ -13,14 +15,35 @@ is_quack_extension_registered() { return get_extension_oid("quack", true) != InvalidOid; } +static bool +is_catalog_table(List *tables) { + ListCell *lc; + foreach (lc, tables) { + RangeTblEntry *table = (RangeTblEntry *)lfirst(lc); + if (table->relid) { + auto rel = RelationIdGetRelation(table->relid); + auto namespaceOid = RelationGetNamespace(rel); + if (namespaceOid == PG_CATALOG_NAMESPACE || namespaceOid == PG_TOAST_NAMESPACE) { + RelationClose(rel); + return true; + } + RelationClose(rel); + } + } + return false; +} + static void quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { - if (is_quack_extension_registered() && queryDesc->operation == CMD_SELECT) { + if (is_quack_extension_registered() && !is_catalog_table(queryDesc->plannedstmt->rtable) && + queryDesc->operation == CMD_SELECT) { if (quack_execute_select(queryDesc, direction, count)) { return; } } + elog(DEBUG3, "quack_executor_run: Failing back to PG execution"); + if (PrevExecutorRunHook) { PrevExecutorRunHook(queryDesc, direction, count, execute_once); } diff --git a/src/quack_select.cpp b/src/quack_select.cpp index 53b2c4d8..c607e6ad 100644 --- a/src/quack_select.cpp +++ b/src/quack_select.cpp @@ -25,7 +25,7 @@ namespace quack { static duckdb::unique_ptr quack_open_database() { duckdb::DBConfig config; - //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); + // config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); return duckdb::make_uniq(nullptr, &config); } @@ -59,7 +59,10 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co TupleTableSlot *slot = NULL; // FIXME: try-catch ? - auto res = connection->Query(query_desc->sourceText); + + duckdb::unique_ptr res = nullptr; + + res = connection->Query(query_desc->sourceText); if (res->HasError()) { return false; } diff --git a/src/quack_types.cpp b/src/quack_types.cpp index 11c9ac34..d469e81d 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -2,12 +2,15 @@ extern "C" { #include "postgres.h" +#include "fmgr.h" #include "miscadmin.h" #include "catalog/pg_type.h" #include "executor/tuptable.h" } #include "quack/quack.h" +#include "quack/quack_heap_seq_scan.hpp" +#include "quack/quack_detoast.hpp" namespace quack { @@ -149,15 +152,15 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { } } -typedef struct HeapTuplePageReadState { +typedef struct HeapTupleReadState { bool m_slow = 0; - int m_nvalid = 0; - uint32 m_offset = 0; -} HeapTuplePageReadState; + int m_last_tuple_att = 0; + uint32 m_page_tuple_offset = 0; +} HeapTupleReadState; static Datum -HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePageReadState &heapTupleReadState, - int natts, bool *isNull) { +HeapTupleFetchNextColumnDatum(TupleDesc tupleDesc, HeapTuple tuple, HeapTupleReadState &heapTupleReadState, int attNum, + bool *isNull) { HeapTupleHeader tup = tuple->t_data; bool hasnulls = HeapTupleHasNulls(tuple); @@ -168,23 +171,21 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage bool slow = false; Datum value = (Datum)0; - /* We can only fetch as many attributes as the tuple has. */ - natts = Min(HeapTupleHeaderGetNatts(tuple->t_data), natts); + attnum = heapTupleReadState.m_last_tuple_att; - attnum = heapTupleReadState.m_nvalid; if (attnum == 0) { /* Start from the first attribute */ off = 0; heapTupleReadState.m_slow = false; } else { /* Restore state from previous execution */ - off = heapTupleReadState.m_offset; + off = heapTupleReadState.m_page_tuple_offset; slow = heapTupleReadState.m_slow; } tp = (char *)tup + tup->t_hoff; - for (; attnum < natts; attnum++) { + for (; attnum < attNum; attnum++) { Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum); if (hasnulls && att_isnull(attnum, bp)) { @@ -199,7 +200,6 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage if (!slow && thisatt->attcacheoff >= 0) { off = thisatt->attcacheoff; } else if (thisatt->attlen == -1) { - if (!slow && off == att_align_nominal(off, thisatt->attalign)) { thisatt->attcacheoff = off; } else { @@ -208,7 +208,6 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } } else { off = att_align_nominal(off, thisatt->attalign); - if (!slow) { thisatt->attcacheoff = off; } @@ -223,8 +222,8 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } } - heapTupleReadState.m_nvalid = attnum; - heapTupleReadState.m_offset = off; + heapTupleReadState.m_last_tuple_att = attNum; + heapTupleReadState.m_page_tuple_offset = off; if (slow) { heapTupleReadState.m_slow = true; @@ -236,19 +235,39 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } void -InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tupleDesc, HeapTupleData *slot, idx_t offset) { - HeapTuplePageReadState heapTupleReadState = {}; - for (int i = 0; i < tupleDesc->natts; i++) { - auto &result = output.data[i]; - bool isNull = false; - Datum value = HeapTupleFetchNextDatumValue(tupleDesc, slot, heapTupleReadState, i + 1, &isNull); - if (isNull) { +InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo, + PostgresHeapSeqParallelScanState ¶llelScanState) { + HeapTupleReadState heapTupleReadState = {}; + + Datum *values = (Datum *)duckdb_malloc(sizeof(Datum) * parallelScanState.m_columns.size()); + bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * parallelScanState.m_columns.size()); + + for (auto const &[columnIdx, valueIdx] : parallelScanState.m_columns) { + values[valueIdx] = HeapTupleFetchNextColumnDatum(threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple, + heapTupleReadState, columnIdx + 1, &nulls[valueIdx]); + auto &result = output.data[valueIdx]; + if (nulls[valueIdx]) { auto &array_mask = duckdb::FlatVector::Validity(result); - array_mask.SetInvalid(offset); + array_mask.SetInvalid(threadScanInfo.m_output_vector_size); } else { - ConvertPostgresToDuckValue(value, result, offset); + if (threadScanInfo.m_tuple_desc->attrs[columnIdx].attlen == -1) { + bool shouldFree = false; + values[valueIdx] = + DetoastPostgresDatum(reinterpret_cast(values[valueIdx]), parallelScanState.m_lock, &shouldFree); + ConvertPostgresToDuckValue(values[valueIdx] , result, threadScanInfo.m_output_vector_size); + if (shouldFree) { + duckdb_free(reinterpret_cast(values[valueIdx])); + } + } else { + ConvertPostgresToDuckValue(values[valueIdx], result, threadScanInfo.m_output_vector_size); + } } } + + parallelScanState.m_total_row_count++; + + duckdb_free(values); + duckdb_free(nulls); } } // namespace quack