Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dump bmp index #2367

Merged
merged 15 commits into from
Dec 13, 2024
Merged
108 changes: 108 additions & 0 deletions python/restart_test/test_memidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import pathlib
from infinity.common import ConflictType, SparseVector
import pytest


class TestMemIdx:
Expand Down Expand Up @@ -339,6 +340,113 @@ def check(rows):

part3()

# @pytest.mark.skip(reason="bug")
def test_mem_bmp(self, infinity_runner: InfinityRunner):
config1 = "test/data/config/restart_test/test_memidx/1.toml"
config2 = "test/data/config/restart_test/test_memidx/2.toml"
config3 = "test/data/config/restart_test/test_memidx/3.toml"
uri = common_values.TEST_LOCAL_HOST
infinity_runner.clear()

test_data = [
{"c1" : 1, "c2" : SparseVector(indices=[0, 10, 20, 30, 40, 50, 60, 70, 80, 90], values=[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0])},
{"c1" : 2, "c2" : SparseVector(indices=[0, 20, 40, 60, 80], values=[2.0, 2.0, 2.0, 2.0, 2.0])},
{"c1" : 3, "c2" : SparseVector(indices=[0, 30, 60, 90], values=[3.0, 3.0, 3.0, 3.0])},
{"c1" : 4, "c2" : SparseVector(indices=[0, 40, 80], values=[4.0, 4.0, 4.0])},
{"c1" : 5, "c2" : SparseVector(indices=[0], values=[0.0])},
]
query_vector = SparseVector(indices=[0, 20, 80], values=[1.0, 2.0, 3.0])

decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)

@decorator1
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.create_table(
"test_mem_bmp",
{"c1": {"type": "int"}, "c2": {"type": "sparse,100,float,int"}},
)
res = table_obj.create_index(
"idx1",
index.IndexInfo(
"c2",
index.IndexType.BMP,
{"BLOCK_SIZE": "8", "COMPRESS_TYPE": "compress"},
),
)
assert res.error_code == infinity.ErrorCode.OK

# trigger dump
for i in range(7):
table_obj.insert(test_data)

part1()

# config1 can hold 51 rows of ivf mem index before dump
# 1. recover by dumpindex wal & memindex recovery
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

@decorator2
def part2(infinity_obj):
time.sleep(5)
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table("test_mem_bmp")
data_dict, data_type_dict, _ = table_obj.output(["count(*)"]).to_result()
# print(data_dict)
assert data_dict["count(star)"] == [35]

data_dict, data_type_dict, _ = (
table_obj.output(["c1"])
.match_sparse("c2", query_vector, "ip", 8)
.to_result()
)
assert data_dict["c1"] == [4, 4, 4, 4, 4, 4, 4, 2]

data_dict, data_type_dict, _ = table_obj.output(["count(*)"]).to_result()
# print(data_dict)
assert data_dict["count(star)"] == [35]

for i in range(3):
table_obj.insert(test_data)
time.sleep(5)

data_dict, data_type_dict, _ = (
table_obj.output(["c1"])
.match_sparse("c2", query_vector, "ip", 11)
.to_result()
)
assert data_dict["c1"] == [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 2]

part2()

# 2. recover by delta ckp & dumpindex wal & memindex recovery
decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner)

@decorator3
def part3(infinity_obj):
time.sleep(5)
db_obj = infinity_obj.get_database("default_db")
table_obj = db_obj.get_table("test_mem_bmp")

def check():
data_dict, data_type_dict, _ = (
table_obj.output(["c1"])
.match_sparse("c2", query_vector, "ip", 11)
.to_result()
)
assert data_dict["c1"] == [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 2]

data_dict, data_type_dict, _ = table_obj.output(["count(*)"]).to_result()
assert data_dict["count(star)"] == [50]

check()
infinity_obj.optimize("default_db", "test_mem_bmp", optimize_opt=None)
check()

db_obj.drop_table("test_mem_bmp")

part3()

