Skip to content

Commit

Permalink
Fix pr #1949 (#1979)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix reverted pr #1949 (revert in #1960)

Related issue:
#1957 & #1958

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
small-turtle-1 authored Oct 8, 2024
1 parent ae40b5a commit ead2460
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 81 deletions.
131 changes: 107 additions & 24 deletions python/restart_test/test_memidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from infinity_runner import InfinityRunner, infinity_runner_decorator_factory
from infinity import index
import time
import pathlib
from infinity.common import ConflictType


class TestMemIdx:
Expand All @@ -14,6 +16,7 @@ def test_mem_hnsw(self, infinity_runner: InfinityRunner):
infinity_runner.clear()

decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)

@decorator1
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
Expand Down Expand Up @@ -49,6 +52,7 @@ def part1(infinity_obj):
# config1 can held 6 rows of hnsw mem index before dump
# 1. recover by dumpindex wal & memindex recovery
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

@decorator2
def part2(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
Expand All @@ -74,6 +78,7 @@ def part2(infinity_obj):

# 2. recover by delta ckp & dumpindex wal & memindex recovery
decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner)

@decorator3
def part3(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
Expand All @@ -99,28 +104,106 @@ def check():

part3()

# create table test_memidx1(c1 int, c2 embedding(float, 4));
# create index idx1 on test_memidx1(c2) using hnsw with(m=16, ef_construction=200, metric=l2,block_size=1);
# insert into test_memidx1 values(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]);
# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]);
# # wait 5s
# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]);

# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 4, 4, 4, 4, 4, 2
# select count(*) from test_memidx1;
# # result: 10
# insert into test_memidx1 values(6,[0.3,0.2,0.1,0.4]),(6,[0.3,0.2,0.1,0.4]);
# # wait 5s
# insert into test_memidx1 values(8,[0.4,0.3,0.2,0.1]);

# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 8, 6, 6, 4, 4, 4
# select count(*) from test_memidx1;
# # result: 13
# # wait 3s
# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 8, 6, 6, 4, 4, 4
# select count(*) from test_memidx1;
# # result: 13

def test_optimize_from_different_database(self, infinity_runner: InfinityRunner):
config1 = "test/data/config/restart_test/test_memidx/1.toml"
config2 = "test/data/config/restart_test/test_memidx/3.toml"
uri = common_values.TEST_LOCAL_HOST
decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

data_dir = "/var/infinity/data"
idx1_name = "index1"
idx2_name = "index2"

@decorator1
def part1(infinity_obj):
params = {
"M": "16",
"ef_construction": "20",
"metric": "l2",
"block_size": "1",
}

infinity_obj.drop_database("db1", conflict_type=ConflictType.Ignore)
db_obj1 = infinity_obj.create_database("db1")
table_obj1 = db_obj1.create_table(
"test_memidx1",
{"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}},
)
table_obj1.create_index(
idx1_name,
index.IndexInfo("c2", index.IndexType.Hnsw, params),
)
table_obj1.insert(
[{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)]
)
table_obj1.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)])

infinity_obj.drop_database("db2", conflict_type=ConflictType.Ignore)
db_obj2 = infinity_obj.create_database("db2")
table_obj2 = db_obj2.create_table(
"test_memidx2",
{"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}},
)
table_obj2.create_index(
idx2_name,
index.IndexInfo("c2", index.IndexType.Hnsw, params),
)
table_obj2.insert(
[{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)]
)
table_obj2.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)])

# create table test_memidx1(c1 int, c2 embedding(float, 4));
# create index idx1 on test_memidx1(c2) using hnsw with(m=16, ef_construction=200, metric=l2,block_size=1);
# insert into test_memidx1 values(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]),(2, [0.1,0.2,0.3,-0.2]);
# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]);
# # wait 5s
# insert into test_memidx1 values(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]),(4,[0.2,0.1,0.3,0.4]);

# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 4, 4, 4, 4, 4, 2
# select count(*) from test_memidx1;
# # result: 10
# insert into test_memidx1 values(6,[0.3,0.2,0.1,0.4]),(6,[0.3,0.2,0.1,0.4]);
# # wait 5s
# insert into test_memidx1 values(8,[0.4,0.3,0.2,0.1]);

# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 8, 6, 6, 4, 4, 4
# select count(*) from test_memidx1;
# # result: 13
# # wait 3s
# select c1 from test_memidx1 search match vector(c2, [0.3,0.3,0.2,0.2],'float','l2',6);
# # result: 8, 6, 6, 4, 4, 4
# select count(*) from test_memidx1;
# # result: 13
part1()

@decorator2
def part2(infinity_obj):
# wait for optimize
time.sleep(3)

idx1_files = list(pathlib.Path(data_dir).rglob(f"*{idx1_name}*"))
idx2_files = list(pathlib.Path(data_dir).rglob(f"*{idx2_name}*"))
assert len(idx1_files) == 1
assert len(idx2_files) == 1

