diff --git a/python/restart_test/test_memidx.py b/python/restart_test/test_memidx.py index 5452d09406..0aa8362650 100644 --- a/python/restart_test/test_memidx.py +++ b/python/restart_test/test_memidx.py @@ -729,3 +729,28 @@ def part2(infinity_obj): assert data_dict["c1"] == [1, 2] part2() + + def test_optimize_empty_index(self, infinity_runner: InfinityRunner): + infinity_runner.clear() + config = "test/data/config/restart_test/test_memidx/6.toml" + uri = common_values.TEST_LOCAL_HOST + + table_name = "t1" + decorator = infinity_runner_decorator_factory(config, uri, infinity_runner) + + @decorator + def part1(infinity_obj): + db_obj = infinity_obj.get_database("default_db") + db_obj.drop_table(table_name, ConflictType.Ignore) + table_obj = db_obj.create_table(table_name, {"c1": {"type": "int"}, "c2": {"type": "varchar"}}) + table_obj.create_index("idx1", index.IndexInfo("c2", index.IndexType.FullText)) + + insert_n = 8192*3 + for i in range(insert_n): + table_obj.insert([{"c1": i, "c2": ""}]) + + time.sleep(3) + + db_obj.drop_table(table_name) + + part1() diff --git a/src/common/memory/byte_slice.cpp b/src/common/memory/byte_slice.cpp index 2979d402bb..3dd218f740 100644 --- a/src/common/memory/byte_slice.cpp +++ b/src/common/memory/byte_slice.cpp @@ -28,6 +28,9 @@ ByteSlice *ByteSlice::NewSlice(u8 *data, SizeT data_size) { } void ByteSlice::DestroySlice(ByteSlice *slice) { + if (slice == nullptr) { + return; + } u8 *mem = (u8 *)slice; if (slice->owned_) { delete[] mem; diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index 0cbc347a2a..033506f829 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -697,8 +697,6 @@ void SegmentIndexEntry::CommitOptimize(ChunkIndexEntry *new_chunk, const Vector< ss << old_chunk->chunk_id_ << ", "; } LOG_INFO(ss.str()); - - ResetOptimizing(); } void SegmentIndexEntry::OptIndex(IndexBase *index_base, @@ -866,7 +864,8 @@ void SegmentIndexEntry::ReplaceChunkIndexEntries(TxnTableStore *txn_table_store, ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_table_store, SegmentEntry *segment_entry) { const auto &index_name = *table_index_entry_->GetIndexName(); - if (!TrySetOptimizing()) { + Txn *txn = txn_table_store->GetTxn(); + if (!TrySetOptimizing(txn)) { LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id_)); return nullptr; } @@ -878,7 +877,6 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_ } }); - Txn *txn = txn_table_store->GetTxn(); TxnTimeStamp begin_ts = txn->BeginTS(); const IndexBase *index_base = table_index_entry_->index_base(); SharedPtr column_def = table_index_entry_->column_def(); @@ -1225,15 +1223,20 @@ UniquePtr SegmentIndexEntry::Deserialize(const nlohmann::json return segment_index_entry; } -bool SegmentIndexEntry::TrySetOptimizing() { +bool SegmentIndexEntry::TrySetOptimizing(Txn *txn) { bool expected = false; - return optimizing_.compare_exchange_strong(expected, true); + bool success = optimizing_.compare_exchange_strong(expected, true); + if (!success) { + return false; + } + TableEntry *table_entry = table_index_entry_->table_index_meta()->GetTableEntry(); + TxnTableStore *txn_table_store = txn->txn_store()->GetTxnTableStore(table_entry); + TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry_); + txn_index_store->AddSegmentOptimizing(this); + return true; } -void SegmentIndexEntry::ResetOptimizing() { - bool expected = true; - optimizing_.compare_exchange_strong(expected, false); -} +void SegmentIndexEntry::ResetOptimizing() { optimizing_.store(false); } Pair SegmentIndexEntry::GetFulltextColumnLenInfo() { std::shared_lock lock(rw_locker_); diff --git a/src/storage/meta/entry/segment_index_entry.cppm b/src/storage/meta/entry/segment_index_entry.cppm index 7d3cd87894..88242b9270 100644 --- a/src/storage/meta/entry/segment_index_entry.cppm +++ b/src/storage/meta/entry/segment_index_entry.cppm @@ -277,7 +277,7 @@ private: Atomic deprecate_ts_ = UNCOMMIT_TS; public: - bool TrySetOptimizing(); + bool TrySetOptimizing(Txn *txn); void ResetOptimizing(); diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp index 99edb7b601..c51ac523e7 100644 --- a/src/storage/meta/entry/table_entry.cpp +++ b/src/storage/meta/entry/table_entry.cpp @@ -914,7 +914,7 @@ void TableEntry::OptimizeIndex(Txn *txn) { const IndexFullText *index_fulltext = static_cast(index_base); Map> index_by_segment = table_index_entry->GetIndexBySegmentSnapshot(this, txn); for (auto &[segment_id, segment_index_entry] : index_by_segment) { - if (!segment_index_entry->TrySetOptimizing()) { + if (!segment_index_entry->TrySetOptimizing(txn)) { LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id)); continue; } diff --git a/src/storage/txn/txn_store.cpp b/src/storage/txn/txn_store.cpp index 4f921876c4..b76d8e7964 100644 --- a/src/storage/txn/txn_store.cpp +++ b/src/storage/txn/txn_store.cpp @@ -121,13 +121,30 @@ void TxnIndexStore::Commit(TransactionID txn_id, TxnTimeStamp commit_ts) { void TxnIndexStore::Rollback(TxnTimeStamp abort_ts) { for (auto [segment_index_entry, new_chunk, old_chunks] : optimize_data_) { - segment_index_entry->ResetOptimizing(); for (ChunkIndexEntry *old_chunk : old_chunks) { old_chunk->DeprecateChunk(abort_ts); } } } +void TxnIndexStore::AddSegmentOptimizing(SegmentIndexEntry *segment_index_entry) { + status_ = TxnStoreStatus::kOptimizing; + index_entry_map_.emplace(segment_index_entry->segment_id(), segment_index_entry); +} + + +bool TxnIndexStore::TryRevert() { + if (status_ == TxnStoreStatus::kNone) { + return false; + } + for (auto &[segment_id, segment_index_entry] : index_entry_map_) { + segment_index_entry->ResetOptimizing(); + } + status_ = TxnStoreStatus::kNone; + return true; +} + + ///----------------------------------------------------------------------------- TxnCompactStore::TxnCompactStore() : type_(CompactStatementType::kInvalid) {} @@ -532,6 +549,9 @@ void TxnTableStore::TryRevert() { added_txn_num_ = false; table_entry_->DecWriteTxnNum(); } + for (const auto &[index_name, index_store] : txn_indexes_store_) { + index_store->TryRevert(); + } } TxnStore::TxnStore(Txn *txn, Catalog *catalog) : txn_(txn), catalog_(catalog) {} diff --git a/src/storage/txn/txn_store.cppm b/src/storage/txn/txn_store.cppm index ff4efe2612..0376029557 100644 --- a/src/storage/txn/txn_store.cppm +++ b/src/storage/txn/txn_store.cppm @@ -71,6 +71,10 @@ public: void Rollback(TxnTimeStamp abort_ts); + void AddSegmentOptimizing(SegmentIndexEntry *segment_index_entry); + + bool TryRevert(); + public: TableIndexEntry *const table_index_entry_{}; @@ -78,6 +82,12 @@ public: HashMap chunk_index_entries_{}; Vector>> optimize_data_; + + enum struct TxnStoreStatus { + kNone, + kOptimizing, + }; + TxnStoreStatus status_{TxnStoreStatus::kNone}; }; export struct TxnCompactStore {