From ead2460340f05d5ddf21a9121a3746b07e7a1ebb Mon Sep 17 00:00:00 2001 From: shen yushi Date: Tue, 8 Oct 2024 14:34:59 +0800 Subject: [PATCH] Fix pr #1949 (#1979) ### What problem does this PR solve? Fix reverted pr #1949 (revert in #1960) Related issue: #1957 & #1958 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- python/restart_test/test_memidx.py | 131 ++++++++++++++---- src/storage/meta/entry/base_entry.cpp | 2 +- .../meta/entry/segment_index_entry.cpp | 1 + src/storage/txn/txn.cpp | 31 +++-- src/storage/txn/txn.cppm | 2 +- src/storage/txn/txn_manager.cpp | 56 ++++---- src/storage/txn/txn_manager.cppm | 2 +- src/storage/txn/txn_store.cpp | 14 +- src/storage/txn/txn_store.cppm | 2 +- .../config/restart_test/test_memidx/1.toml | 1 + .../config/restart_test/test_memidx/2.toml | 1 + .../config/restart_test/test_memidx/3.toml | 1 + 12 files changed, 163 insertions(+), 81 deletions(-) diff --git a/python/restart_test/test_memidx.py b/python/restart_test/test_memidx.py index a852888cdd..31000300f7 100644 --- a/python/restart_test/test_memidx.py +++ b/python/restart_test/test_memidx.py @@ -3,6 +3,8 @@ from infinity_runner import InfinityRunner, infinity_runner_decorator_factory from infinity import index import time +import pathlib +from infinity.common import ConflictType class TestMemIdx: @@ -14,6 +16,7 @@ def test_mem_hnsw(self, infinity_runner: InfinityRunner): infinity_runner.clear() decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner) + @decorator1 def part1(infinity_obj): db_obj = infinity_obj.get_database("default_db") @@ -49,6 +52,7 @@ def part1(infinity_obj): # config1 can held 6 rows of hnsw mem index before dump # 1. recover by dumpindex wal & memindex recovery decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner) + @decorator2 def part2(infinity_obj): db_obj = infinity_obj.get_database("default_db") @@ -74,6 +78,7 @@ def part2(infinity_obj): # 2. recover by delta ckp & dumpindex wal & memindex recovery decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner) + @decorator3 def part3(infinity_obj): db_obj = infinity_obj.get_database("default_db") @@ -99,28 +104,106 @@ def check(): part3() + # create table test_memidx1(c1 int, c2 embedding(float, 4)); + # create index idx1 on test_memidx1(c2) using hnsw with(m=16, ef_construction=200, metric=l2,block_size=1); + # insert into test_memidx1 values(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]); + # insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]); + # # wait 5s + # insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]); + + # select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6); + # # result: 4, 4, 4, 4, 4, 2 + # select count(*) from test_memidx1; + # # result: 10 + # insert into test_memidx1 values(6,[0.3,0.2,0.1,0.4]),(6,[0.3,0.2,0.1,0.4]); + # # wait 5s + # insert into test_memidx1 values(8,[0.4,0.3,0.2,0.1]); + + # select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6); + # # result: 8, 6, 6, 4, 4, 4 + # select count(*) from test_memidx1; + # # result: 13 + # # wait 3s + # select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6); + # # result: 8, 6, 6, 4, 4, 4 + # select count(*) from test_memidx1; + # # result: 13 + + def test_optimize_from_different_database(self, infinity_runner: InfinityRunner): + config1 = "test/data/config/restart_test/test_memidx/1.toml" + config2 = "test/data/config/restart_test/test_memidx/3.toml" + uri = common_values.TEST_LOCAL_HOST + decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner) + decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner) + + data_dir = "/var/infinity/data" + idx1_name = "index1" + idx2_name = "index2" + + @decorator1 + def part1(infinity_obj): + params = { + "M": "16", + "ef_construction": "20", + "metric": "l2", + "block_size": "1", + } + + infinity_obj.drop_database("db1", conflict_type=ConflictType.Ignore) + db_obj1 = infinity_obj.create_database("db1") + table_obj1 = db_obj1.create_table( + "test_memidx1", + {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}}, + ) + table_obj1.create_index( + idx1_name, + index.IndexInfo("c2", index.IndexType.Hnsw, params), + ) + table_obj1.insert( + [{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)] + ) + table_obj1.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)]) + + infinity_obj.drop_database("db2", conflict_type=ConflictType.Ignore) + db_obj2 = infinity_obj.create_database("db2") + table_obj2 = db_obj2.create_table( + "test_memidx2", + {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}}, + ) + table_obj2.create_index( + idx2_name, + index.IndexInfo("c2", index.IndexType.Hnsw, params), + ) + table_obj2.insert( + [{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)] + ) + table_obj2.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)]) -# create table test_memidx1(c1 int, c2 embedding(float, 4)); -# create index idx1 on test_memidx1(c2) using hnsw with(m=16, ef_construction=200, metric=l2,block_size=1); -# insert into test_memidx1 values(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]); -# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]); -# # wait 5s -# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]); - -# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6); -# # result: 4, 4, 4, 4, 4, 2 -# select count(*) from test_memidx1; -# # result: 10 -# insert into test_memidx1 values(6,[0.3,0.2,0.1,0.4]),(6,[0.3,0.2,0.1,0.4]); -# # wait 5s -# insert into test_memidx1 values(8,[0.4,0.3,0.2,0.1]); - -# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6); -# # result: 8, 6, 6, 4, 4, 4 -# select count(*) from test_memidx1; -# # result: 13 -# # wait 3s -# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6); -# # result: 8, 6, 6, 4, 4, 4 -# select count(*) from test_memidx1; -# # result: 13 + part1() + + @decorator2 + def part2(infinity_obj): + # wait for optimize + time.sleep(3) + + idx1_files = list(pathlib.Path(data_dir).rglob(f"*{idx1_name}*")) + idx2_files = list(pathlib.Path(data_dir).rglob(f"*{idx2_name}*")) + assert len(idx1_files) == 1 + assert len(idx2_files) == 1 + + idx1_dir = idx1_files[0] + idx1_files_in_dir = list(idx1_dir.glob("*")) + assert len(idx1_files_in_dir) == 3 + + idx2_dir = idx2_files[0] + idx2_files_in_dir = list(idx2_dir.glob("*")) + assert len(idx2_files_in_dir) == 3 + + infinity_obj.cleanup() + idx1_files_in_dir = list(idx1_dir.glob("*")) + assert len(idx1_files_in_dir) == 1 + + idx2_files_in_dir = list(idx2_dir.glob("*")) + assert len(idx2_files_in_dir) == 1 + + part2() diff --git a/src/storage/meta/entry/base_entry.cpp b/src/storage/meta/entry/base_entry.cpp index 08a62f164d..7f37cfe1f4 100644 --- a/src/storage/meta/entry/base_entry.cpp +++ b/src/storage/meta/entry/base_entry.cpp @@ -28,7 +28,7 @@ bool BaseEntry::CheckVisible(Txn *txn) const { TxnTimeStamp begin_ts = txn->BeginTS(); if (txn_id_ == 0) { // could not check if the entry is visible accurately. log a warning and return true - LOG_WARN(fmt::format("Entry {} txn id is not set, commit_ts", *encode_, commit_ts_)); + LOG_WARN(fmt::format("Entry {} txn id is not set, commit_ts: {}", *encode_, commit_ts_)); return begin_ts >= commit_ts_; } if (begin_ts >= commit_ts_ || txn_id_ == txn->TxnID()) { diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index 0c2c7b66c3..5cc7919896 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -809,6 +809,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_ const auto &index_name = *table_index_entry_->GetIndexName(); if (!TrySetOptimizing()) { LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id_)); + return nullptr; } bool opt_success = false; DeferFn defer_fn([&] { diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index 9d3bd5ddf3..747fa0dc76 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -87,7 +87,7 @@ Status Txn::Import(TableEntry *table_entry, SharedPtr segment_entr // build WalCmd WalSegmentInfo segment_info(segment_entry.get()); - wal_entry_->cmds_.push_back(MakeShared(db_name, table_name, std::move(segment_info))); + AddWalCmd(MakeShared(db_name, table_name, std::move(segment_info))); TxnTableStore *table_store = this->GetTxnTableStore(table_entry); table_store->Import(std::move(segment_entry), this); @@ -102,7 +102,7 @@ Status Txn::Append(TableEntry *table_entry, const SharedPtr &input_bl this->CheckTxn(db_name); TxnTableStore *table_store = this->GetTxnTableStore(table_entry); - wal_entry_->cmds_.push_back(MakeShared(db_name, table_name, input_block)); + AddWalCmd(MakeShared(db_name, table_name, input_block)); auto [err_msg, append_status] = table_store->Append(input_block); return append_status; } @@ -120,7 +120,7 @@ Status Txn::Delete(TableEntry *table_entry, const Vector &row_ids, bool c TxnTableStore *table_store = this->GetTxnTableStore(table_entry); - wal_entry_->cmds_.push_back(MakeShared(db_name, table_name, row_ids)); + AddWalCmd(MakeShared(db_name, table_name, row_ids)); auto [err_msg, delete_status] = table_store->Delete(row_ids); return delete_status; } @@ -141,7 +141,7 @@ Status Txn::OptIndex(TableIndexEntry *table_index_entry, VectorGetTableName(); table_index_entry->OptIndex(txn_table_store, init_params, false /*replay*/); - wal_entry_->cmds_.push_back(MakeShared(db_name_, table_name, index_name, std::move(init_params))); + AddWalCmd(MakeShared(db_name_, table_name, index_name, std::move(init_params))); return Status::OK(); } @@ -178,7 +178,7 @@ Status Txn::CreateDatabase(const String &db_name, ConflictType conflict_type) { } txn_store_.AddDBStore(db_entry); - wal_entry_->cmds_.push_back(MakeShared(std::move(db_name), db_entry->GetPathNameTail())); + AddWalCmd(MakeShared(std::move(db_name), db_entry->GetPathNameTail())); return Status::OK(); } @@ -192,7 +192,7 @@ Status Txn::DropDatabase(const String &db_name, ConflictType conflict_type) { } txn_store_.DropDBStore(dropped_db_entry.get()); - wal_entry_->cmds_.push_back(MakeShared(db_name)); + AddWalCmd(MakeShared(db_name)); return Status::OK(); } @@ -244,7 +244,7 @@ Status Txn::CreateTable(const String &db_name, const SharedPtr &table_ } txn_store_.AddTableStore(table_entry); - wal_entry_->cmds_.push_back(MakeShared(std::move(db_name), table_entry->GetPathNameTail(), table_def)); + AddWalCmd(MakeShared(std::move(db_name), table_entry->GetPathNameTail(), table_def)); LOG_TRACE("Txn::CreateTable created table entry is inserted."); return Status::OK(); @@ -270,7 +270,7 @@ Status Txn::AddColumns(TableEntry *table_entry, const Vectorcmds_.push_back(MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), column_defs)); + AddWalCmd(MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), column_defs)); return Status::OK(); } @@ -289,7 +289,7 @@ Status Txn::DropColumns(TableEntry *table_entry, const Vector &column_na return drop_status; } - wal_entry_->cmds_.push_back(MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), column_names)); + AddWalCmd(MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), column_names)); return Status::OK(); } @@ -306,7 +306,7 @@ Status Txn::DropTableCollectionByName(const String &db_name, const String &table } txn_store_.DropTableStore(table_entry.get()); - wal_entry_->cmds_.push_back(MakeShared(db_name, table_name)); + AddWalCmd(MakeShared(db_name, table_name)); LOG_TRACE("Txn::DropTableCollectionByName dropped table entry is inserted."); return Status::OK(); @@ -324,7 +324,7 @@ Tuple Txn::CreateIndexDef(TableEntry *table_entry, co txn_table_store->AddIndexStore(table_index_entry); String index_dir_tail = table_index_entry->GetPathNameTail(); - wal_entry_->cmds_.push_back( + AddWalCmd( MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), std::move(index_dir_tail), index_base)); return {table_index_entry, index_status}; } @@ -412,7 +412,7 @@ Status Txn::DropIndexByName(const String &db_name, const String &table_name, con auto *txn_table_store = this->GetTxnTableStore(table_entry); txn_table_store->DropIndexStore(table_index_entry.get()); - wal_entry_->cmds_.push_back(MakeShared(db_name, table_name, index_name)); + AddWalCmd(MakeShared(db_name, table_name, index_name)); return index_status; } @@ -583,12 +583,13 @@ void Txn::Rollback() { } void Txn::AddWalCmd(const SharedPtr &cmd) { - std::lock_guard guard(txn_store_.mtx_); + // std::lock_guard guard(txn_store_.mtx_); auto state = txn_context_.GetTxnState(); if (state != TxnState::kStarted) { auto begin_ts = BeginTS(); UnrecoverableError(fmt::format("Should add wal cmd in started state, begin_ts: {}", begin_ts)); } + // LOG_TRACE(fmt::format("Add wal cmd {} to txn {}", cmd->ToString(), txn_id_)); wal_entry_->cmds_.push_back(cmd); } @@ -600,7 +601,7 @@ bool Txn::DeltaCheckpoint(TxnTimeStamp last_ckp_ts, TxnTimeStamp &max_commit_ts) if (!catalog_->SaveDeltaCatalog(last_ckp_ts, max_commit_ts, delta_path, delta_name)) { return false; } - wal_entry_->cmds_.push_back(MakeShared(max_commit_ts, false, delta_path, delta_name)); + AddWalCmd(MakeShared(max_commit_ts, false, delta_path, delta_name)); return true; } @@ -610,7 +611,7 @@ void Txn::FullCheckpoint(const TxnTimeStamp max_commit_ts) { String full_path, full_name; catalog_->SaveFullCatalog(max_commit_ts, full_path, full_name); - wal_entry_->cmds_.push_back(MakeShared(max_commit_ts, true, full_path, full_name)); + AddWalCmd(MakeShared(max_commit_ts, true, full_path, full_name)); } void Txn::AddWriteTxnNum(TableEntry *table_entry) { diff --git a/src/storage/txn/txn.cppm b/src/storage/txn/txn.cppm index 5e57cd3d9d..8c3292dfe2 100644 --- a/src/storage/txn/txn.cppm +++ b/src/storage/txn/txn.cppm @@ -64,7 +64,7 @@ class BaseTableRef; enum class CompactStatementType; struct SegmentIndexEntry; -export class Txn { +export class Txn : public EnableSharedFromThis { public: // For new txn explicit Txn(TxnManager *txn_manager, diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 5316d2ab25..775fbced20 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -32,6 +32,7 @@ import catalog_delta_entry; import default_values; import wal_manager; import bg_task; +import defer_op; namespace infinity { @@ -129,27 +130,38 @@ TxnTimeStamp TxnManager::GetCommitTimeStampW(Txn *txn) { bool TxnManager::CheckConflict(Txn *txn) { TxnTimeStamp commit_ts = txn->CommitTS(); - Vector candidate_txns; + Vector> candidate_txns; + TxnTimeStamp min_checking_ts = UNCOMMIT_TS; { std::lock_guard guard(locker_); // LOG_INFO(fmt::format("Txn {}(commit_ts:{}) check conflict", txn->TxnID(), txn->CommitTS())); - for (auto *finishing_txn : finishing_txns_) { + for (Txn *finishing_txn : finishing_txns_) { TxnTimeStamp finishing_commit_ts = finishing_txn->CommitTS(); if (commit_ts > finishing_commit_ts) { - candidate_txns.push_back(finishing_txn); + candidate_txns.push_back(finishing_txn->shared_from_this()); + min_checking_ts = std::min(min_checking_ts, finishing_txn->BeginTS()); } } + if (min_checking_ts != UNCOMMIT_TS) { + checking_ts_.insert(min_checking_ts); + } } + DeferFn defer_fn([&] { + if (min_checking_ts != UNCOMMIT_TS) { + std::lock_guard guard(locker_); + checking_ts_.erase(min_checking_ts); + } + }); if (txn->CheckConflict(catalog_)) { return true; } - for (auto *candidate_txn : candidate_txns) { + for (SharedPtr &candidate_txn : candidate_txns) { // LOG_INFO(fmt::format("Txn {}(commit_ts: {}) check conflict with txn {}(commit_ts: {})", // txn->TxnID(), // txn->CommitTS(), // candidate_txn->TxnID(), // candidate_txn->CommitTS())); - if (txn->CheckConflict(candidate_txn)) { + if (txn->CheckConflict(candidate_txn.get())) { return true; } } @@ -292,12 +304,10 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() { } TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS(); TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts); - for (auto *txn : finished_txns_) { - res = std::min(res, txn->CommitTS()); - } - for (auto *txn : finishing_txns_) { - res = std::min(res, txn->BeginTS()); + if (!checking_ts_.empty()) { + res = std::min(res, *checking_ts_.begin()); } + LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", res, checkpointed_ts)); return res; } @@ -318,10 +328,8 @@ void TxnManager::FinishTxn(Txn *txn) { auto state = txn->GetTxnState(); if (state == TxnState::kCommitting) { txn->SetTxnCommitted(); - finished_txns_.emplace_back(txn); } else if (state == TxnState::kRollbacking) { txn->SetTxnRollbacked(); - finished_txns_.emplace_back(txn); } else { String error_message = fmt::format("Invalid transaction status: {}", ToString(state)); UnrecoverableError(error_message); @@ -330,25 +338,11 @@ void TxnManager::FinishTxn(Txn *txn) { if (remove_n == 0) { UnrecoverableError("Txn not found in finishing_txns_"); } - TxnTimeStamp max_commit_ts = 0; - for (auto *finishing_txn : finishing_txns_) { - max_commit_ts = std::max(max_commit_ts, finishing_txn->CommitTS()); - } - auto iter = finished_txns_.begin(); - while (iter != finished_txns_.end()) { - auto *finished_txn = *iter; - if (finished_txn->CommitTS() < max_commit_ts) { - ++iter; - continue; - } - auto finished_txn_id = finished_txn->TxnID(); - // LOG_INFO(fmt::format("Txn: {} is finished. committed ts: {}", finished_txn_id, finished_txn->CommittedTS())); - iter = finished_txns_.erase(iter); - SizeT remove_n = txn_map_.erase(finished_txn_id); - if (remove_n == 0) { - String error_message = fmt::format("Txn: {} not found in txn map", finished_txn_id); - UnrecoverableError(error_message); - } + TransactionID txn_id = txn->TxnID(); + remove_n = txn_map_.erase(txn_id); + if (remove_n == 0) { + String error_message = fmt::format("Txn: {} not found in txn map", txn_id); + UnrecoverableError(error_message); } } diff --git a/src/storage/txn/txn_manager.cppm b/src/storage/txn/txn_manager.cppm index 4a3456956d..2ab7235ba1 100644 --- a/src/storage/txn/txn_manager.cppm +++ b/src/storage/txn/txn_manager.cppm @@ -124,7 +124,7 @@ private: Deque> beginned_txns_; // sorted by begin ts HashSet finishing_txns_; // the txns in committing stage, can use flat_map - List finished_txns_; // the txns that committed_ts + Set checking_ts_{}; // the begin ts of txn that is used to check conflict Map wait_conflict_ck_{}; // sorted by commit ts diff --git a/src/storage/txn/txn_store.cpp b/src/storage/txn/txn_store.cpp index 31d352d615..5a776496ed 100644 --- a/src/storage/txn/txn_store.cpp +++ b/src/storage/txn/txn_store.cpp @@ -116,9 +116,12 @@ void TxnIndexStore::Commit(TransactionID txn_id, TxnTimeStamp commit_ts) { } } -void TxnIndexStore::Rollback() { +void TxnIndexStore::Rollback(TxnTimeStamp abort_ts) { for (auto [segment_index_entry, new_chunk, old_chunks] : optimize_data_) { segment_index_entry->ResetOptimizing(); + for (ChunkIndexEntry *old_chunk : old_chunks) { + old_chunk->DeprecateChunk(abort_ts); + } } } @@ -303,7 +306,7 @@ void TxnTableStore::Rollback(TransactionID txn_id, TxnTimeStamp abort_ts) { Catalog::RemoveIndexEntry(table_index_entry, txn_id); // fix me } for (const auto &[index_name, txn_index_store] : txn_indexes_store_) { - txn_index_store->Rollback(); + txn_index_store->Rollback(abort_ts); } } @@ -580,12 +583,9 @@ void TxnStore::MaintainCompactionAlg() const { } bool TxnStore::CheckConflict(Catalog *catalog) { - if (txn_->db_name().empty() && !txn_tables_store_.empty()) { - const String &db_name = *txn_tables_store_.begin()->second->GetTableEntry()->GetDBName(); - txn_->SetDBName(db_name); - } - const String &db_name = txn_->db_name(); for (const auto &[table_name, table_store] : txn_tables_store_) { + const String &db_name = *table_store->GetTableEntry()->GetDBName(); + txn_->SetDBName(db_name); auto [table_entry1, status] = catalog->GetTableByName(db_name, table_name, txn_->TxnID(), txn_->CommitTS()); if (!status.ok() || table_entry1 != table_store->GetTableEntry()) { return true; diff --git a/src/storage/txn/txn_store.cppm b/src/storage/txn/txn_store.cppm index 2ad3d8f81e..4cdd2440cd 100644 --- a/src/storage/txn/txn_store.cppm +++ b/src/storage/txn/txn_store.cppm @@ -69,7 +69,7 @@ public: void Commit(TransactionID txn_id, TxnTimeStamp commit_ts); - void Rollback(); + void Rollback(TxnTimeStamp abort_ts); public: TableIndexEntry *const table_index_entry_{}; diff --git a/test/data/config/restart_test/test_memidx/1.toml b/test/data/config/restart_test/test_memidx/1.toml index f585861273..a9b967006d 100644 --- a/test/data/config/restart_test/test_memidx/1.toml +++ b/test/data/config/restart_test/test_memidx/1.toml @@ -8,6 +8,7 @@ log_to_stdout = true log_level = "trace" [storage] +persistence_dir = "" optimize_interval = "0s" cleanup_interval = "0s" compact_interval = "0s" diff --git a/test/data/config/restart_test/test_memidx/2.toml b/test/data/config/restart_test/test_memidx/2.toml index a3b80fde6b..c2d4a5f280 100644 --- a/test/data/config/restart_test/test_memidx/2.toml +++ b/test/data/config/restart_test/test_memidx/2.toml @@ -8,6 +8,7 @@ log_to_stdout = true log_level = "trace" [storage] +persistence_dir = "" optimize_interval = "0s" cleanup_interval = "3s" compact_interval = "0s" diff --git a/test/data/config/restart_test/test_memidx/3.toml b/test/data/config/restart_test/test_memidx/3.toml index 25c768e90d..c24c261425 100644 --- a/test/data/config/restart_test/test_memidx/3.toml +++ b/test/data/config/restart_test/test_memidx/3.toml @@ -8,6 +8,7 @@ log_to_stdout = true log_level = "trace" [storage] +persistence_dir = "" optimize_interval = "1s" cleanup_interval = "0s" compact_interval = "0s"