def test_optimize_from_different_database(self, infinity_runner: InfinityRunner):
infinity_runner.clear()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,17 @@ void PhysicalMatchSparseScan::ExecuteInnerT(DistFunc *dist_func,
const auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetBMPIndexSnapshot();
for (SizeT query_id = 0; query_id < query_n; ++query_id) {
for (auto chunk_index_entry : chunk_index_entries) {
if (!chunk_index_entry->CheckVisible(txn)) {
continue;
}
BufferHandle buffer_handle = chunk_index_entry->GetIndex();
const auto *bmp_index = reinterpret_cast<const AbstractBMP *>(buffer_handle.GetData());
bmp_search(*bmp_index, query_id, false, filter);
LOG_TRACE(fmt::format("Search Match Sparse in chunk {}", chunk_index_entry->encode()));
}
if (memory_index_entry.get() != nullptr) {
bmp_search(memory_index_entry->get(), query_id, true, filter);
LOG_TRACE(fmt::format("Search Match Sparse in mem index of {}", segment_index_entry->encode()));
}
}
};
Expand Down
6 changes: 5 additions & 1 deletion src/storage/knn_index/knn_ivf/ivf_index_data_in_mem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ class IVFIndexInMemT final : public IVFIndexInMem {
}
}

~IVFIndexInMemT() { BaseMemIndex::DecreaseMemoryUsageBase(MemoryUsed()); }
~IVFIndexInMemT() {
if (own_ivf_index_storage_) {
DecreaseMemoryUsageBase(MemoryUsed());
}
}

MemIndexTracerInfo GetInfo() const override {
auto *table_index_entry = segment_index_entry_->table_index_entry();
Expand Down
45 changes: 39 additions & 6 deletions src/storage/knn_index/sparse/abstract_bmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,35 @@ import sparse_util;
import segment_iter;
import segment_entry;
import infinity_exception;
import third_party;
import logger;

namespace infinity {

BMPIndexInMem::BMPIndexInMem(RowID begin_row_id, const IndexBase *index_base, const ColumnDef *column_def)
: begin_row_id_(begin_row_id), bmp_(InitAbstractIndex(index_base, column_def)) {
MemIndexTracerInfo BMPIndexInMem::GetInfo() const {
auto *table_index_entry = segment_index_entry_->table_index_entry();
SharedPtr<String> index_name = table_index_entry->GetIndexName();
auto *table_entry = table_index_entry->table_index_meta()->GetTableEntry();
SharedPtr<String> table_name = table_entry->GetTableName();
SharedPtr<String> db_name = table_entry->GetDBName();

auto [mem_used, row_cnt] = std::visit(
[](auto &&index) -> Pair<SizeT, SizeT> {
using T = std::decay_t<decltype(index)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
return {};
} else {
return {index->MemoryUsage(), index->DocNum()};
}
},
bmp_);
return MemIndexTracerInfo(index_name, table_name, db_name, mem_used, row_cnt);
}

TableIndexEntry *BMPIndexInMem::table_index_entry() const { return segment_index_entry_->table_index_entry(); }

BMPIndexInMem::BMPIndexInMem(RowID begin_row_id, const IndexBase *index_base, const ColumnDef *column_def, SegmentIndexEntry *segment_index_entry)
: begin_row_id_(begin_row_id), bmp_(InitAbstractIndex(index_base, column_def)), segment_index_entry_(segment_index_entry) {
const auto *index_bmp = static_cast<const IndexBMP *>(index_base);
const auto *sparse_info = static_cast<SparseInfo *>(column_def->type()->type_info().get());
SizeT term_num = sparse_info->Dimension();
Expand Down Expand Up @@ -68,14 +92,16 @@ BMPIndexInMem::~BMPIndexInMem() {
return;
}
std::visit(
[](auto &&index) {
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
if constexpr (std::is_same_v<T, std::nullptr_t>) {
return;
} else {
SizeT mem_used = index->MemoryUsage();
if (index != nullptr) {
delete index;
}
DecreaseMemoryUsageBase(mem_used);
}
},
bmp_);
Expand All @@ -94,6 +120,7 @@ SizeT BMPIndexInMem::GetRowCount() const {
bmp_);
}

// realtime insert, trace this
void BMPIndexInMem::AddDocs(SizeT block_offset, BlockColumnEntry *block_column_entry, BufferManager *buffer_mgr, SizeT row_offset, SizeT row_count) {
std::visit(
[&](auto &&index) {
Expand All @@ -103,9 +130,12 @@ void BMPIndexInMem::AddDocs(SizeT block_offset, BlockColumnEntry *block_column_e
} else {
using IndexT = std::decay_t<decltype(*index)>;
using SparseRefT = SparseVecRef<typename IndexT::DataT, typename IndexT::IdxT>;

SizeT mem_before = index->MemoryUsage();
MemIndexInserterIter<SparseRefT> iter(block_offset, block_column_entry, buffer_mgr, row_offset, row_count);
index->AddDocs(std::move(iter));
SizeT mem_after = index->MemoryUsage();
IncreaseMemoryUsageBase(mem_after - mem_before);
LOG_INFO(fmt::format("before : {} -> after : {}, add mem_used : {}", mem_before, mem_after, mem_after - mem_before));
}
},
bmp_);
Expand Down Expand Up @@ -133,7 +163,7 @@ void BMPIndexInMem::AddDocs(const SegmentEntry *segment_entry, BufferManager *bu
bmp_);
}

SharedPtr<ChunkIndexEntry> BMPIndexInMem::Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const {
SharedPtr<ChunkIndexEntry> BMPIndexInMem::Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr, SizeT *dump_size) {
if (!own_memory_) {
UnrecoverableError("BMPIndexInMem::Dump() called with own_memory_ = false.");
}
Expand All @@ -147,6 +177,9 @@ SharedPtr<ChunkIndexEntry> BMPIndexInMem::Dump(SegmentIndexEntry *segment_index_
} else {
row_count = index->DocNum();
index_size = index->GetSizeInBytes();
if (dump_size != nullptr) {
*dump_size = index->MemoryUsage();
}
}
},
bmp_);
Expand All @@ -160,4 +193,4 @@ SharedPtr<ChunkIndexEntry> BMPIndexInMem::Dump(SegmentIndexEntry *segment_index_
return new_chunk_index_entry;
}

} // namespace infinity
} // namespace infinity
16 changes: 12 additions & 4 deletions src/storage/knn_index/sparse/abstract_bmp.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import index_bmp;
import sparse_info;
import internal_types;
import buffer_handle;
import base_memindex;
import memindex_tracer;
import table_index_entry;

