Skip to content

Commit

Permalink
Fix SegmentIndexEntry::TrySetOptimizing
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Dec 17, 2024
1 parent 7da1aa4 commit 741a3e8
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 13 deletions.
25 changes: 25 additions & 0 deletions python/restart_test/test_memidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,3 +729,28 @@ def part2(infinity_obj):
assert data_dict["c1"] == [1, 2]

part2()

def test_optimize_empty_index(self, infinity_runner: InfinityRunner):
infinity_runner.clear()
config = "test/data/config/restart_test/test_memidx/6.toml"
uri = common_values.TEST_LOCAL_HOST

table_name = "t1"
decorator = infinity_runner_decorator_factory(config, uri, infinity_runner)

@decorator
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
db_obj.drop_table(table_name, ConflictType.Ignore)
table_obj = db_obj.create_table(table_name, {"c1": {"type": "int"}, "c2": {"type": "varchar"}})
table_obj.create_index("idx1", index.IndexInfo("c2", index.IndexType.FullText))

insert_n = 8192*3
for i in range(insert_n):
table_obj.insert([{"c1": i, "c2": ""}])

time.sleep(3)

db_obj.drop_table(table_name)

part1()
3 changes: 3 additions & 0 deletions src/common/memory/byte_slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ ByteSlice *ByteSlice::NewSlice(u8 *data, SizeT data_size) {
}

void ByteSlice::DestroySlice(ByteSlice *slice) {
if (slice == nullptr) {
return;
}
u8 *mem = (u8 *)slice;
if (slice->owned_) {
delete[] mem;
Expand Down
23 changes: 13 additions & 10 deletions src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,6 @@ void SegmentIndexEntry::CommitOptimize(ChunkIndexEntry *new_chunk, const Vector<
ss << old_chunk->chunk_id_ << ", ";
}
LOG_INFO(ss.str());

ResetOptimizing();
}

void SegmentIndexEntry::OptIndex(IndexBase *index_base,
Expand Down Expand Up @@ -866,7 +864,8 @@ void SegmentIndexEntry::ReplaceChunkIndexEntries(TxnTableStore *txn_table_store,

ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_table_store, SegmentEntry *segment_entry) {
const auto &index_name = *table_index_entry_->GetIndexName();
if (!TrySetOptimizing()) {
Txn *txn = txn_table_store->GetTxn();
if (!TrySetOptimizing(txn)) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id_));
return nullptr;
}
Expand All @@ -878,7 +877,6 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_
}
});

Txn *txn = txn_table_store->GetTxn();
TxnTimeStamp begin_ts = txn->BeginTS();
const IndexBase *index_base = table_index_entry_->index_base();
SharedPtr<ColumnDef> column_def = table_index_entry_->column_def();
Expand Down Expand Up @@ -1225,15 +1223,20 @@ UniquePtr<SegmentIndexEntry> SegmentIndexEntry::Deserialize(const nlohmann::json
return segment_index_entry;
}

bool SegmentIndexEntry::TrySetOptimizing() {
bool SegmentIndexEntry::TrySetOptimizing(Txn *txn) {
bool expected = false;
return optimizing_.compare_exchange_strong(expected, true);
bool success = optimizing_.compare_exchange_strong(expected, true);
if (!success) {
return false;
}
TableEntry *table_entry = table_index_entry_->table_index_meta()->GetTableEntry();
TxnTableStore *txn_table_store = txn->txn_store()->GetTxnTableStore(table_entry);
TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry_);
txn_index_store->AddSegmentOptimizing(this);
return true;
}

void SegmentIndexEntry::ResetOptimizing() {
bool expected = true;
optimizing_.compare_exchange_strong(expected, false);
}
void SegmentIndexEntry::ResetOptimizing() { optimizing_.store(false); }

Pair<u64, u32> SegmentIndexEntry::GetFulltextColumnLenInfo() {
std::shared_lock lock(rw_locker_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_index_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private:
Atomic<TxnTimeStamp> deprecate_ts_ = UNCOMMIT_TS;

public:
bool TrySetOptimizing();
bool TrySetOptimizing(Txn *txn);

void ResetOptimizing();

Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ void TableEntry::OptimizeIndex(Txn *txn) {
const IndexFullText *index_fulltext = static_cast<const IndexFullText *>(index_base);
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment = table_index_entry->GetIndexBySegmentSnapshot(this, txn);
for (auto &[segment_id, segment_index_entry] : index_by_segment) {
if (!segment_index_entry->TrySetOptimizing()) {
if (!segment_index_entry->TrySetOptimizing(txn)) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id));
continue;
}
Expand Down
22 changes: 21 additions & 1 deletion src/storage/txn/txn_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,30 @@ void TxnIndexStore::Commit(TransactionID txn_id, TxnTimeStamp commit_ts) {

void TxnIndexStore::Rollback(TxnTimeStamp abort_ts) {
for (auto [segment_index_entry, new_chunk, old_chunks] : optimize_data_) {
segment_index_entry->ResetOptimizing();
for (ChunkIndexEntry *old_chunk : old_chunks) {
old_chunk->DeprecateChunk(abort_ts);
}
}
}

void TxnIndexStore::AddSegmentOptimizing(SegmentIndexEntry *segment_index_entry) {
status_ = TxnStoreStatus::kOptimizing;
index_entry_map_.emplace(segment_index_entry->segment_id(), segment_index_entry);
}


bool TxnIndexStore::TryRevert() {
if (status_ == TxnStoreStatus::kNone) {
return false;
}
for (auto &[segment_id, segment_index_entry] : index_entry_map_) {
segment_index_entry->ResetOptimizing();
}
status_ = TxnStoreStatus::kNone;
return true;
}


///-----------------------------------------------------------------------------

TxnCompactStore::TxnCompactStore() : type_(CompactStatementType::kInvalid) {}
Expand Down Expand Up @@ -532,6 +549,9 @@ void TxnTableStore::TryRevert() {
added_txn_num_ = false;
table_entry_->DecWriteTxnNum();
}
for (const auto &[index_name, index_store] : txn_indexes_store_) {
index_store->TryRevert();
}
}

TxnStore::TxnStore(Txn *txn, Catalog *catalog) : txn_(txn), catalog_(catalog) {}
Expand Down
10 changes: 10 additions & 0 deletions src/storage/txn/txn_store.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,23 @@ public:

void Rollback(TxnTimeStamp abort_ts);

void AddSegmentOptimizing(SegmentIndexEntry *segment_index_entry);

bool TryRevert();

public:
TableIndexEntry *const table_index_entry_{};

HashMap<SegmentID, SegmentIndexEntry *> index_entry_map_{};
HashMap<String, ChunkIndexEntry *> chunk_index_entries_{};

Vector<Tuple<SegmentIndexEntry *, ChunkIndexEntry *, Vector<ChunkIndexEntry *>>> optimize_data_;

enum struct TxnStoreStatus {
kNone,
kOptimizing,
};
TxnStoreStatus status_{TxnStoreStatus::kNone};
};

export struct TxnCompactStore {
Expand Down

0 comments on commit 741a3e8

Please sign in to comment.