idx1_dir = idx1_files[0]
idx1_files_in_dir = list(idx1_dir.glob("*"))
assert len(idx1_files_in_dir) == 3

idx2_dir = idx2_files[0]
idx2_files_in_dir = list(idx2_dir.glob("*"))
assert len(idx2_files_in_dir) == 3

infinity_obj.cleanup()
idx1_files_in_dir = list(idx1_dir.glob("*"))
assert len(idx1_files_in_dir) == 1

idx2_files_in_dir = list(idx2_dir.glob("*"))
assert len(idx2_files_in_dir) == 1

part2()
2 changes: 1 addition & 1 deletion src/storage/meta/entry/base_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ bool BaseEntry::CheckVisible(Txn *txn) const {
TxnTimeStamp begin_ts = txn->BeginTS();
if (txn_id_ == 0) {
// could not check if the entry is visible accurately. log a warning and return true
LOG_WARN(fmt::format("Entry {} txn id is not set, commit_ts", *encode_, commit_ts_));
LOG_WARN(fmt::format("Entry {} txn id is not set, commit_ts: {}", *encode_, commit_ts_));
return begin_ts >= commit_ts_;
}
if (begin_ts >= commit_ts_ || txn_id_ == txn->TxnID()) {
Expand Down
1 change: 1 addition & 0 deletions src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_
const auto &index_name = *table_index_entry_->GetIndexName();
if (!TrySetOptimizing()) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id_));
return nullptr;
}
bool opt_success = false;
DeferFn defer_fn([&] {
Expand Down
31 changes: 16 additions & 15 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Status Txn::Import(TableEntry *table_entry, SharedPtr<SegmentEntry> segment_entr

// build WalCmd
WalSegmentInfo segment_info(segment_entry.get());
wal_entry_->cmds_.push_back(MakeShared<WalCmdImport>(db_name, table_name, std::move(segment_info)));
AddWalCmd(MakeShared<WalCmdImport>(db_name, table_name, std::move(segment_info)));

TxnTableStore *table_store = this->GetTxnTableStore(table_entry);
table_store->Import(std::move(segment_entry), this);
Expand All @@ -102,7 +102,7 @@ Status Txn::Append(TableEntry *table_entry, const SharedPtr<DataBlock> &input_bl
this->CheckTxn(db_name);
TxnTableStore *table_store = this->GetTxnTableStore(table_entry);

wal_entry_->cmds_.push_back(MakeShared<WalCmdAppend>(db_name, table_name, input_block));
AddWalCmd(MakeShared<WalCmdAppend>(db_name, table_name, input_block));
auto [err_msg, append_status] = table_store->Append(input_block);
return append_status;
}
Expand All @@ -120,7 +120,7 @@ Status Txn::Delete(TableEntry *table_entry, const Vector<RowID> &row_ids, bool c

TxnTableStore *table_store = this->GetTxnTableStore(table_entry);

wal_entry_->cmds_.push_back(MakeShared<WalCmdDelete>(db_name, table_name, row_ids));
AddWalCmd(MakeShared<WalCmdDelete>(db_name, table_name, row_ids));
auto [err_msg, delete_status] = table_store->Delete(row_ids);
return delete_status;
}
Expand All @@ -141,7 +141,7 @@ Status Txn::OptIndex(TableIndexEntry *table_index_entry, Vector<UniquePtr<InitPa
const String &table_name = *table_entry->GetTableName();
table_index_entry->OptIndex(txn_table_store, init_params, false /*replay*/);

wal_entry_->cmds_.push_back(MakeShared<WalCmdOptimize>(db_name_, table_name, index_name, std::move(init_params)));
AddWalCmd(MakeShared<WalCmdOptimize>(db_name_, table_name, index_name, std::move(init_params)));
return Status::OK();
}

Expand Down Expand Up @@ -178,7 +178,7 @@ Status Txn::CreateDatabase(const String &db_name, ConflictType conflict_type) {
}
txn_store_.AddDBStore(db_entry);

wal_entry_->cmds_.push_back(MakeShared<WalCmdCreateDatabase>(std::move(db_name), db_entry->GetPathNameTail()));
AddWalCmd(MakeShared<WalCmdCreateDatabase>(std::move(db_name), db_entry->GetPathNameTail()));
return Status::OK();
}

Expand All @@ -192,7 +192,7 @@ Status Txn::DropDatabase(const String &db_name, ConflictType conflict_type) {
}
txn_store_.DropDBStore(dropped_db_entry.get());

wal_entry_->cmds_.push_back(MakeShared<WalCmdDropDatabase>(db_name));
AddWalCmd(MakeShared<WalCmdDropDatabase>(db_name));
return Status::OK();
}

Expand Down Expand Up @@ -244,7 +244,7 @@ Status Txn::CreateTable(const String &db_name, const SharedPtr<TableDef> &table_
}

txn_store_.AddTableStore(table_entry);
wal_entry_->cmds_.push_back(MakeShared<WalCmdCreateTable>(std::move(db_name), table_entry->GetPathNameTail(), table_def));
AddWalCmd(MakeShared<WalCmdCreateTable>(std::move(db_name), table_entry->GetPathNameTail(), table_def));

LOG_TRACE("Txn::CreateTable created table entry is inserted.");
return Status::OK();
Expand All @@ -270,7 +270,7 @@ Status Txn::AddColumns(TableEntry *table_entry, const Vector<SharedPtr<ColumnDef
return add_status;
}

wal_entry_->cmds_.push_back(MakeShared<WalCmdAddColumns>(*table_entry->GetDBName(), *table_entry->GetTableName(), column_defs));
AddWalCmd(MakeShared<WalCmdAddColumns>(*table_entry->GetDBName(), *table_entry->GetTableName(), column_defs));
return Status::OK();
}

