Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track ColumnChunk allocations through the BufferManager #3743

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/graph/on_disk_graph.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "graph/on_disk_graph.h"

#include "main/client_context.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/storage_manager.h"

using namespace kuzu::catalog;
Expand All @@ -11,11 +12,11 @@ using namespace kuzu::common;
namespace kuzu {
namespace graph {

static std::unique_ptr<RelTableScanState> getRelScanState(RelDataDirection direction,
ValueVector* srcVector, ValueVector* dstVector) {
static std::unique_ptr<RelTableScanState> getRelScanState(MemoryManager& mm,
RelDataDirection direction, ValueVector* srcVector, ValueVector* dstVector) {
// Empty columnIDs since we do not scan any rel property.
auto columnIDs = std::vector<column_id_t>{};
auto scanState = std::make_unique<RelTableScanState>(columnIDs, direction);
auto scanState = std::make_unique<RelTableScanState>(mm, columnIDs, direction);
scanState->nodeIDVector = srcVector;
scanState->outputVectors.push_back(dstVector);
return scanState;
Expand All @@ -29,9 +30,9 @@ OnDiskGraphScanState::OnDiskGraphScanState(MemoryManager* mm) {
dstNodeIDVector = std::make_unique<ValueVector>(LogicalType::INTERNAL_ID(), mm);
dstNodeIDVector->state = dstNodeIDVectorState;
fwdScanState =
getRelScanState(RelDataDirection::FWD, srcNodeIDVector.get(), dstNodeIDVector.get());
getRelScanState(*mm, RelDataDirection::FWD, srcNodeIDVector.get(), dstNodeIDVector.get());
bwdScanState =
getRelScanState(RelDataDirection::BWD, srcNodeIDVector.get(), dstNodeIDVector.get());
getRelScanState(*mm, RelDataDirection::BWD, srcNodeIDVector.get(), dstNodeIDVector.get());
}

OnDiskGraph::OnDiskGraph(ClientContext* context, const GraphEntry& entry)
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ struct BufferPoolConstants {
static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 43; // (8TB)
#endif

static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 26; // (64MB)
static constexpr uint64_t DEFAULT_BUFFER_POOL_SIZE_FOR_TESTING = 1ull << 28; // (256MB)
};

struct StorageConstants {
Expand Down
7 changes: 6 additions & 1 deletion src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
namespace kuzu {
namespace storage {
class NodeTable;
}
class MemoryManager;
} // namespace storage
namespace processor {

using partitioner_func_t =
Expand Down Expand Up @@ -35,6 +36,10 @@ struct PartitionerSharedState {
std::mutex mtx;
storage::NodeTable* srcNodeTable;
storage::NodeTable* dstNodeTable;
storage::MemoryManager& mm;

explicit PartitionerSharedState(storage::MemoryManager& mm)
: mtx{}, srcNodeTable{nullptr}, dstNodeTable{nullptr}, mm{mm} {}

// FIXME(Guodong): we should not maintain maxNodeOffsets.
std::vector<common::offset_t> maxNodeOffsets; // max node offset in each direction.
Expand Down
10 changes: 7 additions & 3 deletions src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "storage/store/table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace processor {

struct BatchInsertInfo {
Expand All @@ -25,16 +28,17 @@ struct BatchInsertSharedState {
storage::Table* table;
std::shared_ptr<FactorizedTable> fTable;
storage::WAL* wal;
storage::MemoryManager* mm;

BatchInsertSharedState(storage::Table* table, std::shared_ptr<FactorizedTable> fTable,
storage::WAL* wal)
: numRows{0}, table{table}, fTable{std::move(fTable)}, wal{wal} {};
storage::WAL* wal, storage::MemoryManager* mm)
: numRows{0}, table{table}, fTable{std::move(fTable)}, wal{wal}, mm{mm} {};
BatchInsertSharedState(const BatchInsertSharedState& other) = delete;

virtual ~BatchInsertSharedState() = default;

std::unique_ptr<BatchInsertSharedState> copy() const {
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal);
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal, mm);
result->numRows.store(numRows.load());
return result;
}
Expand Down
15 changes: 9 additions & 6 deletions src/include/processor/operator/persistent/node_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include "storage/store/node_table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace transaction {
class Transaction;
};
Expand Down Expand Up @@ -58,8 +61,8 @@ struct NodeBatchInsertSharedState final : public BatchInsertSharedState {
std::unique_ptr<storage::ChunkedNodeGroup> sharedNodeGroup;

NodeBatchInsertSharedState(storage::Table* table, std::shared_ptr<FactorizedTable> fTable,
storage::WAL* wal)
: BatchInsertSharedState{table, fTable, wal}, readerSharedState{nullptr},
storage::WAL* wal, storage::MemoryManager* mm)
: BatchInsertSharedState{table, fTable, wal, mm}, readerSharedState{nullptr},
distinctSharedState{nullptr}, currentNodeGroupIdx{0}, sharedNodeGroup{nullptr} {
pkIndex =
common::ku_dynamic_cast<storage::Table*, storage::NodeTable*>(table)->getPKIndex();
Expand Down Expand Up @@ -119,22 +122,22 @@ class NodeBatchInsert final : public BatchInsert {
void writeAndResetNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder);
std::optional<IndexBuilder>& indexBuilder, storage::MemoryManager* mm);

private:
// Returns the number of nodes written from the group
uint64_t writeToExistingNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, std::optional<IndexBuilder>& indexBuilder,
common::column_id_t pkColumnID, storage::NodeTable* table,
storage::ChunkedNodeGroup* nodeGroup);
storage::ChunkedNodeGroup* nodeGroup, storage::MemoryManager* mm);

void appendIncompleteNodeGroup(transaction::Transaction* transaction,
std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder);
std::optional<IndexBuilder>& indexBuilder, storage::MemoryManager* mm);
void clearToIndex(std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
common::offset_t startIndexInGroup);

void copyToNodeGroup(transaction::Transaction* transaction);
void copyToNodeGroup(transaction::Transaction* transaction, storage::MemoryManager* mm);
void populateDefaultColumns();
};

Expand Down
5 changes: 4 additions & 1 deletion src/include/processor/operator/scan/scan_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include "storage/store/rel_table.h"

namespace kuzu {
namespace storage {
class MemoryManager;
}
namespace processor {

struct ScanRelTableInfo {
Expand All @@ -22,7 +25,7 @@ struct ScanRelTableInfo {
columnPredicates{std::move(columnPredicates)} {}
EXPLICIT_COPY_DEFAULT_MOVE(ScanRelTableInfo);

void initScanState();
void initScanState(storage::MemoryManager& mm);

private:
ScanRelTableInfo(const ScanRelTableInfo& other)
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class EvictionQueue {
*/

class BufferManager {
friend class MemoryAllocator;
friend class MemoryManager;

public:
enum class PageReadPolicy : uint8_t { READ_PAGE = 0, DONT_READ_PAGE = 1 };
Expand Down
63 changes: 24 additions & 39 deletions src/include/storage/buffer_manager/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,77 +20,62 @@ class VirtualFileSystem;

namespace storage {

class MemoryAllocator;
class MemoryManager;
class BMFileHandle;
class BufferManager;

class MemoryBuffer {
public:
MemoryBuffer(MemoryAllocator* allocator, common::page_idx_t blockIdx, uint8_t* buffer,
MemoryBuffer(MemoryManager* mm, common::page_idx_t blockIdx, uint8_t* buffer,
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE);
~MemoryBuffer();

public:
std::span<uint8_t> buffer;
common::page_idx_t pageIdx;
MemoryAllocator* allocator;
};

class MemoryAllocator {
friend class MemoryBuffer;

public:
MemoryAllocator(BufferManager* bm, common::VirtualFileSystem* vfs,
main::ClientContext* context);

~MemoryAllocator();

std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero, uint64_t size);
inline common::page_offset_t getPageSize() const { return pageSize; }

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);

private:
BMFileHandle* fh;
BufferManager* bm;
common::page_offset_t pageSize;
std::stack<common::page_idx_t> freePages;
std::mutex allocatorLock;
MemoryManager* mm;
};

/*
* The Memory Manager (MM) is used for allocating/reclaiming intermediate memory blocks.
* It can allocate a memory buffer (of size PAGE_256KB by default) from the buffer manager,
* backed by a BMFileHandle over a temporary in-memory file.
*
* Internally, MM uses a MemoryAllocator. The MemoryAllocator is holding the BMFileHandle backed by
* a temp in-mem file, and responsible for allocating/reclaiming memory buffers of its size class
* from the buffer manager. The MemoryAllocator keeps track of free pages in the BMFileHandle, so
* that it can reuse those freed pages without allocating new pages. The MemoryAllocator is
* The MemoryManager holds a BMFileHandle backed by
* a temp in-mem file, and is responsible for allocating/reclaiming memory buffers of its size class
* from the buffer manager. The MemoryManager keeps track of free pages in the BMFileHandle, so
* that it can reuse those freed pages without allocating new pages. The MemoryManager is
* thread-safe, so that multiple threads can allocate/reclaim memory blocks with the same size class
* at the same time.
*
* MM returns a MemoryBuffer to the caller, which is a wrapper around the allocated memory block.
* When the MemoryBuffer is destroyed, it automatically hands the block back to the memory manager
* it was allocated from so that the block can be reclaimed.
*/
class MemoryManager {
friend class MemoryBuffer;

public:
explicit MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs,
main::ClientContext* context)
: bm{bm} {
allocator = std::make_unique<MemoryAllocator>(bm, vfs, context);
}
MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs, main::ClientContext* context);

~MemoryManager();

std::unique_ptr<MemoryBuffer> mallocBuffer(bool initializeToZero, uint64_t size);
std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE) {
return allocator->allocateBuffer(initializeToZero, size);
}
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE);
inline common::page_offset_t getPageSize() const { return pageSize; }

BufferManager* getBufferManager() const { return bm; }

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);

private:
BMFileHandle* fh;
BufferManager* bm;
std::unique_ptr<MemoryAllocator> allocator;
common::page_offset_t pageSize;
std::stack<common::page_idx_t> freePages;
std::mutex allocatorLock;
};

} // namespace storage
} // namespace kuzu
11 changes: 6 additions & 5 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ namespace storage {

class ChunkedNodeGroup;
struct TableScanState;
class MemoryManager;

class LocalNodeNG final : public LocalNodeGroup {
public:
LocalNodeNG(common::table_id_t tableID, common::offset_t nodeGroupStartOffset,
const std::vector<common::LogicalType>& dataTypes)
: LocalNodeGroup{nodeGroupStartOffset, dataTypes}, tableID{tableID} {}
const std::vector<common::LogicalType>& dataTypes, MemoryManager* mm)
: LocalNodeGroup{nodeGroupStartOffset, dataTypes, mm}, tableID{tableID} {}
DELETE_COPY_DEFAULT_MOVE(LocalNodeNG);

void initializeScanState(TableScanState& scanState) const;
Expand Down Expand Up @@ -41,8 +42,8 @@ class LocalNodeNG final : public LocalNodeGroup {
class LocalNodeTableData final : public LocalTableData {
public:
explicit LocalNodeTableData(common::table_id_t tableID,
std::vector<common::LogicalType> dataTypes)
: LocalTableData{tableID, std::move(dataTypes)} {}
std::vector<common::LogicalType> dataTypes, MemoryManager* mm)
: LocalTableData{tableID, std::move(dataTypes), mm} {}

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
Expand All @@ -51,7 +52,7 @@ class LocalNodeTableData final : public LocalTableData {
struct TableReadState;
class LocalNodeTable final : public LocalTable {
public:
explicit LocalNodeTable(Table& table);
explicit LocalNodeTable(Table& table, MemoryManager* mm);

bool insert(TableInsertState& state) override;
bool update(TableUpdateState& state) override;
Expand Down
10 changes: 6 additions & 4 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace kuzu {
namespace storage {
class MemoryManager;

static constexpr common::column_id_t LOCAL_NBR_ID_COLUMN_ID = 0;
static constexpr common::column_id_t LOCAL_REL_ID_COLUMN_ID = 1;
Expand All @@ -15,7 +16,8 @@ class LocalRelNG final : public LocalNodeGroup {
friend class RelTableData;

public:
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType> dataTypes);
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType> dataTypes,
MemoryManager* mm);
DELETE_COPY_DEFAULT_MOVE(LocalRelNG);

common::row_idx_t scanCSR(common::offset_t srcOffset, common::offset_t posToReadForOffset,
Expand Down Expand Up @@ -52,16 +54,16 @@ class LocalRelTableData final : public LocalTableData {

public:
explicit LocalRelTableData(common::table_id_t tableID,
std::vector<common::LogicalType> dataTypes)
: LocalTableData{tableID, std::move(dataTypes)} {}
std::vector<common::LogicalType> dataTypes, MemoryManager* mm)
: LocalTableData{tableID, std::move(dataTypes), mm} {}

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
};

class LocalRelTable final : public LocalTable {
public:
explicit LocalRelTable(Table& table);
explicit LocalRelTable(Table& table, MemoryManager* mm);

bool insert(TableInsertState& insertState) override;
bool update(TableUpdateState& updateState) override;
Expand Down
Loading