Revert "Fix bugs (#1949)" (#1960)
This reverts commit 1989ed0.

### What problem does this PR solve?

#1949 introduced issues 1 and 2 of #1958.

Issue link: #1958

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
JinHai-CN authored Oct 2, 2024
1 parent 51fdbfa commit a6c444e
Showing 11 changed files with 66 additions and 144 deletions.
131 changes: 24 additions & 107 deletions python/restart_test/test_memidx.py
@@ -3,8 +3,6 @@
from infinity_runner import InfinityRunner, infinity_runner_decorator_factory
from infinity import index
import time
import pathlib
from infinity.common import ConflictType


class TestMemIdx:
@@ -16,7 +14,6 @@ def test_mem_hnsw(self, infinity_runner: InfinityRunner):
infinity_runner.clear()

decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)

@decorator1
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
@@ -52,7 +49,6 @@ def part1(infinity_obj):
# config1 can hold 6 rows of hnsw mem index before dump
# 1. recover by dumpindex wal & memindex recovery
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

@decorator2
def part2(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
@@ -78,7 +74,6 @@ def part2(infinity_obj):

# 2. recover by delta ckp & dumpindex wal & memindex recovery
decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner)

@decorator3
def part3(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
@@ -104,106 +99,28 @@ def check():

part3()

# create table test_memidx1(c1 int, c2 embedding(float, 4));
# create index idx1 on test_memidx1(c2) using hnsw with(m=16, ef_construction=200, metric=l2,block_size=1);
# insert into test_memidx1 values(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]);
# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]);
# # wait 5s
# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]);

# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 4, 4, 4, 4, 4, 2
# select count(*) from test_memidx1;
# # result: 10
# insert into test_memidx1 values(6,[0.3,0.2,0.1,0.4]),(6,[0.3,0.2,0.1,0.4]);
# # wait 5s
# insert into test_memidx1 values(8,[0.4,0.3,0.2,0.1]);

# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 8, 6, 6, 4, 4, 4
# select count(*) from test_memidx1;
# # result: 13
# # wait 3s
# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 8, 6, 6, 4, 4, 4
# select count(*) from test_memidx1;
# # result: 13

def test_optimize_from_different_database(self, infinity_runner: InfinityRunner):
config1 = "test/data/config/restart_test/test_memidx/1.toml"
config2 = "test/data/config/restart_test/test_memidx/3.toml"
uri = common_values.TEST_LOCAL_HOST
decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

data_dir = "/var/infinity/data"
idx1_name = "index1"
idx2_name = "index2"

@decorator1
def part1(infinity_obj):
params = {
"M": "16",
"ef_construction": "20",
"metric": "l2",
"block_size": "1",
}

infinity_obj.drop_database("db1", conflict_type=ConflictType.Ignore)
db_obj1 = infinity_obj.create_database("db1")
table_obj1 = db_obj1.create_table(
"test_memidx1",
{"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}},
)
table_obj1.create_index(
idx1_name,
index.IndexInfo("c2", index.IndexType.Hnsw, params),
)
table_obj1.insert(
[{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)]
)
table_obj1.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)])

infinity_obj.drop_database("db2", conflict_type=ConflictType.Ignore)
db_obj2 = infinity_obj.create_database("db2")
table_obj2 = db_obj2.create_table(
"test_memidx2",
{"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}},
)
table_obj2.create_index(
idx2_name,
index.IndexInfo("c2", index.IndexType.Hnsw, params),
)
table_obj2.insert(
[{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)]
)
table_obj2.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)])

part1()

@decorator2
def part2(infinity_obj):
# wait for optimize
time.sleep(3)

idx1_files = list(pathlib.Path(data_dir).rglob(f"*{idx1_name}*"))
idx2_files = list(pathlib.Path(data_dir).rglob(f"*{idx2_name}*"))
assert len(idx1_files) == 1
assert len(idx2_files) == 1

idx1_dir = idx1_files[0]
idx1_files_in_dir = list(idx1_dir.glob("*"))
assert len(idx1_files_in_dir) == 3

idx2_dir = idx2_files[0]
idx2_files_in_dir = list(idx2_dir.glob("*"))
assert len(idx2_files_in_dir) == 3

infinity_obj.cleanup()
idx1_files_in_dir = list(idx1_dir.glob("*"))
assert len(idx1_files_in_dir) == 1

idx2_files_in_dir = list(idx2_dir.glob("*"))
assert len(idx2_files_in_dir) == 1

part2()
# create table test_memidx1(c1 int, c2 embedding(float, 4));
# create index idx1 on test_memidx1(c2) using hnsw with(m=16, ef_construction=200, metric=l2,block_size=1);
# insert into test_memidx1 values(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]);
# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]);
# # wait 5s
# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]);

# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 4, 4, 4, 4, 4, 2
# select count(*) from test_memidx1;
# # result: 10
# insert into test_memidx1 values(6,[0.3,0.2,0.1,0.4]),(6,[0.3,0.2,0.1,0.4]);
# # wait 5s
# insert into test_memidx1 values(8,[0.4,0.3,0.2,0.1]);

# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 8, 6, 6, 4, 4, 4
# select count(*) from test_memidx1;
# # result: 13
# # wait 3s
# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 8, 6, 6, 4, 4, 4
# select count(*) from test_memidx1;
# # result: 13
2 changes: 1 addition & 1 deletion src/storage/meta/entry/base_entry.cpp
@@ -28,7 +28,7 @@ bool BaseEntry::CheckVisible(Txn *txn) const {
TxnTimeStamp begin_ts = txn->BeginTS();
if (txn_id_ == 0) {
// could not check if the entry is visible accurately. log a warning and return true
LOG_WARN(fmt::format("Entry {} txn id is not set, commit_ts: {}", *encode_, commit_ts_));
LOG_WARN(fmt::format("Entry {} txn id is not set, commit_ts", *encode_, commit_ts_));
return begin_ts >= commit_ts_;
}
if (begin_ts >= commit_ts_ || txn_id_ == txn->TxnID()) {
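Note on the hunk above: the restored LOG_WARN line still passes `commit_ts_` as an argument but no longer has a second `{}` placeholder, and the {fmt} library silently ignores surplus arguments, so the timestamp simply stops appearing in the log. A minimal, self-contained demo of that behavior (assumes only the {fmt} library; not project code):

```cpp
#include <fmt/format.h>
#include <cstdint>
#include <cstdio>

int main() {
    std::uint64_t commit_ts = 42;
    // Two placeholders: yields "Entry e txn id is not set, commit_ts: 42".
    std::puts(fmt::format("Entry {} txn id is not set, commit_ts: {}", "e", commit_ts).c_str());
    // One placeholder: the trailing commit_ts argument is accepted but ignored,
    // so the timestamp never makes it into the message.
    std::puts(fmt::format("Entry {} txn id is not set, commit_ts", "e", commit_ts).c_str());
    return 0;
}
```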
1 change: 0 additions & 1 deletion src/storage/meta/entry/segment_index_entry.cpp
@@ -810,7 +810,6 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_
const auto &index_name = *table_index_entry_->GetIndexName();
if (!TrySetOptimizing()) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id_));
return nullptr;
}
bool opt_success = false;
DeferFn defer_fn([&] {
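The hunk above deletes the early `return nullptr;` that followed the "is optimizing, skip optimize" log, so after this revert RebuildChunkIndexEntries logs the message but keeps going. For context, a small self-contained sketch of the "claim atomically, release on scope exit" guard involved; TrySetOptimizing and DeferFn are names taken from the diff, while the atomic flag and the unconditional reset are illustrative assumptions:

```cpp
#include <atomic>
#include <cstdio>
#include <functional>
#include <utility>

// Minimal stand-in for the project's DeferFn: run a callback when the scope ends.
struct DeferDemo {
    explicit DeferDemo(std::function<void()> fn) : fn_(std::move(fn)) {}
    ~DeferDemo() { if (fn_) fn_(); }
    std::function<void()> fn_;
};

struct SegmentGuardDemo {
    std::atomic<bool> optimizing_{false};

    // Analogue of TrySetOptimizing(): atomically claim the "optimizing" slot.
    bool TrySetOptimizing() {
        bool expected = false;
        return optimizing_.compare_exchange_strong(expected, true);
    }

    void Optimize() {
        if (!TrySetOptimizing()) {
            std::puts("segment is already optimizing, skip");
            return; // the `return nullptr;` deleted by this revert played this role
        }
        // Release the flag on scope exit, mirroring the DeferFn installed in the diff
        // (the exact reset policy of the real code is not visible in the hunk).
        DeferDemo defer([&] { optimizing_.store(false); });
        std::puts("rebuilding chunk index entries");
    }
};

int main() {
    SegmentGuardDemo seg;
    seg.Optimize(); // claims the slot, rebuilds, releases on scope exit
    seg.Optimize(); // succeeds again because the first call released the flag
    return 0;
}
```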
2 changes: 1 addition & 1 deletion src/storage/txn/txn.cppm
@@ -64,7 +64,7 @@ class BaseTableRef;
enum class CompactStatementType;
struct SegmentIndexEntry;

export class Txn : public EnableSharedFromThis<Txn> {
export class Txn {
public:
// For new txn
explicit Txn(TxnManager *txn_manager,
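This hunk drops `EnableSharedFromThis<Txn>` from Txn's base list, matching the txn_manager.cpp change below where `finishing_txn->shared_from_this()` goes back to a plain `Txn *`. As background, a minimal standard-library illustration of what that base class provides (Session and Pin are made-up names, not project code):

```cpp
#include <memory>
#include <vector>

struct Session : std::enable_shared_from_this<Session> {
    std::shared_ptr<Session> Pin() {
        // Valid only if *this is already managed by a shared_ptr.
        return shared_from_this();
    }
};

int main() {
    auto owner = std::make_shared<Session>();
    std::vector<Session *> raw_registry{owner.get()};   // raw pointers, like finishing_txns_
    std::vector<std::shared_ptr<Session>> pinned;
    for (Session *s : raw_registry) {
        pinned.push_back(s->Pin());   // keep the object alive while it is inspected
    }
    // After the revert, the candidate list goes back to raw Txn* and relies on
    // the finishing_txns_ bookkeeping, rather than shared ownership, for lifetime.
    return 0;
}
```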
53 changes: 31 additions & 22 deletions src/storage/txn/txn_manager.cpp
@@ -129,39 +129,30 @@ TxnTimeStamp TxnManager::GetCommitTimeStampW(Txn *txn) {

bool TxnManager::CheckConflict(Txn *txn) {
TxnTimeStamp commit_ts = txn->CommitTS();
Vector<SharedPtr<Txn>> candidate_txns;
TxnTimeStamp min_checking_ts = UNCOMMIT_TS;
Vector<Txn *> candidate_txns;
{
std::lock_guard guard(locker_);
// LOG_INFO(fmt::format("Txn {}(commit_ts:{}) check conflict", txn->TxnID(), txn->CommitTS()));
for (Txn *finishing_txn : finishing_txns_) {
for (auto *finishing_txn : finishing_txns_) {
TxnTimeStamp finishing_commit_ts = finishing_txn->CommitTS();
if (commit_ts > finishing_commit_ts) {
candidate_txns.push_back(finishing_txn->shared_from_this());
min_checking_ts = std::min(min_checking_ts, finishing_txn->BeginTS());
candidate_txns.push_back(finishing_txn);
}
}
if (min_checking_ts != UNCOMMIT_TS) {
checking_ts_.insert(min_checking_ts);
}
}
if (txn->CheckConflict(catalog_)) {
return true;
}
for (SharedPtr<Txn> &candidate_txn : candidate_txns) {
for (auto *candidate_txn : candidate_txns) {
// LOG_INFO(fmt::format("Txn {}(commit_ts: {}) check conflict with txn {}(commit_ts: {})",
// txn->TxnID(),
// txn->CommitTS(),
// candidate_txn->TxnID(),
// candidate_txn->CommitTS()));
if (txn->CheckConflict(candidate_txn.get())) {
if (txn->CheckConflict(candidate_txn)) {
return true;
}
}
if (min_checking_ts != UNCOMMIT_TS) {
std::lock_guard guard(locker_);
checking_ts_.erase(min_checking_ts);
}
return false;
}

@@ -301,10 +292,12 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() {
}
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts);
if (!checking_ts_.empty()) {
res = std::min(res, *checking_ts_.begin());
for (auto *txn : finished_txns_) {
res = std::min(res, txn->CommitTS());
}
for (auto *txn : finishing_txns_) {
res = std::min(res, txn->BeginTS());
}

LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", res, checkpointed_ts));
return res;
}
@@ -325,8 +318,10 @@ void TxnManager::FinishTxn(Txn *txn) {
auto state = txn->GetTxnState();
if (state == TxnState::kCommitting) {
txn->SetTxnCommitted();
finished_txns_.emplace_back(txn);
} else if (state == TxnState::kRollbacking) {
txn->SetTxnRollbacked();
finished_txns_.emplace_back(txn);
} else {
String error_message = fmt::format("Invalid transaction status: {}", ToString(state));
UnrecoverableError(error_message);
Expand All @@ -335,11 +330,25 @@ void TxnManager::FinishTxn(Txn *txn) {
if (remove_n == 0) {
UnrecoverableError("Txn not found in finishing_txns_");
}
TransactionID txn_id = txn->TxnID();
remove_n = txn_map_.erase(txn_id);
if (remove_n == 0) {
String error_message = fmt::format("Txn: {} not found in txn map", txn_id);
UnrecoverableError(error_message);
TxnTimeStamp max_commit_ts = 0;
for (auto *finishing_txn : finishing_txns_) {
max_commit_ts = std::max(max_commit_ts, finishing_txn->CommitTS());
}
auto iter = finished_txns_.begin();
while (iter != finished_txns_.end()) {
auto *finished_txn = *iter;
if (finished_txn->CommitTS() < max_commit_ts) {
++iter;
continue;
}
auto finished_txn_id = finished_txn->TxnID();
// LOG_INFO(fmt::format("Txn: {} is finished. committed ts: {}", finished_txn_id, finished_txn->CommittedTS()));
iter = finished_txns_.erase(iter);
SizeT remove_n = txn_map_.erase(finished_txn_id);
if (remove_n == 0) {
String error_message = fmt::format("Txn: {} not found in txn map", finished_txn_id);
UnrecoverableError(error_message);
}
}
}

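Taken together, the hunks above revert the cleanup bookkeeping: `checking_ts_` disappears, conflict candidates go back to raw `Txn *`, and the cleanup scan timestamp is held back by both finished and still-finishing transactions. A condensed sketch of how the reverted GetCleanupScanTS combines its inputs (timestamps and containers are simplified stand-ins; only the `min` chain mirrors the diff):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

using TxnTimeStamp = std::uint64_t;

TxnTimeStamp CleanupScanTs(TxnTimeStamp first_uncommitted_begin_ts,
                           TxnTimeStamp checkpointed_ts,
                           const std::vector<TxnTimeStamp> &finished_commit_ts,
                           const std::vector<TxnTimeStamp> &finishing_begin_ts) {
    // Start from the earlier of the oldest uncommitted begin ts and the last checkpoint.
    TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts);
    // Finished-but-not-yet-purged txns hold the scan back to their commit ts ...
    for (TxnTimeStamp ts : finished_commit_ts) {
        res = std::min(res, ts);
    }
    // ... and txns still in the committing stage hold it back to their begin ts.
    for (TxnTimeStamp ts : finishing_begin_ts) {
        res = std::min(res, ts);
    }
    return res;
}
```

As the FinishTxn hunk reads, a finished transaction is kept in `finished_txns_` only while some still-finishing transaction carries a larger commit timestamp; otherwise it is purged from both `finished_txns_` and `txn_map_`.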
2 changes: 1 addition & 1 deletion src/storage/txn/txn_manager.cppm
@@ -124,7 +124,7 @@ private:

Deque<WeakPtr<Txn>> beginned_txns_; // sorted by begin ts
HashSet<Txn *> finishing_txns_; // the txns in committing stage, can use flat_map
Set<TxnTimeStamp> checking_ts_{}; // the begin ts of txn that is used to check conflict
List<Txn *> finished_txns_; // the txns that committed_ts

Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

14 changes: 7 additions & 7 deletions src/storage/txn/txn_store.cpp
@@ -116,12 +116,9 @@ void TxnIndexStore::Commit(TransactionID txn_id, TxnTimeStamp commit_ts) {
}
}

void TxnIndexStore::Rollback(TxnTimeStamp abort_ts) {
void TxnIndexStore::Rollback() {
for (auto [segment_index_entry, new_chunk, old_chunks] : optimize_data_) {
segment_index_entry->ResetOptimizing();
for (ChunkIndexEntry *old_chunk : old_chunks) {
old_chunk->DeprecateChunk(abort_ts);
}
}
}

@@ -306,7 +303,7 @@ void TxnTableStore::Rollback(TransactionID txn_id, TxnTimeStamp abort_ts) {
Catalog::RemoveIndexEntry(table_index_entry, txn_id); // fix me
}
for (const auto &[index_name, txn_index_store] : txn_indexes_store_) {
txn_index_store->Rollback(abort_ts);
txn_index_store->Rollback();
}
}

@@ -583,9 +580,12 @@ void TxnStore::MaintainCompactionAlg() const {
}

bool TxnStore::CheckConflict(Catalog *catalog) {
for (const auto &[table_name, table_store] : txn_tables_store_) {
const String &db_name = *table_store->GetTableEntry()->GetDBName();
if (txn_->db_name().empty() && !txn_tables_store_.empty()) {
const String &db_name = *txn_tables_store_.begin()->second->GetTableEntry()->GetDBName();
txn_->SetDBName(db_name);
}
const String &db_name = txn_->db_name();
for (const auto &[table_name, table_store] : txn_tables_store_) {
auto [table_entry1, status] = catalog->GetTableByName(db_name, table_name, txn_->TxnID(), txn_->CommitTS());
if (!status.ok() || table_entry1 != table_store->GetTableEntry()) {
return true;
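The last hunk reverts TxnStore::CheckConflict to resolve the transaction's database name lazily from its first table store and then verify, per table, that the catalog still maps the table name to the exact entry the transaction wrote. A toy sketch of that table-level rule (Catalog/Entry types here are stand-ins, not the project's classes):

```cpp
#include <map>
#include <string>

struct TableEntryDemo { int version = 0; };

struct CatalogDemo {
    std::map<std::string, TableEntryDemo *> tables;   // name -> current entry
    TableEntryDemo *GetTableByName(const std::string &name) const {
        auto it = tables.find(name);
        return it == tables.end() ? nullptr : it->second;
    }
};

// Returns true when a conflict is detected.
bool CheckTableConflict(const CatalogDemo &catalog,
                        const std::map<std::string, TableEntryDemo *> &txn_tables_store) {
    for (const auto &[table_name, written_entry] : txn_tables_store) {
        TableEntryDemo *current = catalog.GetTableByName(table_name);
        if (current == nullptr || current != written_entry) {
            return true;   // table was dropped, or recreated as a new entry, since the write
        }
    }
    return false;
}
```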
2 changes: 1 addition & 1 deletion src/storage/txn/txn_store.cppm
@@ -69,7 +69,7 @@

void Commit(TransactionID txn_id, TxnTimeStamp commit_ts);

void Rollback(TxnTimeStamp abort_ts);
void Rollback();

public:
TableIndexEntry *const table_index_entry_{};
1 change: 0 additions & 1 deletion test/data/config/restart_test/test_memidx/1.toml
@@ -8,7 +8,6 @@ log_to_stdout = true
log_level = "trace"

[storage]
persistence_dir = ""
optimize_interval = "0s"
cleanup_interval = "0s"
compact_interval = "0s"
1 change: 0 additions & 1 deletion test/data/config/restart_test/test_memidx/2.toml
@@ -8,7 +8,6 @@ log_to_stdout = true
log_level = "trace"

[storage]
persistence_dir = ""
optimize_interval = "0s"
cleanup_interval = "3s"
compact_interval = "0s"
1 change: 0 additions & 1 deletion test/data/config/restart_test/test_memidx/3.toml
@@ -8,7 +8,6 @@ log_to_stdout = true
log_level = "trace"

[storage]
persistence_dir = ""
optimize_interval = "1s"
cleanup_interval = "0s"
compact_interval = "0s"