Expand All @@ -289,7 +289,7 @@ Status Txn::DropColumns(TableEntry *table_entry, const Vector<String> &column_na
return drop_status;
}

wal_entry_->cmds_.push_back(MakeShared<WalCmdDropColumns>(*table_entry->GetDBName(), *table_entry->GetTableName(), column_names));
AddWalCmd(MakeShared<WalCmdDropColumns>(*table_entry->GetDBName(), *table_entry->GetTableName(), column_names));
return Status::OK();
}

Expand All @@ -306,7 +306,7 @@ Status Txn::DropTableCollectionByName(const String &db_name, const String &table
}

txn_store_.DropTableStore(table_entry.get());
wal_entry_->cmds_.push_back(MakeShared<WalCmdDropTable>(db_name, table_name));
AddWalCmd(MakeShared<WalCmdDropTable>(db_name, table_name));

LOG_TRACE("Txn::DropTableCollectionByName dropped table entry is inserted.");
return Status::OK();
Expand All @@ -324,7 +324,7 @@ Tuple<TableIndexEntry *, Status> Txn::CreateIndexDef(TableEntry *table_entry, co
txn_table_store->AddIndexStore(table_index_entry);

String index_dir_tail = table_index_entry->GetPathNameTail();
wal_entry_->cmds_.push_back(
AddWalCmd(
MakeShared<WalCmdCreateIndex>(*table_entry->GetDBName(), *table_entry->GetTableName(), std::move(index_dir_tail), index_base));
return {table_index_entry, index_status};
}
Expand Down Expand Up @@ -412,7 +412,7 @@ Status Txn::DropIndexByName(const String &db_name, const String &table_name, con
auto *txn_table_store = this->GetTxnTableStore(table_entry);
txn_table_store->DropIndexStore(table_index_entry.get());

wal_entry_->cmds_.push_back(MakeShared<WalCmdDropIndex>(db_name, table_name, index_name));
AddWalCmd(MakeShared<WalCmdDropIndex>(db_name, table_name, index_name));
return index_status;
}

Expand Down Expand Up @@ -583,12 +583,13 @@ void Txn::Rollback() {
}

void Txn::AddWalCmd(const SharedPtr<WalCmd> &cmd) {
std::lock_guard guard(txn_store_.mtx_);
// std::lock_guard guard(txn_store_.mtx_);
auto state = txn_context_.GetTxnState();
if (state != TxnState::kStarted) {
auto begin_ts = BeginTS();
UnrecoverableError(fmt::format("Should add wal cmd in started state, begin_ts: {}", begin_ts));
}
// LOG_TRACE(fmt::format("Add wal cmd {} to txn {}", cmd->ToString(), txn_id_));
wal_entry_->cmds_.push_back(cmd);
}

Expand All @@ -600,7 +601,7 @@ bool Txn::DeltaCheckpoint(TxnTimeStamp last_ckp_ts, TxnTimeStamp &max_commit_ts)
if (!catalog_->SaveDeltaCatalog(last_ckp_ts, max_commit_ts, delta_path, delta_name)) {
return false;
}
wal_entry_->cmds_.push_back(MakeShared<WalCmdCheckpoint>(max_commit_ts, false, delta_path, delta_name));
AddWalCmd(MakeShared<WalCmdCheckpoint>(max_commit_ts, false, delta_path, delta_name));

return true;
}
Expand All @@ -610,7 +611,7 @@ void Txn::FullCheckpoint(const TxnTimeStamp max_commit_ts) {
String full_path, full_name;

catalog_->SaveFullCatalog(max_commit_ts, full_path, full_name);
wal_entry_->cmds_.push_back(MakeShared<WalCmdCheckpoint>(max_commit_ts, true, full_path, full_name));
AddWalCmd(MakeShared<WalCmdCheckpoint>(max_commit_ts, true, full_path, full_name));
}

void Txn::AddWriteTxnNum(TableEntry *table_entry) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BaseTableRef;
enum class CompactStatementType;
struct SegmentIndexEntry;

export class Txn {
export class Txn : public EnableSharedFromThis<Txn> {
public:
// For new txn
explicit Txn(TxnManager *txn_manager,
Expand Down
Loading

0 comments on commit ead2460

Please sign in to comment.