namespace infinity {

Expand All @@ -52,11 +55,15 @@ export using AbstractBMP = std::variant<BMPAlg<f32, i32, BMPCompressType::kCompr
BMPAlg<f64, i8, BMPCompressType::kRaw> *,
std::nullptr_t>;

export struct BMPIndexInMem {
export struct BMPIndexInMem final : public BaseMemIndex {
public:
BMPIndexInMem() : bmp_(nullptr) {}

BMPIndexInMem(RowID begin_row_id, const IndexBase *index_base, const ColumnDef *column_def);
BMPIndexInMem(RowID begin_row_id, const IndexBase *index_base, const ColumnDef *column_def, SegmentIndexEntry *segment_index_entry);

MemIndexTracerInfo GetInfo() const override;

TableIndexEntry *table_index_entry() const override;

private:
template <typename DataType, typename IndexType>
Expand Down Expand Up @@ -112,13 +119,14 @@ public:

AbstractBMP &get_ref() { return bmp_; }

SharedPtr<ChunkIndexEntry> Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const ;
SharedPtr<ChunkIndexEntry> Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr, SizeT *dump_size = nullptr);

private:
RowID begin_row_id_ = {};
AbstractBMP bmp_ = nullptr;
mutable bool own_memory_ = true;
mutable BufferHandle chunk_handle_{};
SegmentIndexEntry *segment_index_entry_;
};

} // namespace infinity
} // namespace infinity
24 changes: 16 additions & 8 deletions src/storage/knn_index/sparse/bmp_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

module;

#include "common/simd/simd_common_intrin_include.h"
#include <algorithm>
#include <vector>
#include "common/simd/simd_common_intrin_include.h"

module bmp_alg;

Expand All @@ -25,12 +25,15 @@ import third_party;
import serialize;
import segment_iter;
import bp_reordering;
import bmp_blockterms;

