From 6496244a598f448e85e98d4e4e048558c75b260b Mon Sep 17 00:00:00 2001 From: shen yushi Date: Tue, 24 Sep 2024 11:27:15 +0800 Subject: [PATCH] Fix cleanup (#1901) ### What problem does this PR solve? Fix: buffer obj ref count bug after alter. CI: collect executable when error occurred. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- python/restart_test/test_alter.py | 7 ++-- src/storage/buffer/buffer_obj.cpp | 51 +++++++++++++++++++++++++--- src/storage/buffer/buffer_obj.cppm | 39 +++++++-------------- src/storage/meta/cleanup_scanner.cpp | 3 +- src/storage/txn/txn_manager.cpp | 18 +++++----- src/storage/txn/txn_manager.cppm | 6 +--- 6 files changed, 75 insertions(+), 49 deletions(-) diff --git a/python/restart_test/test_alter.py b/python/restart_test/test_alter.py index 620bfc653c..96de3c0059 100644 --- a/python/restart_test/test_alter.py +++ b/python/restart_test/test_alter.py @@ -178,12 +178,12 @@ def part1(infinity_obj): infinity_obj.cleanup() - dropped_column_dirs = pathlib.Path(data_dir).rglob("1.col*") + dropped_column_dirs = pathlib.Path(data_dir).rglob("1.col") # find the column file with the column idx = 1 assert len(list(dropped_column_dirs)) == 0 # contain "0.col" or "2.col" - column_dirs = pathlib.Path(data_dir).rglob("*[02].col") + column_dirs = pathlib.Path(data_dir).rglob("[02].col") assert len(list(column_dirs)) == 2 res = table_obj.drop_columns(["c3"]) @@ -194,9 +194,8 @@ def part1(infinity_obj): @decorator def part2(infinity_obj): db_obj = infinity_obj.get_database("default_db") - table_obj = db_obj.get_table(table_name) - dropped_column_dirs = pathlib.Path(data_dir).rglob("2.col*") + dropped_column_dirs = pathlib.Path(data_dir).rglob("2.col") assert len(list(dropped_column_dirs)) == 0 db_obj.drop_table(table_name) diff --git a/src/storage/buffer/buffer_obj.cpp b/src/storage/buffer/buffer_obj.cpp index 31c289b578..bea8ae7f79 100644 --- a/src/storage/buffer/buffer_obj.cpp +++ b/src/storage/buffer/buffer_obj.cpp @@ -41,9 +41,7 @@ BufferObj::BufferObj(BufferManager *buffer_mgr, bool is_ephemeral, UniquePtr file_worker) { - file_worker_ = std::move(file_worker); -} +void BufferObj::SetFileWorker(UniquePtr file_worker) { file_worker_ = std::move(file_worker); } BufferHandle BufferObj::Load() { std::unique_lock locker(w_locker_); @@ -161,7 +159,7 @@ bool BufferObj::Save(const FileWorkerSaveCtx &ctx) { void BufferObj::PickForCleanup() { std::unique_lock locker(w_locker_); - if (*ptr_rc_ > 1) { + if (ptr_rc_ > 1) { return; } switch (status_) { @@ -299,4 +297,49 @@ void BufferObj::CheckState() const { } } +BufferPtr::~BufferPtr() { + if (buffer_obj_ != nullptr && buffer_obj_->ptr_rc_ > 0) { + --buffer_obj_->ptr_rc_; + } +} + +BufferPtr::BufferPtr(BufferObj *buffer_obj) : buffer_obj_(buffer_obj) { + ++buffer_obj_->ptr_rc_; +} + +BufferPtr::BufferPtr(const BufferPtr &other) : buffer_obj_(other.buffer_obj_) { ++buffer_obj_->ptr_rc_; } + +BufferPtr::BufferPtr(BufferPtr &&other) : buffer_obj_(other.buffer_obj_) { other.buffer_obj_ = nullptr; } + +BufferPtr &BufferPtr::operator=(const BufferPtr &other) { + if (this != &other) { + if (buffer_obj_ != nullptr) { + --buffer_obj_->ptr_rc_; + } + buffer_obj_ = other.buffer_obj_; + if (buffer_obj_ != nullptr) { + ++buffer_obj_->ptr_rc_; + } + } + return *this; +} + +BufferPtr &BufferPtr::operator=(BufferPtr &&other) { + if (this != &other) { + if (buffer_obj_ != nullptr) { + --buffer_obj_->ptr_rc_; + } + buffer_obj_ = other.buffer_obj_; + other.buffer_obj_ = nullptr; + } + return *this; +} + +void BufferPtr::reset() { + if (buffer_obj_ != nullptr) { + --buffer_obj_->ptr_rc_; + buffer_obj_ = nullptr; + } +} + } // namespace infinity diff --git a/src/storage/buffer/buffer_obj.cppm b/src/storage/buffer/buffer_obj.cppm index cf0d96437a..62704dbd8e 100644 --- a/src/storage/buffer/buffer_obj.cppm +++ b/src/storage/buffer/buffer_obj.cppm @@ -155,46 +155,31 @@ private: u32 id_; friend class BufferPtr; - SharedPtr ptr_rc_ = MakeShared(0); + u32 ptr_rc_ = 0; }; export class BufferPtr { public: BufferPtr() : buffer_obj_(nullptr) {} - ~BufferPtr() { - if (buffer_obj_ != nullptr && *ptr_rc_ > 0) { - --*ptr_rc_; - } - } + ~BufferPtr(); - BufferPtr(BufferObj *buffer_obj) : buffer_obj_(buffer_obj), ptr_rc_(buffer_obj->ptr_rc_) { ++*buffer_obj_->ptr_rc_; } - - BufferPtr(const BufferPtr &other) : buffer_obj_(other.buffer_obj_), ptr_rc_(other.ptr_rc_) { ++*buffer_obj_->ptr_rc_; } - BufferPtr &operator=(const BufferPtr &other) { - if (this != &other) { - if (buffer_obj_ != nullptr) { - --*ptr_rc_; - } - buffer_obj_ = other.buffer_obj_; - ptr_rc_ = other.ptr_rc_; - ++*buffer_obj_->ptr_rc_; - } - return *this; - } + BufferPtr(BufferObj *buffer_obj); + + BufferPtr(const BufferPtr &other); + + BufferPtr(BufferPtr &&other); + + BufferPtr &operator=(const BufferPtr &other); + + BufferPtr &operator=(BufferPtr &&other); BufferObj *get() const { return buffer_obj_; } - void reset() { - if (buffer_obj_ != nullptr) { - --*ptr_rc_; - buffer_obj_ = nullptr; - } - } + void reset(); private: BufferObj *buffer_obj_; - SharedPtr ptr_rc_; }; } // namespace infinity \ No newline at end of file diff --git a/src/storage/meta/cleanup_scanner.cpp b/src/storage/meta/cleanup_scanner.cpp index d2cc005cac..74def3a382 100644 --- a/src/storage/meta/cleanup_scanner.cpp +++ b/src/storage/meta/cleanup_scanner.cpp @@ -52,7 +52,8 @@ void CleanupScanner::Scan() { void CleanupScanner::Cleanup(CleanupInfoTracer *info_tracer) && { for (auto &[entry, dropped] : entries_) { - std::move(*entry).Cleanup(info_tracer, dropped); + entry->Cleanup(info_tracer, dropped); + entry.reset(); } buffer_mgr_->RemoveClean(); } diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 218ba20739..95f6fe4fa6 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -293,8 +293,8 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() { TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS(); TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts); LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", res, checkpointed_ts)); - if (!finished_txns_.empty()) { - res = std::min(res, finished_txns_.top()->CommitTS()); + for (auto *txn : finished_txns_) { + res = std::min(res, txn->CommitTS()); } for (auto *txn : finishing_txns_) { res = std::min(res, txn->BeginTS()); @@ -318,10 +318,10 @@ void TxnManager::FinishTxn(Txn *txn) { auto state = txn->GetTxnState(); if (state == TxnState::kCommitting) { txn->SetTxnCommitted(); - finished_txns_.emplace(txn); + finished_txns_.emplace_back(txn); } else if (state == TxnState::kRollbacking) { txn->SetTxnRollbacked(); - finished_txns_.emplace(txn); + finished_txns_.emplace_back(txn); } else { String error_message = fmt::format("Invalid transaction status: {}", ToString(state)); UnrecoverableError(error_message); @@ -334,14 +334,16 @@ void TxnManager::FinishTxn(Txn *txn) { for (auto *finishing_txn : finishing_txns_) { max_commit_ts = std::max(max_commit_ts, finishing_txn->CommitTS()); } - while (!finished_txns_.empty()) { - auto *finished_txn = finished_txns_.top(); + auto iter = finished_txns_.begin(); + while (iter != finished_txns_.end()) { + auto *finished_txn = *iter; if (finished_txn->CommitTS() < max_commit_ts) { - break; + ++iter; + continue; } auto finished_txn_id = finished_txn->TxnID(); // LOG_INFO(fmt::format("Txn: {} is finished. committed ts: {}", finished_txn_id, finished_txn->CommittedTS())); - finished_txns_.pop(); + iter = finished_txns_.erase(iter); SizeT remove_n = txn_map_.erase(finished_txn_id); if (remove_n == 0) { String error_message = fmt::format("Txn: {} not found in txn map", finished_txn_id); diff --git a/src/storage/txn/txn_manager.cppm b/src/storage/txn/txn_manager.cppm index 9ab138af57..4a3456956d 100644 --- a/src/storage/txn/txn_manager.cppm +++ b/src/storage/txn/txn_manager.cppm @@ -115,10 +115,6 @@ public: bool InCheckpointProcess(TxnTimeStamp commit_ts); private: - struct TxnCmp { - bool operator()(Txn *lhs, Txn *rhs) const { return lhs->CommitTS() > rhs->CommitTS(); } - }; - Catalog *catalog_{}; mutable std::mutex locker_{}; BufferManager *buffer_mgr_{}; @@ -128,7 +124,7 @@ private: Deque> beginned_txns_; // sorted by begin ts HashSet finishing_txns_; // the txns in committing stage, can use flat_map - Heap finished_txns_; // the txns that committed_ts + List finished_txns_; // the txns that committed_ts Map wait_conflict_ck_{}; // sorted by commit ts