Skip to content

Commit

Permalink
Add bpreorder (#1426)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Add: bmp reorder optimize for bmp index.
ref: "Compressing Graphs and Indexes with Recursive Graph Bisection"
usage: "Optimize idx_name ON table_name WITH (bp_reorder)"

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
  • Loading branch information
small-turtle-1 authored Jul 2, 2024
1 parent 9ffa6cb commit f9b9eee
Show file tree
Hide file tree
Showing 12 changed files with 391 additions and 96 deletions.
21 changes: 20 additions & 1 deletion benchmark/local_infinity/sparse/bmp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ int main(int argc, char *argv[]) {
LocalFileSystem fs;

switch (opt.mode_type_) {
case ModeType::kShuffle: {
SparseMatrix<f32, i32> data_mat = DecodeSparseDataset(opt.data_path_);
Vector<SizeT> idx = ShuffleSparseMatrix(data_mat);
Vector<SizeT> inv_idx(data_mat.nrow_);
for (i64 i = 0; i < data_mat.nrow_; i++) {
inv_idx[idx[i]] = i;
}
SaveSparseMatrix(data_mat, opt.data_save_path_);

auto [topk, query_n, indices, scores] = DecodeGroundtruth(opt.groundtruth_path_, false);
auto new_indices = MakeUniqueForOverwrite<i32[]>(query_n * topk);
for (SizeT i = 0; i < query_n; i++) {
for (SizeT j = 0; j < topk; j++) {
new_indices.get()[i * topk + j] = inv_idx[indices.get()[i * topk + j]];
}
}
SaveGroundtruth(topk, query_n, new_indices.get(), scores.get(), opt.groundtruth_save_path_);
break;
}
case ModeType::kImport: {
SparseMatrix<f32, i32> data_mat = DecodeSparseDataset(opt.data_path_);
profiler.Begin();
Expand All @@ -67,7 +86,7 @@ int main(int argc, char *argv[]) {
}
data_mat.Clear();

BMPOptimizeOptions optimize_options{.topk_ = opt.topk_};
BMPOptimizeOptions optimize_options{.topk_ = opt.topk_, .bp_reorder_ = opt.bp_reorder_};
std::cout << "Optimizing index...\n";
index.Optimize(optimize_options);
std::cout << "Index built\n";
Expand Down
3 changes: 3 additions & 0 deletions benchmark/local_infinity/sparse/sparse_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ int main(int argc, char *argv[]) {
}
break;
}
default: {
UnrecoverableError("Unknown mode type");
}
}

return 0;
Expand Down
97 changes: 85 additions & 12 deletions benchmark/local_infinity/sparse/sparse_benchmark_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@ SparseMatrix<f32, i32> DecodeSparseDataset(const Path &data_path) {
return SparseMatrix<f32, i32>::Load(*file_handler);
}

Vector<SizeT> ShuffleSparseMatrix(SparseMatrix<f32, i32> &mat) {
Vector<SizeT> idx(mat.nrow_);
std::iota(idx.begin(), idx.end(), 0);
std::shuffle(idx.begin(), idx.end(), std::mt19937(std::random_device()()));

auto indptr = MakeUniqueForOverwrite<i64[]>(mat.nrow_ + 1);
auto indices = MakeUniqueForOverwrite<i32[]>(mat.nnz_);
auto data = MakeUniqueForOverwrite<f32[]>(mat.nnz_);

indptr[0] = 0;
for (i64 i = 0; i < mat.nrow_; ++i) {
indptr[i + 1] = indptr[i] + mat.indptr_[idx[i] + 1] - mat.indptr_[idx[i]];
std::copy(mat.indices_.get() + mat.indptr_[idx[i]], mat.indices_.get() + mat.indptr_[idx[i] + 1], indices.get() + indptr[i]);
std::copy(mat.data_.get() + mat.indptr_[idx[i]], mat.data_.get() + mat.indptr_[idx[i] + 1], data.get() + indptr[i]);
}
mat.data_ = std::move(data);
mat.indices_ = std::move(indices);
mat.indptr_ = std::move(indptr);
return idx; // idx[i] = j means original i row is shuffled to j row
}

void SaveSparseMatrix(const SparseMatrix<f32, i32> &mat, const Path &data_path) {
LocalFileSystem fs;
auto [file_handler, status] = fs.OpenFile(data_path.string(), FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kNoLock);
Expand Down Expand Up @@ -75,6 +96,18 @@ Tuple<u32, u32, UniquePtr<i32[]>, UniquePtr<f32[]>> DecodeGroundtruth(const Path
return {top_k, query_n, std::move(indices), std::move(scores)};
}

void SaveGroundtruth(u32 top_k, u32 query_n, const i32 *indices, const f32 *scores, const Path &groundtruth_path) {
LocalFileSystem fs;
auto [file_handler, status] = fs.OpenFile(groundtruth_path.string(), FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kNoLock);
if (!status.ok()) {
UnrecoverableError(fmt::format("Can't open file: {}, reason: {}", groundtruth_path.string(), status.message()));
}
file_handler->Write(&query_n, sizeof(query_n));
file_handler->Write(&top_k, sizeof(top_k));
file_handler->Write(indices, sizeof(i32) * query_n * top_k);
file_handler->Write(scores, sizeof(f32) * query_n * top_k);
}

const int kQueryLogInterval = 100;

Vector<Pair<Vector<u32>, Vector<f32>>> Search(i32 thread_n,
Expand Down Expand Up @@ -146,6 +179,7 @@ f32 CheckGroundtruth(i32 *gt_indices_list, f32 *gt_score_list, const Vector<Pair
enum class ModeType : i8 {
kImport,
kQuery,
kShuffle,
};

enum class DataSetType : u8 {
Expand All @@ -162,6 +196,7 @@ struct BenchmarkOption {
Map<String, ModeType> mode_type_map = {
{"import", ModeType::kImport},
{"query", ModeType::kQuery},
{"shuffle", ModeType::kShuffle},
};
Map<String, DataSetType> dataset_type_map = {
{"small", DataSetType::kSmall},
Expand All @@ -173,6 +208,7 @@ struct BenchmarkOption {
app_.add_option("--dataset", dataset_type_, "Dataset type")
->required()
->transform(CLI::CheckedTransformer(dataset_type_map, CLI::ignore_case));
app_.add_option("--shuffled", shuffled_, "Shuffled data")->required(false)->transform(CLI::TypeValidator<bool>());
app_.add_option("--query_n", query_n_, "Test query number")->required(false)->transform(CLI::TypeValidator<i64>());
app_.add_option("--thread_n", thread_n_, "Thread number")->required(false)->transform(CLI::Range(1, 1024));
ParseInner(app_);
Expand All @@ -184,23 +220,49 @@ struct BenchmarkOption {
data_path_ = dataset_dir;
groundtruth_path_ = dataset_dir;
index_save_path_ = tmp_data_path();
data_save_path_ = dataset_dir;
groundtruth_save_path_ = dataset_dir;
switch (dataset_type_) {
case DataSetType::kSmall: {
data_path_ /= "base_small.csr";
groundtruth_path_ /= "base_small.dev.gt";
index_save_path_ /= fmt::format("small_{}.bin", index_name);
if (!shuffled_) {
data_path_ /= "base_small.csr";
groundtruth_path_ /= "base_small.dev.gt";
index_save_path_ /= fmt::format("small_{}.bin", index_name);
} else {
data_path_ /= "base_small_shuffled.csr";
groundtruth_path_ /= "base_small_shuffled.dev.gt";
index_save_path_ /= fmt::format("small_shuffled_{}.bin", index_name);
}
data_save_path_ /= "base_small_shuffled.csr";
groundtruth_save_path_ /= "base_small_shuffled.dev.gt";
break;
}
case DataSetType::k1M: {
data_path_ /= "base_1M.csr";
groundtruth_path_ /= "base_1M.dev.gt";
index_save_path_ /= fmt::format("1M_{}.bin", index_name);
if (!shuffled_) {
data_path_ /= "base_1M.csr";
groundtruth_path_ /= "base_1M.dev.gt";
index_save_path_ /= fmt::format("1M_{}.bin", index_name);
} else {
data_path_ /= "base_1M_shuffled.csr";
groundtruth_path_ /= "base_1M_shuffled.dev.gt";
index_save_path_ /= fmt::format("1M_shuffled_{}.bin", index_name);
}
data_save_path_ /= "base_1M_shuffled.csr";
groundtruth_save_path_ /= "base_1M_shuffled.dev.gt";
break;
}
case DataSetType::kFull: {
data_path_ /= "base_full.csr";
groundtruth_path_ /= "base_full.dev.gt";
index_save_path_ /= fmt::format("full_{}.bin", index_name);
if (!shuffled_) {
data_path_ /= "base_full.csr";
groundtruth_path_ /= "base_full.dev.gt";
index_save_path_ /= fmt::format("full_{}.bin", index_name);
} else {
data_path_ /= "base_full_shuffled.csr";
groundtruth_path_ /= "base_full_shuffled.dev.gt";
index_save_path_ /= fmt::format("full_shuffled_{}.bin", index_name);
}
data_save_path_ /= "base_full_shuffled.csr";
groundtruth_save_path_ /= "base_full_shuffled.dev.gt";
break;
}
default: {
Expand All @@ -218,13 +280,16 @@ struct BenchmarkOption {
public:
ModeType mode_type_ = ModeType::kImport;
DataSetType dataset_type_ = DataSetType::kSmall;
bool shuffled_ = false;
i64 query_n_ = 0;
i32 thread_n_ = 1;

Path data_path_;
Path query_path_;
Path groundtruth_path_;
Path index_save_path_;
Path data_save_path_;
Path groundtruth_save_path_;

protected:
CLI::App app_;
Expand Down Expand Up @@ -257,18 +322,26 @@ struct BMPOption : public BenchmarkOption {
app_.add_option("--type", type_, "BMP compress type")
->required(false)
->transform(CLI::CheckedTransformer(bmp_compress_type_map, CLI::ignore_case));
app_.add_option("--bp_reorder", bp_reorder_, "BP reorder")->required(false)->transform(CLI::TypeValidator<bool>());
app_.add_option("--topk", topk_, "Topk")->required(false)->transform(CLI::Range(1, 1024));
app_.add_option("--block_size", block_size_, "Block size")->required(false)->transform(CLI::Range(1, 1024));
app_.add_option("--block_size", block_size_, "Block size")->required(false)->transform(CLI::Range(1, 256));
app_.add_option("--alpha", alpha_, "Alpha")->required(false)->transform(CLI::Range(0.0, 100.0));
app_.add_option("--beta", beta_, "Beta")->required(false)->transform(CLI::Range(0.0, 100.0));
}

String IndexName() const override { return fmt::format("bmp_block{}_type{}", block_size_, static_cast<i8>(type_)); }
String IndexName() const override {
String name = fmt::format("bmp_block{}_type{}", block_size_, static_cast<i8>(type_));
if (bp_reorder_) {
name += "_bp";
}
return name;
}

public:
BMPCompressType type_ = BMPCompressType::kCompressed;
bool bp_reorder_ = false;
i32 topk_ = 10;
u8 block_size_ = 8;
SizeT block_size_ = 8;
f32 alpha_ = 1.0;
f32 beta_ = 1.0;
};
Expand Down
100 changes: 82 additions & 18 deletions src/storage/knn_index/sparse/bmp_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ import infinity_exception;
import third_party;
import serialize;
import segment_iter;
import bp_reordering;

namespace infinity {

template <typename DataType, BMPCompressType CompressType>
template <typename IdxType>
void BMPIvt<DataType, CompressType>::AddBlock(BMPBlockID block_id, const Vector<Vector<Pair<IdxType, DataType>>> &tail_terms) {
void BMPIvt<DataType, CompressType>::AddBlock(BMPBlockID block_id, const Vector<Pair<Vector<IdxType>, Vector<DataType>>> &tail_terms) {
HashMap<IdxType, DataType> max_scores;
for (const auto &terms : tail_terms) {
for (const auto &[term_id, score] : terms) {
for (const auto &[indices, data] : tail_terms) {
SizeT block_size = indices.size();
for (SizeT i = 0; i < block_size; ++i) {
IdxType term_id = indices[i];
DataType score = data[i];
max_scores[term_id] = std::max(max_scores[term_id], score);
}
}
Expand Down Expand Up @@ -62,11 +66,15 @@ template class BMPIvt<f64, BMPCompressType::kRaw>;

template <typename DataType, typename IdxType>
SizeT TailFwd<DataType, IdxType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc) {
Vector<Pair<IdxType, DataType>> doc_terms;
Vector<IdxType> indices;
Vector<DataType> data;
indices.reserve(doc.nnz_);
data.reserve(doc.nnz_);
for (i32 i = 0; i < doc.nnz_; ++i) {
doc_terms.emplace_back(doc.indices_[i], doc.data_[i]);
indices.push_back(doc.indices_[i]);
data.push_back(doc.data_[i]);
}
tail_terms_.emplace_back(std::move(doc_terms));
tail_terms_.emplace_back(std::move(indices), std::move(data));
return tail_terms_.size();
}

Expand All @@ -75,7 +83,10 @@ Vector<Tuple<IdxType, Vector<BMPBlockOffset>, Vector<DataType>>> TailFwd<DataTyp
Vector<Tuple<IdxType, BMPBlockOffset, DataType>> term_pairs;
SizeT block_size = tail_terms_.size();
for (SizeT block_offset = 0; block_offset < block_size; ++block_offset) {
for (const auto &[term_id, score] : tail_terms_[block_offset]) {
SizeT block_size = tail_terms_[block_offset].first.size();
for (SizeT i = 0; i < block_size; ++i) {
IdxType term_id = tail_terms_[block_offset].first[i];
DataType score = tail_terms_[block_offset].second[i];
term_pairs.emplace_back(term_id, block_offset, score);
}
}
Expand Down Expand Up @@ -106,19 +117,19 @@ Vector<DataType> TailFwd<DataType, IdxType>::GetScores(const SparseVecRef<DataTy
SizeT tail_size = tail_terms_.size();
Vector<DataType> res(tail_size, 0.0);
for (SizeT offset = 0; offset < tail_size; ++offset) {
const auto &tail_terms = tail_terms_[offset];
const auto &[indices, data] = tail_terms_[offset];
SizeT j = 0;
for (i32 i = 0; i < query.nnz_; ++i) {
IdxType query_term = query.indices_[i];
DataType query_score = query.data_[i];
while (j < tail_terms.size() && tail_terms[j].first < query_term) {
while (j < indices.size() && indices[j] < query_term) {
++j;
}
if (j == tail_terms.size()) {
if (j == indices.size()) {
break;
}
if (tail_terms[j].first == query_term) {
res[offset] += query_score * tail_terms[j].second;
if (indices[j] == query_term) {
res[offset] += query_score * data[j];
}
}
}
Expand All @@ -138,14 +149,33 @@ Optional<TailFwd<DataType, IdxType>> BlockFwd<DataType, IdxType>::AddDoc(const S
if (tail_size < block_size_) {
return None;
}
TailFwd<DataType, IdxType> tail_fwd1;
TailFwd<DataType, IdxType> tail_fwd1(block_size_);
std::swap(tail_fwd1, tail_fwd_);

Vector<Tuple<IdxType, Vector<BMPBlockOffset>, Vector<DataType>>> block_terms = tail_fwd1.ToBlockFwd();
block_terms_list_.emplace_back(block_terms);
return tail_fwd1;
}

template <typename DataType, typename IdxType>
Vector<Pair<Vector<IdxType>, Vector<DataType>>> BlockFwd<DataType, IdxType>::GetFwd(SizeT doc_num, SizeT term_num) const {
SizeT doc_n = doc_num / block_size_ * block_size_;
Vector<Pair<Vector<IdxType>, Vector<DataType>>> fwd(doc_n);
for (SizeT block_id = 0; block_id < block_terms_list_.size(); ++block_id) {
const auto &block_terms = block_terms_list_[block_id];
for (auto iter = block_terms.Iter(); iter.HasNext(); iter.Next()) {
const auto &[term_id, block_size, block_offsets, scores] = iter.Value();
for (SizeT i = 0; i < block_size; ++i) {
BMPDocID doc_id = block_offsets[i] + block_id * block_size_;
DataType score = scores[i];
fwd[doc_id].first.push_back(term_id);
fwd[doc_id].second.push_back(score);
}
}
}
return fwd;
}

template <typename DataType, typename IdxType>
Vector<Vector<DataType>> BlockFwd<DataType, IdxType>::GetIvtScores(SizeT term_num) const {
Vector<Vector<DataType>> res(term_num);
Expand Down Expand Up @@ -206,8 +236,11 @@ template class BlockFwd<f64, i16>;
template class BlockFwd<f64, i8>;

template <typename DataType, typename IdxType, BMPCompressType CompressType>
void BMPAlg<DataType, IdxType, CompressType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc, BMPDocID doc_id) {
std::unique_lock lock(mtx_);
void BMPAlg<DataType, IdxType, CompressType>::AddDoc(const SparseVecRef<DataType, IdxType> &doc, BMPDocID doc_id, bool lck) {
std::unique_lock<std::shared_mutex> lock;
if (lck) {
lock = std::unique_lock(mtx_);
}

doc_ids_.push_back(doc_id);
Optional<TailFwd<DataType, IdxType>> tail_fwd = block_fwd_.AddDoc(doc);
Expand All @@ -223,9 +256,40 @@ template <typename DataType, typename IdxType, BMPCompressType CompressType>
void BMPAlg<DataType, IdxType, CompressType>::Optimize(const BMPOptimizeOptions &options) {
std::unique_lock lock(mtx_);

SizeT term_num = bm_ivt_.term_num();
Vector<Vector<DataType>> ivt_scores = block_fwd_.GetIvtScores(term_num);
bm_ivt_.Optimize(options.topk_, std::move(ivt_scores));
if (options.bp_reorder_) {
SizeT block_size = block_fwd_.block_size();
SizeT term_num = bm_ivt_.term_num();
SizeT doc_num = doc_ids_.size() - doc_ids_.size() % block_size;

bm_ivt_ = BMPIvt<DataType, CompressType>(term_num);
Vector<Pair<Vector<IdxType>, Vector<DataType>>> fwd = block_fwd_.GetFwd(doc_num, term_num);
TailFwd<DataType, IdxType> tail_fwd = block_fwd_.GetTailFwd();
block_fwd_ = BlockFwd<DataType, IdxType>(block_size);

BPReordering<IdxType, BMPDocID> bp;
for (BMPDocID i = 0; i < doc_num; ++i) {
bp.AddDoc(&fwd[i].first);
}
Vector<BMPDocID> remap = bp(term_num);

Vector<BMPDocID> doc_ids;
std::swap(doc_ids, doc_ids_);
for (BMPDocID new_id = 0; new_id < doc_num; ++new_id) {
BMPDocID old_id = remap[new_id];
SparseVecRef<DataType, IdxType> doc((i32)fwd[old_id].first.size(), fwd[old_id].first.data(), fwd[old_id].second.data());
this->AddDoc(doc, doc_ids[old_id], false);
}
for (BMPDocID i = doc_num; i < doc_ids.size(); ++i) {
const auto &[indices, data] = tail_fwd.GetTailTerms()[i - doc_num];
SparseVecRef<DataType, IdxType> doc((i32)indices.size(), indices.data(), data.data());
this->AddDoc(doc, doc_ids[i], false);
}
}
if (options.topk_ != 0) {
SizeT term_num = bm_ivt_.term_num();
Vector<Vector<DataType>> ivt_scores = block_fwd_.GetIvtScores(term_num);
bm_ivt_.Optimize(options.topk_, std::move(ivt_scores));
}
}

template <typename DataType, typename IdxType, BMPCompressType CompressType>
Expand Down
Loading

0 comments on commit f9b9eee

Please sign in to comment.