namespace infinity {

template <typename DataType, BMPCompressType CompressType>
template <typename IdxType>
void BMPIvt<DataType, CompressType>::AddBlock(BMPBlockID block_id, const Vector<Pair<Vector<IdxType>, Vector<DataType>>> &tail_terms) {
void BMPIvt<DataType, CompressType>::AddBlock(BMPBlockID block_id,
const Vector<Pair<Vector<IdxType>, Vector<DataType>>> &tail_terms,
SizeT &mem_usage) {
HashMap<IdxType, DataType> max_scores;
for (const auto &[indices, data] : tail_terms) {
SizeT block_size = indices.size();
Expand All @@ -41,7 +44,7 @@ void BMPIvt<DataType, CompressType>::AddBlock(BMPBlockID block_id, const Vector<
}
}
for (const auto &[term_id, score] : max_scores) {
postings_[term_id].data_.AddBlock(block_id, score);
postings_[term_id].data_.AddBlock(block_id, score, mem_usage);
}
}

Expand All @@ -65,7 +68,7 @@ template class BMPIvt<f64, BMPCompressType::kCompressed>;
template class BMPIvt<f64, BMPCompressType::kRaw>;

template <typename DataType, typename IdxType>
SizeT TailFwd<DataType, IdxType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc) {
SizeT TailFwd<DataType, IdxType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc, SizeT &mem_usage) {
Vector<IdxType> indices;
Vector<DataType> data;
indices.reserve(doc.nnz_);
Expand All @@ -75,6 +78,7 @@ SizeT TailFwd<DataType, IdxType>::AddDoc(const SparseVecRef<DataType, IdxType> &
data.push_back(doc.data_[i]);
}
tail_terms_.emplace_back(std::move(indices), std::move(data));
mem_usage += doc.nnz_ * (sizeof(IdxType) + sizeof(DataType));
return tail_terms_.size();
}

Expand Down Expand Up @@ -144,8 +148,8 @@ template class TailFwd<f64, i16>;
template class TailFwd<f64, i8>;

template <typename DataType, typename IdxType>
Optional<TailFwd<DataType, IdxType>> BlockFwd<DataType, IdxType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc) {
SizeT tail_size = tail_fwd_.AddDoc(doc);
Optional<TailFwd<DataType, IdxType>> BlockFwd<DataType, IdxType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc, SizeT &mem_usage) {
SizeT tail_size = tail_fwd_.AddDoc(doc, mem_usage);
vsian marked this conversation as resolved.
Show resolved Hide resolved
if (tail_size < block_size_) {
return None;
}
Expand All @@ -154,6 +158,7 @@ Optional<TailFwd<DataType, IdxType>> BlockFwd<DataType, IdxType>::AddDoc(const S

Vector<Tuple<IdxType, Vector<BMPBlockOffset>, Vector<DataType>>> block_terms = tail_fwd1.ToBlockFwd();
block_terms_list_.emplace_back(block_terms);
mem_usage += block_terms_list_.back().GetSizeInBytes();
return tail_fwd1;
}

Expand Down Expand Up @@ -242,14 +247,17 @@ void BMPAlg<DataType, IdxType, CompressType>::AddDoc(const SparseVecRef<DataType
lock = std::unique_lock(mtx_);
}

SizeT mem_usage = 0;
doc_ids_.push_back(doc_id);
Optional<TailFwd<DataType, IdxType>> tail_fwd = block_fwd_.AddDoc(doc);
Optional<TailFwd<DataType, IdxType>> tail_fwd = block_fwd_.AddDoc(doc, mem_usage);
if (!tail_fwd.has_value()) {
mem_usage_.fetch_add(sizeof(BMPDocID) + mem_usage);
return;
}
BMPBlockID block_id = block_fwd_.block_num() - 1;
const auto &tail_terms = tail_fwd->GetTailTerms();
bm_ivt_.AddBlock(block_id, tail_terms);
bm_ivt_.AddBlock(block_id, tail_terms, mem_usage);
mem_usage_.fetch_add(sizeof(BMPDocID) + mem_usage);
}

template <typename DataType, typename IdxType, BMPCompressType CompressType>
Expand Down
Loading
Loading