Skip to content

Commit

Permalink
Open heap relation in only once (#247)
Browse files Browse the repository at this point in the history
Opening and closing relation is not thread safe. So it would be better
that we open and close relation from central point and pass pointer to
structure for reading.
  • Loading branch information
mkaruza committed Oct 7, 2024
1 parent d2db26c commit b319fa1
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 100 deletions.
20 changes: 9 additions & 11 deletions include/pgduckdb/catalog/pgduckdb_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,33 @@ namespace duckdb {

class PostgresTable : public TableCatalogEntry {
public:
virtual ~PostgresTable() {
}
virtual ~PostgresTable();

public:
static bool SetTableInfo(CreateTableInfo &info, Oid relid, Snapshot snapshot);
static Cardinality GetTableCardinality(Oid relid);
static ::Relation OpenRelation(Oid relid);
static bool SetTableInfo(CreateTableInfo &info, ::Relation rel);
static Cardinality GetTableCardinality(::Relation rel);

protected:
PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality,
Snapshot snapshot);
PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, ::Relation rel,
Cardinality cardinality, Snapshot snapshot);

protected:
::Relation rel;
Cardinality cardinality;
Snapshot snapshot;
};

class PostgresHeapTable : public PostgresTable {
public:
PostgresHeapTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality,
Snapshot snapshot, Oid oid);
PostgresHeapTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, ::Relation rel,
Cardinality cardinality, Snapshot snapshot);

public:
// -- Table API --
unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, column_t column_id) override;
TableFunction GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) override;
TableStorageInfo GetStorageInfo(ClientContext &context) override;

