diff --git a/include/pgduckdb/catalog/pgduckdb_table.hpp b/include/pgduckdb/catalog/pgduckdb_table.hpp index 07cf29b2..aeb835c2 100644 --- a/include/pgduckdb/catalog/pgduckdb_table.hpp +++ b/include/pgduckdb/catalog/pgduckdb_table.hpp @@ -22,35 +22,33 @@ namespace duckdb { class PostgresTable : public TableCatalogEntry { public: - virtual ~PostgresTable() { - } + virtual ~PostgresTable(); public: - static bool SetTableInfo(CreateTableInfo &info, Oid relid, Snapshot snapshot); - static Cardinality GetTableCardinality(Oid relid); + static ::Relation OpenRelation(Oid relid); + static bool SetTableInfo(CreateTableInfo &info, ::Relation rel); + static Cardinality GetTableCardinality(::Relation rel); protected: - PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality, - Snapshot snapshot); + PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, ::Relation rel, + Cardinality cardinality, Snapshot snapshot); protected: + ::Relation rel; Cardinality cardinality; Snapshot snapshot; }; class PostgresHeapTable : public PostgresTable { public: - PostgresHeapTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality, - Snapshot snapshot, Oid oid); + PostgresHeapTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, ::Relation rel, + Cardinality cardinality, Snapshot snapshot); public: // -- Table API -- unique_ptr GetStatistics(ClientContext &context, column_t column_id) override; TableFunction GetScanFunction(ClientContext &context, unique_ptr &bind_data) override; TableStorageInfo GetStorageInfo(ClientContext &context) override; - -private: - Oid oid; }; } // namespace duckdb diff --git a/include/pgduckdb/scan/heap_reader.hpp b/include/pgduckdb/scan/heap_reader.hpp index 015181fb..b7e99fe2 100644 --- a/include/pgduckdb/scan/heap_reader.hpp +++ b/include/pgduckdb/scan/heap_reader.hpp @@ -15,8 +15,8 @@ namespace pgduckdb { class HeapReaderGlobalState { public: - HeapReaderGlobalState(Relation relation) - : m_nblocks(RelationGetNumberOfBlocks(relation)), m_last_assigned_block_number(InvalidBlockNumber) { + HeapReaderGlobalState(Relation rel) + : m_nblocks(RelationGetNumberOfBlocks(rel)), m_last_assigned_block_number(InvalidBlockNumber) { } BlockNumber AssignNextBlockNumber(std::mutex &lock); BlockNumber m_nblocks; @@ -28,7 +28,7 @@ class HeapReaderGlobalState { class HeapReader { private: public: - HeapReader(Relation relation, duckdb::shared_ptr heap_reader_global_state, + HeapReader(Relation rel, duckdb::shared_ptr heap_reader_global_state, duckdb::shared_ptr global_state, duckdb::shared_ptr local_state); ~HeapReader(); @@ -49,7 +49,7 @@ class HeapReader { duckdb::shared_ptr m_global_state; duckdb::shared_ptr m_heap_reader_global_state; duckdb::shared_ptr m_local_state; - Relation m_relation; + Relation m_rel; bool m_inited; bool m_read_next_page; bool m_page_tuples_all_visible; diff --git a/include/pgduckdb/scan/postgres_seq_scan.hpp b/include/pgduckdb/scan/postgres_seq_scan.hpp index c3d79512..1c5ca1e1 100644 --- a/include/pgduckdb/scan/postgres_seq_scan.hpp +++ b/include/pgduckdb/scan/postgres_seq_scan.hpp @@ -21,7 +21,7 @@ namespace pgduckdb { // Global State struct PostgresSeqScanGlobalState : public duckdb::GlobalTableFunctionState { - explicit PostgresSeqScanGlobalState(Relation relation, duckdb::TableFunctionInitInput &input); + explicit PostgresSeqScanGlobalState(Relation rel, duckdb::TableFunctionInitInput &input); ~PostgresSeqScanGlobalState(); idx_t MaxThreads() const override { @@ -31,15 +31,14 @@ struct PostgresSeqScanGlobalState : public duckdb::GlobalTableFunctionState { public: duckdb::shared_ptr m_global_state; duckdb::shared_ptr m_heap_reader_global_state; - Relation m_relation; - Oid m_relid; + Relation m_rel; }; // Local State struct PostgresSeqScanLocalState : public duckdb::LocalTableFunctionState { public: - PostgresSeqScanLocalState(Relation relation, duckdb::shared_ptr heap_reader_global_state, + PostgresSeqScanLocalState(Relation rel, duckdb::shared_ptr heap_reader_global_state, duckdb::shared_ptr global_state); ~PostgresSeqScanLocalState() override; @@ -52,12 +51,12 @@ struct PostgresSeqScanLocalState : public duckdb::LocalTableFunctionState { struct PostgresSeqScanFunctionData : public duckdb::TableFunctionData { public: - PostgresSeqScanFunctionData(uint64_t cardinality, Oid relid, Snapshot snapshot); + PostgresSeqScanFunctionData(::Relation rel, uint64_t cardinality, Snapshot snapshot); ~PostgresSeqScanFunctionData() override; public: + ::Relation m_rel; uint64_t m_cardinality; - Oid m_relid; Snapshot m_snapshot; }; @@ -68,9 +67,6 @@ struct PostgresSeqScanFunction : public duckdb::TableFunction { PostgresSeqScanFunction(); public: - static duckdb::unique_ptr - PostgresSeqScanBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input, - duckdb::vector &return_types, duckdb::vector &names); static duckdb::unique_ptr PostgresSeqScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); static duckdb::unique_ptr diff --git a/src/catalog/pgduckdb_table.cpp b/src/catalog/pgduckdb_table.cpp index 55810118..d6c37112 100644 --- a/src/catalog/pgduckdb_table.cpp +++ b/src/catalog/pgduckdb_table.cpp @@ -1,9 +1,11 @@ #include "pgduckdb/pgduckdb_types.hpp" +#include "pgduckdb/pgduckdb_process_lock.hpp" #include "pgduckdb/catalog/pgduckdb_schema.hpp" #include "pgduckdb/catalog/pgduckdb_table.hpp" #include "duckdb/parser/parsed_data/create_table_info.hpp" #include "pgduckdb/scan/postgres_seq_scan.hpp" #include "pgduckdb/scan/postgres_scan.hpp" +#include "pgduckdb/pgduckdb_utils.hpp" extern "C" { #include "postgres.h" @@ -25,44 +27,46 @@ extern "C" { namespace duckdb { -PostgresTable::PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, +PostgresTable::PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, ::Relation rel, Cardinality cardinality, Snapshot snapshot) - : TableCatalogEntry(catalog, schema, info), cardinality(cardinality), snapshot(snapshot) { + : TableCatalogEntry(catalog, schema, info), rel(rel), cardinality(cardinality), snapshot(snapshot) { +} + +PostgresTable::~PostgresTable() { + std::lock_guard lock(pgduckdb::DuckdbProcessLock::GetLock()); + RelationClose(rel); +} + +::Relation +PostgresTable::OpenRelation(Oid relid) { + std::lock_guard lock(pgduckdb::DuckdbProcessLock::GetLock()); + auto rel = pgduckdb::PostgresFunctionGuard<::Relation>(RelationIdGetRelation, relid); + return rel; } bool -PostgresTable::SetTableInfo(CreateTableInfo &info, Oid relid, Snapshot snapshot) { - auto rel = RelationIdGetRelation(relid); +PostgresTable::SetTableInfo(CreateTableInfo &info, ::Relation rel) { auto tupleDesc = RelationGetDescr(rel); - if (!tupleDesc) { - elog(WARNING, "Failed to get tuple descriptor for relation with OID %u", relid); - RelationClose(rel); - return false; - } - for (int i = 0; i < tupleDesc->natts; i++) { Form_pg_attribute attr = &tupleDesc->attrs[i]; auto col_name = duckdb::string(NameStr(attr->attname)); auto duck_type = pgduckdb::ConvertPostgresToDuckColumnType(attr); info.columns.AddColumn(duckdb::ColumnDefinition(col_name, duck_type)); /* Log column name and type */ - elog(DEBUG3, "(DuckDB/SetTableInfo) Column name: %s, Type: %s --", col_name.c_str(), + elog(DEBUG2, "(DuckDB/SetTableInfo) Column name: %s, Type: %s --", col_name.c_str(), duck_type.ToString().c_str()); } - RelationClose(rel); return true; } Cardinality -PostgresTable::GetTableCardinality(Oid relid) { - auto rel = RelationIdGetRelation(relid); +PostgresTable::GetTableCardinality(::Relation rel) { Cardinality cardinality; BlockNumber n_pages; double allvisfrac; estimate_rel_size(rel, NULL, &n_pages, &cardinality, &allvisfrac); - RelationClose(rel); return cardinality; } @@ -71,8 +75,8 @@ PostgresTable::GetTableCardinality(Oid relid) { //===--------------------------------------------------------------------===// PostgresHeapTable::PostgresHeapTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, - Cardinality cardinality, Snapshot snapshot, Oid oid) - : PostgresTable(catalog, schema, info, cardinality, snapshot), oid(oid) { + ::Relation rel, Cardinality cardinality, Snapshot snapshot) + : PostgresTable(catalog, schema, info, rel, cardinality, snapshot) { } unique_ptr @@ -82,7 +86,7 @@ PostgresHeapTable::GetStatistics(ClientContext &context, column_t column_id) { TableFunction PostgresHeapTable::GetScanFunction(ClientContext &context, unique_ptr &bind_data) { - bind_data = duckdb::make_uniq(cardinality, oid, snapshot); + bind_data = duckdb::make_uniq(rel, cardinality, snapshot); return pgduckdb::PostgresSeqScanFunction(); } diff --git a/src/catalog/pgduckdb_transaction.cpp b/src/catalog/pgduckdb_transaction.cpp index 7a27bc01..3d5d7c66 100644 --- a/src/catalog/pgduckdb_transaction.cpp +++ b/src/catalog/pgduckdb_transaction.cpp @@ -73,15 +73,17 @@ SchemaItems::GetTable(const string &entry_name) { } ReleaseSysCache(tuple); + ::Relation rel = PostgresTable::OpenRelation(rel_oid); + unique_ptr table; CreateTableInfo info; info.table = entry_name; Cardinality cardinality = 1; - if (!PostgresTable::SetTableInfo(info, rel_oid, snapshot)) { + if (!PostgresTable::SetTableInfo(info, rel)) { return nullptr; } - cardinality = PostgresTable::GetTableCardinality(rel_oid); - table = make_uniq(catalog, *schema, info, cardinality, snapshot, rel_oid); + cardinality = PostgresTable::GetTableCardinality(rel); + table = make_uniq(catalog, *schema, info, rel, cardinality, snapshot); tables[entry_name] = std::move(table); return tables[entry_name].get(); } diff --git a/src/scan/heap_reader.cpp b/src/scan/heap_reader.cpp index a25817d4..cbf23546 100644 --- a/src/scan/heap_reader.cpp +++ b/src/scan/heap_reader.cpp @@ -39,14 +39,14 @@ HeapReaderGlobalState::AssignNextBlockNumber(std::mutex &lock) { // HeapReader // -HeapReader::HeapReader(Relation relation, duckdb::shared_ptr heap_reader_global_state, +HeapReader::HeapReader(Relation rel, duckdb::shared_ptr heap_reader_global_state, duckdb::shared_ptr global_state, duckdb::shared_ptr local_state) : m_global_state(global_state), m_heap_reader_global_state(heap_reader_global_state), m_local_state(local_state), - m_relation(relation), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber), - m_buffer(InvalidBuffer), m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) { + m_rel(rel), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber), m_buffer(InvalidBuffer), + m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) { m_tuple.t_data = NULL; - m_tuple.t_tableOid = RelationGetRelid(m_relation); + m_tuple.t_tableOid = RelationGetRelid(m_rel); ItemPointerSetInvalid(&m_tuple.t_self); } @@ -57,7 +57,7 @@ Page HeapReader::PreparePageRead() { Page page = BufferGetPage(m_buffer); #if PG_VERSION_NUM < 170000 - TestForOldSnapshot(m_global_state->m_snapshot, m_relation, page); + TestForOldSnapshot(m_global_state->m_snapshot, m_rel, page); #endif m_page_tuples_all_visible = PageIsAllVisible(page) && !m_global_state->m_snapshot->takenDuringRecovery; m_page_tuples_left = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1; @@ -90,7 +90,7 @@ HeapReader::ReadPageTuples(duckdb::DataChunk &output) { std::lock_guard lock(DuckdbProcessLock::GetLock()); block = m_block_number; - m_buffer = PostgresFunctionGuard(ReadBufferExtended, m_relation, MAIN_FORKNUM, block, RBM_NORMAL, + m_buffer = PostgresFunctionGuard(ReadBufferExtended, m_rel, MAIN_FORKNUM, block, RBM_NORMAL, GetAccessStrategy(BAS_BULKREAD)); PostgresFunctionGuard(LockBuffer, m_buffer, BUFFER_LOCK_SHARE); @@ -118,7 +118,7 @@ HeapReader::ReadPageTuples(duckdb::DataChunk &output) { continue; } - pgstat_count_heap_getnext(m_relation); + pgstat_count_heap_getnext(m_rel); InsertTupleIntoChunk(output, m_global_state, m_local_state, &m_tuple); } diff --git a/src/scan/postgres_seq_scan.cpp b/src/scan/postgres_seq_scan.cpp index 3295f50c..5304c2ee 100644 --- a/src/scan/postgres_seq_scan.cpp +++ b/src/scan/postgres_seq_scan.cpp @@ -10,31 +10,27 @@ namespace pgduckdb { // PostgresSeqScanGlobalState // -PostgresSeqScanGlobalState::PostgresSeqScanGlobalState(Relation relation, duckdb::TableFunctionInitInput &input) +PostgresSeqScanGlobalState::PostgresSeqScanGlobalState(Relation rel, duckdb::TableFunctionInitInput &input) : m_global_state(duckdb::make_shared_ptr()), - m_heap_reader_global_state(duckdb::make_shared_ptr(relation)), m_relation(relation) { + m_heap_reader_global_state(duckdb::make_shared_ptr(rel)), m_rel(rel) { m_global_state->InitGlobalState(input); - m_global_state->m_tuple_desc = RelationGetDescr(m_relation); + m_global_state->m_tuple_desc = RelationGetDescr(m_rel); m_global_state->InitRelationMissingAttrs(m_global_state->m_tuple_desc); - elog(DEBUG2, "(DuckDB/PostgresSeqScanGlobalState) Running %" PRIu64 " threads -- ", - (uint64_t)MaxThreads()); + elog(DEBUG2, "(DuckDB/PostgresSeqScanGlobalState) Running %" PRIu64 " threads -- ", (uint64_t)MaxThreads()); } PostgresSeqScanGlobalState::~PostgresSeqScanGlobalState() { - if (m_relation) { - RelationClose(m_relation); - } } // // PostgresSeqScanLocalState // -PostgresSeqScanLocalState::PostgresSeqScanLocalState(Relation relation, +PostgresSeqScanLocalState::PostgresSeqScanLocalState(Relation rel, duckdb::shared_ptr heap_reder_global_state, duckdb::shared_ptr global_state) : m_local_state(duckdb::make_shared_ptr()) { - m_heap_table_reader = duckdb::make_uniq(relation, heap_reder_global_state, global_state, m_local_state); + m_heap_table_reader = duckdb::make_uniq(rel, heap_reder_global_state, global_state, m_local_state); } PostgresSeqScanLocalState::~PostgresSeqScanLocalState() { @@ -44,8 +40,8 @@ PostgresSeqScanLocalState::~PostgresSeqScanLocalState() { // PostgresSeqScanFunctionData // -PostgresSeqScanFunctionData::PostgresSeqScanFunctionData(uint64_t cardinality, Oid relid, Snapshot snapshot) - : m_cardinality(cardinality), m_relid(relid), m_snapshot(snapshot) { +PostgresSeqScanFunctionData::PostgresSeqScanFunctionData(::Relation rel, uint64_t cardinality, Snapshot snapshot) + : m_rel(rel), m_cardinality(cardinality), m_snapshot(snapshot) { } PostgresSeqScanFunctionData::~PostgresSeqScanFunctionData() { @@ -56,7 +52,7 @@ PostgresSeqScanFunctionData::~PostgresSeqScanFunctionData() { // PostgresSeqScanFunction::PostgresSeqScanFunction() - : TableFunction("postgres_seq_scan", {}, PostgresSeqScanFunc, PostgresSeqScanBind, PostgresSeqScanInitGlobal, + : TableFunction("postgres_seq_scan", {}, PostgresSeqScanFunc, nullptr, PostgresSeqScanInitGlobal, PostgresSeqScanInitLocal) { named_parameters["cardinality"] = duckdb::LogicalType::UBIGINT; named_parameters["relid"] = duckdb::LogicalType::UINTEGER; @@ -67,45 +63,12 @@ PostgresSeqScanFunction::PostgresSeqScanFunction() cardinality = PostgresSeqScanCardinality; } -duckdb::unique_ptr -PostgresSeqScanFunction::PostgresSeqScanBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input, - duckdb::vector &return_types, - duckdb::vector &names) { - auto cardinality = input.named_parameters["cardinality"].GetValue(); - auto relid = input.named_parameters["relid"].GetValue(); - auto snapshot = (reinterpret_cast(input.named_parameters["snapshot"].GetPointer())); - - auto rel = RelationIdGetRelation(relid); - auto relation_descr = RelationGetDescr(rel); - - if (!relation_descr) { - elog(WARNING, "(PGDuckDB/PostgresSeqScanBind) Failed to get tuple descriptor for relation with OID %u", relid); - RelationClose(rel); - return nullptr; - } - - for (int i = 0; i < relation_descr->natts; i++) { - Form_pg_attribute attr = &relation_descr->attrs[i]; - auto col_name = duckdb::string(NameStr(attr->attname)); - auto duck_type = ConvertPostgresToDuckColumnType(attr); - return_types.push_back(duck_type); - names.push_back(col_name); - /* Log column name and type */ - elog(DEBUG2, "(PGDuckDB/PostgresSeqScanBind) Column name: %s, Type: %s --", col_name.c_str(), - duck_type.ToString().c_str()); - } - - RelationClose(rel); - return duckdb::make_uniq(cardinality, relid, snapshot); -} - duckdb::unique_ptr PostgresSeqScanFunction::PostgresSeqScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input) { auto &bind_data = input.bind_data->CastNoConst(); - auto global_state = duckdb::make_uniq(RelationIdGetRelation(bind_data.m_relid), input); + auto global_state = duckdb::make_uniq(bind_data.m_rel, input); global_state->m_global_state->m_snapshot = bind_data.m_snapshot; - global_state->m_relid = bind_data.m_relid; return std::move(global_state); } @@ -115,7 +78,7 @@ PostgresSeqScanFunction::PostgresSeqScanInitLocal(duckdb::ExecutionContext &cont duckdb::GlobalTableFunctionState *gstate) { auto global_state = reinterpret_cast(gstate); return duckdb::make_uniq( - global_state->m_relation, global_state->m_heap_reader_global_state, global_state->m_global_state); + global_state->m_rel, global_state->m_heap_reader_global_state, global_state->m_global_state); } void