Skip to content

Commit

Permalink
Fix cleanup (#1901)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix: buffer obj ref count bug after alter.
CI: collect executable when error occurred.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
small-turtle-1 authored Sep 24, 2024
1 parent 1cc4ea6 commit 6496244
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 49 deletions.
7 changes: 3 additions & 4 deletions python/restart_test/test_alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ def part1(infinity_obj):

infinity_obj.cleanup()

dropped_column_dirs = pathlib.Path(data_dir).rglob("1.col*")
dropped_column_dirs = pathlib.Path(data_dir).rglob("1.col")
# find the column file with the column idx = 1
assert len(list(dropped_column_dirs)) == 0

# contain "0.col" or "2.col"
column_dirs = pathlib.Path(data_dir).rglob("*[02].col")
column_dirs = pathlib.Path(data_dir).rglob("[02].col")
assert len(list(column_dirs)) == 2

res = table_obj.drop_columns(["c3"])
Expand All @@ -194,9 +194,8 @@ def part1(infinity_obj):
@decorator
def part2(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table(table_name)

dropped_column_dirs = pathlib.Path(data_dir).rglob("2.col*")
dropped_column_dirs = pathlib.Path(data_dir).rglob("2.col")
assert len(list(dropped_column_dirs)) == 0

db_obj.drop_table(table_name)
Expand Down
51 changes: 47 additions & 4 deletions src/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ BufferObj::BufferObj(BufferManager *buffer_mgr, bool is_ephemeral, UniquePtr<Fil

BufferObj::~BufferObj() = default;

void BufferObj::SetFileWorker(UniquePtr<FileWorker> file_worker) {
file_worker_ = std::move(file_worker);
}
void BufferObj::SetFileWorker(UniquePtr<FileWorker> file_worker) { file_worker_ = std::move(file_worker); }

BufferHandle BufferObj::Load() {
std::unique_lock<std::mutex> locker(w_locker_);
Expand Down Expand Up @@ -161,7 +159,7 @@ bool BufferObj::Save(const FileWorkerSaveCtx &ctx) {

void BufferObj::PickForCleanup() {
std::unique_lock<std::mutex> locker(w_locker_);
if (*ptr_rc_ > 1) {
if (ptr_rc_ > 1) {
return;
}
switch (status_) {
Expand Down Expand Up @@ -299,4 +297,49 @@ void BufferObj::CheckState() const {
}
}

BufferPtr::~BufferPtr() {
if (buffer_obj_ != nullptr && buffer_obj_->ptr_rc_ > 0) {
--buffer_obj_->ptr_rc_;
}
}

BufferPtr::BufferPtr(BufferObj *buffer_obj) : buffer_obj_(buffer_obj) {
++buffer_obj_->ptr_rc_;
}

BufferPtr::BufferPtr(const BufferPtr &other) : buffer_obj_(other.buffer_obj_) { ++buffer_obj_->ptr_rc_; }

BufferPtr::BufferPtr(BufferPtr &&other) : buffer_obj_(other.buffer_obj_) { other.buffer_obj_ = nullptr; }

BufferPtr &BufferPtr::operator=(const BufferPtr &other) {
if (this != &other) {
if (buffer_obj_ != nullptr) {
--buffer_obj_->ptr_rc_;
}
buffer_obj_ = other.buffer_obj_;
if (buffer_obj_ != nullptr) {
++buffer_obj_->ptr_rc_;
}
}
return *this;
}

BufferPtr &BufferPtr::operator=(BufferPtr &&other) {
if (this != &other) {
if (buffer_obj_ != nullptr) {
--buffer_obj_->ptr_rc_;
}
buffer_obj_ = other.buffer_obj_;
other.buffer_obj_ = nullptr;
}
return *this;
}

void BufferPtr::reset() {
if (buffer_obj_ != nullptr) {
--buffer_obj_->ptr_rc_;
buffer_obj_ = nullptr;
}
}

} // namespace infinity
39 changes: 12 additions & 27 deletions src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -155,46 +155,31 @@ private:
u32 id_;

friend class BufferPtr;
SharedPtr<u32> ptr_rc_ = MakeShared<u32>(0);
u32 ptr_rc_ = 0;
};

export class BufferPtr {
public:
BufferPtr() : buffer_obj_(nullptr) {}

~BufferPtr() {
if (buffer_obj_ != nullptr && *ptr_rc_ > 0) {
--*ptr_rc_;
}
}
~BufferPtr();

BufferPtr(BufferObj *buffer_obj) : buffer_obj_(buffer_obj), ptr_rc_(buffer_obj->ptr_rc_) { ++*buffer_obj_->ptr_rc_; }

BufferPtr(const BufferPtr &other) : buffer_obj_(other.buffer_obj_), ptr_rc_(other.ptr_rc_) { ++*buffer_obj_->ptr_rc_; }
BufferPtr &operator=(const BufferPtr &other) {
if (this != &other) {
if (buffer_obj_ != nullptr) {
--*ptr_rc_;
}
buffer_obj_ = other.buffer_obj_;
ptr_rc_ = other.ptr_rc_;
++*buffer_obj_->ptr_rc_;
}
return *this;
}
BufferPtr(BufferObj *buffer_obj);

BufferPtr(const BufferPtr &other);

BufferPtr(BufferPtr &&other);

BufferPtr &operator=(const BufferPtr &other);

BufferPtr &operator=(BufferPtr &&other);

BufferObj *get() const { return buffer_obj_; }

void reset() {
if (buffer_obj_ != nullptr) {
--*ptr_rc_;
buffer_obj_ = nullptr;
}
}
void reset();

private:
BufferObj *buffer_obj_;
SharedPtr<u32> ptr_rc_;
};

} // namespace infinity
3 changes: 2 additions & 1 deletion src/storage/meta/cleanup_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ void CleanupScanner::Scan() {

void CleanupScanner::Cleanup(CleanupInfoTracer *info_tracer) && {
for (auto &[entry, dropped] : entries_) {
std::move(*entry).Cleanup(info_tracer, dropped);
entry->Cleanup(info_tracer, dropped);
entry.reset();
}
buffer_mgr_->RemoveClean();
}
Expand Down
18 changes: 10 additions & 8 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() {
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts);
LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", res, checkpointed_ts));
if (!finished_txns_.empty()) {
res = std::min(res, finished_txns_.top()->CommitTS());
for (auto *txn : finished_txns_) {
res = std::min(res, txn->CommitTS());
}
for (auto *txn : finishing_txns_) {
res = std::min(res, txn->BeginTS());
Expand All @@ -318,10 +318,10 @@ void TxnManager::FinishTxn(Txn *txn) {
auto state = txn->GetTxnState();
if (state == TxnState::kCommitting) {
txn->SetTxnCommitted();
finished_txns_.emplace(txn);
finished_txns_.emplace_back(txn);
} else if (state == TxnState::kRollbacking) {
txn->SetTxnRollbacked();
finished_txns_.emplace(txn);
finished_txns_.emplace_back(txn);
} else {
String error_message = fmt::format("Invalid transaction status: {}", ToString(state));
UnrecoverableError(error_message);
Expand All @@ -334,14 +334,16 @@ void TxnManager::FinishTxn(Txn *txn) {
for (auto *finishing_txn : finishing_txns_) {
max_commit_ts = std::max(max_commit_ts, finishing_txn->CommitTS());
}
while (!finished_txns_.empty()) {
auto *finished_txn = finished_txns_.top();
auto iter = finished_txns_.begin();
while (iter != finished_txns_.end()) {
auto *finished_txn = *iter;
if (finished_txn->CommitTS() < max_commit_ts) {
break;
++iter;
continue;
}
auto finished_txn_id = finished_txn->TxnID();
// LOG_INFO(fmt::format("Txn: {} is finished. committed ts: {}", finished_txn_id, finished_txn->CommittedTS()));
finished_txns_.pop();
iter = finished_txns_.erase(iter);
SizeT remove_n = txn_map_.erase(finished_txn_id);
if (remove_n == 0) {
String error_message = fmt::format("Txn: {} not found in txn map", finished_txn_id);
Expand Down
6 changes: 1 addition & 5 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ public:
bool InCheckpointProcess(TxnTimeStamp commit_ts);

private:
struct TxnCmp {
bool operator()(Txn *lhs, Txn *rhs) const { return lhs->CommitTS() > rhs->CommitTS(); }
};

Catalog *catalog_{};
mutable std::mutex locker_{};
BufferManager *buffer_mgr_{};
Expand All @@ -128,7 +124,7 @@ private:

Deque<WeakPtr<Txn>> beginned_txns_; // sorted by begin ts
HashSet<Txn *> finishing_txns_; // the txns in committing stage, can use flat_map
Heap<Txn *, TxnCmp> finished_txns_; // the txns that committed_ts
List<Txn *> finished_txns_; // the txns that committed_ts

Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

Expand Down

0 comments on commit 6496244

Please sign in to comment.