private:
Oid oid;
};

} // namespace duckdb
8 changes: 4 additions & 4 deletions include/pgduckdb/scan/heap_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace pgduckdb {

class HeapReaderGlobalState {
public:
HeapReaderGlobalState(Relation relation)
: m_nblocks(RelationGetNumberOfBlocks(relation)), m_last_assigned_block_number(InvalidBlockNumber) {
HeapReaderGlobalState(Relation rel)
: m_nblocks(RelationGetNumberOfBlocks(rel)), m_last_assigned_block_number(InvalidBlockNumber) {
}
BlockNumber AssignNextBlockNumber(std::mutex &lock);
BlockNumber m_nblocks;
Expand All @@ -28,7 +28,7 @@ class HeapReaderGlobalState {
class HeapReader {
private:
public:
HeapReader(Relation relation, duckdb::shared_ptr<HeapReaderGlobalState> heap_reader_global_state,
HeapReader(Relation rel, duckdb::shared_ptr<HeapReaderGlobalState> heap_reader_global_state,
duckdb::shared_ptr<PostgresScanGlobalState> global_state,
duckdb::shared_ptr<PostgresScanLocalState> local_state);
~HeapReader();
Expand All @@ -49,7 +49,7 @@ class HeapReader {
duckdb::shared_ptr<PostgresScanGlobalState> m_global_state;
duckdb::shared_ptr<HeapReaderGlobalState> m_heap_reader_global_state;
duckdb::shared_ptr<PostgresScanLocalState> m_local_state;
Relation m_relation;
Relation m_rel;
bool m_inited;
bool m_read_next_page;
bool m_page_tuples_all_visible;
Expand Down
14 changes: 5 additions & 9 deletions include/pgduckdb/scan/postgres_seq_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace pgduckdb {
// Global State

struct PostgresSeqScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresSeqScanGlobalState(Relation relation, duckdb::TableFunctionInitInput &input);
explicit PostgresSeqScanGlobalState(Relation rel, duckdb::TableFunctionInitInput &input);
~PostgresSeqScanGlobalState();
idx_t
MaxThreads() const override {
Expand All @@ -31,15 +31,14 @@ struct PostgresSeqScanGlobalState : public duckdb::GlobalTableFunctionState {
public:
duckdb::shared_ptr<PostgresScanGlobalState> m_global_state;
duckdb::shared_ptr<HeapReaderGlobalState> m_heap_reader_global_state;
Relation m_relation;
Oid m_relid;
Relation m_rel;
};

// Local State

struct PostgresSeqScanLocalState : public duckdb::LocalTableFunctionState {
public:
PostgresSeqScanLocalState(Relation relation, duckdb::shared_ptr<HeapReaderGlobalState> heap_reader_global_state,
PostgresSeqScanLocalState(Relation rel, duckdb::shared_ptr<HeapReaderGlobalState> heap_reader_global_state,
duckdb::shared_ptr<PostgresScanGlobalState> global_state);
~PostgresSeqScanLocalState() override;

Expand All @@ -52,12 +51,12 @@ struct PostgresSeqScanLocalState : public duckdb::LocalTableFunctionState {

struct PostgresSeqScanFunctionData : public duckdb::TableFunctionData {
public:
PostgresSeqScanFunctionData(uint64_t cardinality, Oid relid, Snapshot snapshot);
PostgresSeqScanFunctionData(::Relation rel, uint64_t cardinality, Snapshot snapshot);
~PostgresSeqScanFunctionData() override;

public:
::Relation m_rel;
uint64_t m_cardinality;
Oid m_relid;
Snapshot m_snapshot;
};

Expand All @@ -68,9 +67,6 @@ struct PostgresSeqScanFunction : public duckdb::TableFunction {
PostgresSeqScanFunction();

public:
static duckdb::unique_ptr<duckdb::FunctionData>
PostgresSeqScanBind(duckdb::ClientContext &context, duckdb::TableFunctionBindInput &input,
duckdb::vector<duckdb::LogicalType> &return_types, duckdb::vector<duckdb::string> &names);
static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresSeqScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
static duckdb::unique_ptr<duckdb::LocalTableFunctionState>
Expand Down
40 changes: 22 additions & 18 deletions src/catalog/pgduckdb_table.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "pgduckdb/pgduckdb_types.hpp"
#include "pgduckdb/pgduckdb_process_lock.hpp"
#include "pgduckdb/catalog/pgduckdb_schema.hpp"
#include "pgduckdb/catalog/pgduckdb_table.hpp"
#include "duckdb/parser/parsed_data/create_table_info.hpp"
#include "pgduckdb/scan/postgres_seq_scan.hpp"
#include "pgduckdb/scan/postgres_scan.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

extern "C" {
#include "postgres.h"
Expand All @@ -25,44 +27,46 @@ extern "C" {

namespace duckdb {

PostgresTable::PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info,
PostgresTable::PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, ::Relation rel,
Cardinality cardinality, Snapshot snapshot)
: TableCatalogEntry(catalog, schema, info), cardinality(cardinality), snapshot(snapshot) {
: TableCatalogEntry(catalog, schema, info), rel(rel), cardinality(cardinality), snapshot(snapshot) {
}

PostgresTable::~PostgresTable() {
std::lock_guard<std::mutex> lock(pgduckdb::DuckdbProcessLock::GetLock());
RelationClose(rel);
}

::Relation
PostgresTable::OpenRelation(Oid relid) {
std::lock_guard<std::mutex> lock(pgduckdb::DuckdbProcessLock::GetLock());
auto rel = pgduckdb::PostgresFunctionGuard<::Relation>(RelationIdGetRelation, relid);
return rel;
}

bool
PostgresTable::SetTableInfo(CreateTableInfo &info, Oid relid, Snapshot snapshot) {
auto rel = RelationIdGetRelation(relid);
PostgresTable::SetTableInfo(CreateTableInfo &info, ::Relation rel) {
auto tupleDesc = RelationGetDescr(rel);

if (!tupleDesc) {
elog(WARNING, "Failed to get tuple descriptor for relation with OID %u", relid);
RelationClose(rel);
return false;
}

for (int i = 0; i < tupleDesc->natts; i++) {
Form_pg_attribute attr = &tupleDesc->attrs[i];
auto col_name = duckdb::string(NameStr(attr->attname));
auto duck_type = pgduckdb::ConvertPostgresToDuckColumnType(attr);
info.columns.AddColumn(duckdb::ColumnDefinition(col_name, duck_type));
/* Log column name and type */
elog(DEBUG3, "(DuckDB/SetTableInfo) Column name: %s, Type: %s --", col_name.c_str(),
elog(DEBUG2, "(DuckDB/SetTableInfo) Column name: %s, Type: %s --", col_name.c_str(),
duck_type.ToString().c_str());
}

RelationClose(rel);
return true;
}

Cardinality
PostgresTable::GetTableCardinality(Oid relid) {
auto rel = RelationIdGetRelation(relid);
PostgresTable::GetTableCardinality(::Relation rel) {
Cardinality cardinality;
BlockNumber n_pages;
double allvisfrac;
estimate_rel_size(rel, NULL, &n_pages, &cardinality, &allvisfrac);
RelationClose(rel);
return cardinality;
}

Expand All @@ -71,8 +75,8 @@ PostgresTable::GetTableCardinality(Oid relid) {
//===--------------------------------------------------------------------===//

PostgresHeapTable::PostgresHeapTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info,
Cardinality cardinality, Snapshot snapshot, Oid oid)
: PostgresTable(catalog, schema, info, cardinality, snapshot), oid(oid) {
::Relation rel, Cardinality cardinality, Snapshot snapshot)
: PostgresTable(catalog, schema, info, rel, cardinality, snapshot) {
}

unique_ptr<BaseStatistics>
Expand All @@ -82,7 +86,7 @@ PostgresHeapTable::GetStatistics(ClientContext &context, column_t column_id) {

TableFunction
PostgresHeapTable::GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) {
bind_data = duckdb::make_uniq<pgduckdb::PostgresSeqScanFunctionData>(cardinality, oid, snapshot);
bind_data = duckdb::make_uniq<pgduckdb::PostgresSeqScanFunctionData>(rel, cardinality, snapshot);
return pgduckdb::PostgresSeqScanFunction();
}

Expand Down
8 changes: 5 additions & 3 deletions src/catalog/pgduckdb_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,17 @@ SchemaItems::GetTable(const string &entry_name) {
}
ReleaseSysCache(tuple);

::Relation rel = PostgresTable::OpenRelation(rel_oid);

unique_ptr<PostgresTable> table;
CreateTableInfo info;
info.table = entry_name;
Cardinality cardinality = 1;
if (!PostgresTable::SetTableInfo(info, rel_oid, snapshot)) {
if (!PostgresTable::SetTableInfo(info, rel)) {
return nullptr;
}
cardinality = PostgresTable::GetTableCardinality(rel_oid);
table = make_uniq<PostgresHeapTable>(catalog, *schema, info, cardinality, snapshot, rel_oid);
cardinality = PostgresTable::GetTableCardinality(rel);
table = make_uniq<PostgresHeapTable>(catalog, *schema, info, rel, cardinality, snapshot);
tables[entry_name] = std::move(table);
return tables[entry_name].get();
}
Expand Down
14 changes: 7 additions & 7 deletions src/scan/heap_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ HeapReaderGlobalState::AssignNextBlockNumber(std::mutex &lock) {
// HeapReader
//

HeapReader::HeapReader(Relation relation, duckdb::shared_ptr<HeapReaderGlobalState> heap_reader_global_state,
HeapReader::HeapReader(Relation rel, duckdb::shared_ptr<HeapReaderGlobalState> heap_reader_global_state,
duckdb::shared_ptr<PostgresScanGlobalState> global_state,
duckdb::shared_ptr<PostgresScanLocalState> local_state)
: m_global_state(global_state), m_heap_reader_global_state(heap_reader_global_state), m_local_state(local_state),
m_relation(relation), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber),
m_buffer(InvalidBuffer), m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) {
m_rel(rel), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber), m_buffer(InvalidBuffer),
m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) {
m_tuple.t_data = NULL;
m_tuple.t_tableOid = RelationGetRelid(m_relation);
m_tuple.t_tableOid = RelationGetRelid(m_rel);
ItemPointerSetInvalid(&m_tuple.t_self);
}

Expand All @@ -57,7 +57,7 @@ Page
HeapReader::PreparePageRead() {
Page page = BufferGetPage(m_buffer);
#if PG_VERSION_NUM < 170000
TestForOldSnapshot(m_global_state->m_snapshot, m_relation, page);
TestForOldSnapshot(m_global_state->m_snapshot, m_rel, page);
#endif
m_page_tuples_all_visible = PageIsAllVisible(page) && !m_global_state->m_snapshot->takenDuringRecovery;
m_page_tuples_left = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1;
Expand Down Expand Up @@ -90,7 +90,7 @@ HeapReader::ReadPageTuples(duckdb::DataChunk &output) {
std::lock_guard<std::mutex> lock(DuckdbProcessLock::GetLock());
block = m_block_number;

m_buffer = PostgresFunctionGuard<Buffer>(ReadBufferExtended, m_relation, MAIN_FORKNUM, block, RBM_NORMAL,
m_buffer = PostgresFunctionGuard<Buffer>(ReadBufferExtended, m_rel, MAIN_FORKNUM, block, RBM_NORMAL,
GetAccessStrategy(BAS_BULKREAD));

PostgresFunctionGuard(LockBuffer, m_buffer, BUFFER_LOCK_SHARE);
Expand Down Expand Up @@ -118,7 +118,7 @@ HeapReader::ReadPageTuples(duckdb::DataChunk &output) {
continue;
}

pgstat_count_heap_getnext(m_relation);
pgstat_count_heap_getnext(m_rel);
InsertTupleIntoChunk(output, m_global_state, m_local_state, &m_tuple);
}

Expand Down
Loading

0 comments on commit b319fa1

Please sign in to comment.