-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathquack_heap_seq_scan.hpp
95 lines (81 loc) · 2.39 KB
/
quack_heap_seq_scan.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#pragma once
#include "duckdb.hpp"
extern "C" {
#include "postgres.h"
#include "access/tableam.h"
#include "access/heapam.h"
#include "storage/bufmgr.h"
}
#include <mutex>
#include <atomic>
namespace quack {
// Scan bookkeeping object that PostgresHeapSeqScan::ReadPageTuples() and
// PostgresHeapSeqScan::PreparePageRead() receive by reference.
//
// Only declarations live in this header: the constructor, destructor and
// EndScan() are defined elsewhere. The per-field notes below are inferred from
// the field names and types -- confirm them against the implementation file.
class PostgresHeapSeqScanThreadInfo {
public:
    PostgresHeapSeqScanThreadInfo();
    ~PostgresHeapSeqScanThreadInfo();

    void EndScan();

    // --- public data members (declaration order is significant; do not reorder) ---

    // Tuple descriptor handle (PostgreSQL TupleDesc).
    TupleDesc m_tuple_desc;

    // Status flags; presumably: scan started / a new page must be fetched /
    // every tuple on the current page is visible -- TODO confirm.
    bool m_inited;
    bool m_read_next_page;
    bool m_page_tuples_all_visible;

    // Presumably the number of rows written to the output chunk so far -- TODO confirm.
    int m_output_vector_size;

    // Position within the relation: block number, buffer handle, tuple offset
    // within the page and (presumably) tuples still to be read from that page.
    BlockNumber m_block_number;
    Buffer m_buffer;
    OffsetNumber m_current_tuple_index;
    int m_page_tuples_left;

    // Heap tuple header/value holder (PostgreSQL HeapTupleData), stored by value.
    HeapTupleData m_tuple;
};
// Scan state embedded by value in PostgresHeapSeqScan (m_parallel_scan_state).
//
// The object owns m_strategy: the destructor releases it with pfree(). Because
// of that ownership the type must never be copied; the copy operations are
// deleted explicitly below instead of relying on the std::mutex / std::atomic
// members to make the class non-copyable as a side effect.
//
// AssignNextBlockNumber() and PrefetchNextRelationPages() are defined outside
// this header.
class PostgresHeapSeqParallelScanState {
private:
    // constexpr (rather than `static int const`): under C++17 a constexpr static
    // data member is implicitly inline, so odr-using it (e.g. binding it to a
    // const reference parameter) does not require an out-of-class definition.
    static constexpr int k_max_prefetch_block_number = 32;

public:
    PostgresHeapSeqParallelScanState()
        : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuples_only(false),
          m_total_row_count(0), m_last_prefetch_block(0), m_strategy(nullptr) {
    }

    // Releases the buffer access strategy, if one was ever assigned.
    ~PostgresHeapSeqParallelScanState() {
        if (m_strategy)
            pfree(m_strategy);
    }

    // Non-copyable: a copy would make two objects pfree() the same m_strategy.
    PostgresHeapSeqParallelScanState(const PostgresHeapSeqParallelScanState &other) = delete;
    PostgresHeapSeqParallelScanState &operator=(const PostgresHeapSeqParallelScanState &other) = delete;

    BlockNumber AssignNextBlockNumber();
    void PrefetchNextRelationPages(Relation rel);

    // Mutex for the shared fields; exactly which fields it guards is decided by
    // the out-of-line member functions -- see the implementation file.
    std::mutex m_lock;

    // Both start out as InvalidBlockNumber.
    BlockNumber m_nblocks;
    BlockNumber m_last_assigned_block_number;

    bool m_count_tuples_only;

    // idx_t -> idx_t mappings; presumably filled in by
    // PostgresHeapSeqScan::InitParallelScanState() -- TODO confirm.
    duckdb::map<duckdb::idx_t, duckdb::idx_t> m_columns;
    duckdb::map<duckdb::idx_t, duckdb::idx_t> m_projections;

    // Raw pointer that the destructor does not free.
    duckdb::TableFilterSet *m_filters = nullptr;

    // NOTE(review): 32-bit counter; it wraps past 4,294,967,295. Confirm that the
    // row count accumulated here can never exceed that.
    std::atomic<std::uint32_t> m_total_row_count;

    BlockNumber m_last_prefetch_block;

    // Owned; nullptr until a strategy is assigned, freed in the destructor.
    BufferAccessStrategy m_strategy;
};
// Sequential scan over a PostgreSQL relation, constructed from a range-table
// entry. Holds the relation handle, the snapshot supplied via SetSnapshot()
// and the embedded PostgresHeapSeqParallelScanState.
//
// Move-constructible only: copy construction, copy assignment and move
// assignment are all deleted. Every member function except SetSnapshot() is
// defined outside this header.
class PostgresHeapSeqScan {
public:
    // Construction / destruction.
    PostgresHeapSeqScan(RangeTblEntry *table);
    PostgresHeapSeqScan(PostgresHeapSeqScan &&other);
    ~PostgresHeapSeqScan();

    // Disabled operations.
    PostgresHeapSeqScan(const PostgresHeapSeqScan &other) = delete;
    PostgresHeapSeqScan &operator=(const PostgresHeapSeqScan &other) = delete;
    PostgresHeapSeqScan &operator=(PostgresHeapSeqScan &&other) = delete;

    // Scan setup.
    void InitParallelScanState(duckdb::TableFunctionInitInput &input);

    // Stores the snapshot pointer; no copy of the snapshot is made here.
    void SetSnapshot(Snapshot snapshot) {
        m_snapshot = snapshot;
    }

    // Accessors and the read entry point.
    Relation GetRelation();
    TupleDesc GetTupleDesc();
    bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo);
    bool IsValid() const;

private:
    Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo);

    Relation m_rel = nullptr;
    Snapshot m_snapshot = nullptr;
    PostgresHeapSeqParallelScanState m_parallel_scan_state;
};
} // namespace quack