From 3eddb248ca242d8168e74248b1483d5f40a6359a Mon Sep 17 00:00:00 2001
From: yangzq50 <58433399+yangzq50@users.noreply.github.com>
Date: Thu, 4 Jul 2024 22:47:23 +0800
Subject: [PATCH] Add script for generating colbert embeddings for MLDR
 benchmark (#1434)

### What problem does this PR solve?

Add a script for generating colbert embeddings for the MLDR benchmark.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
---
 .../generate_colbert_embedding.py             |  76 +++++++-
 .../mldr_benchmark/insert_data_50000.py       |   4 +-
 .../insert_data_with_colbert_50000.py         | 162 ++++++++++++++++++
 .../mldr_benchmark/mldr_common_tools.py       |  16 ++
 src/common/default_values.cppm                |   2 +-
 5 files changed, 249 insertions(+), 11 deletions(-)
 create mode 100644 python/benchmark/mldr_benchmark/insert_data_with_colbert_50000.py

diff --git a/python/benchmark/mldr_benchmark/generate_colbert_embedding.py b/python/benchmark/mldr_benchmark/generate_colbert_embedding.py
index 5855193378..39a4b58fd3 100644
--- a/python/benchmark/mldr_benchmark/generate_colbert_embedding.py
+++ b/python/benchmark/mldr_benchmark/generate_colbert_embedding.py
@@ -10,9 +10,9 @@
 """
 import os
 import struct
+import numpy as np
 import datasets
 from tqdm import tqdm
-from FlagEmbedding import BGEM3FlagModel
 from dataclasses import dataclass, field
 from transformers import HfArgumentParser
 from mldr_common_tools import EvalArgs, check_languages, load_corpus
@@ -20,15 +20,63 @@
 @dataclass
 class ModelArgs:
+    model: str = field(default="jina-colbert", metadata={'help': 'The name of the model to use.'})
     fp16: bool = field(default=True, metadata={'help': 'Use fp16 in inference?'})
 
 
-def get_model(model_args: ModelArgs):
-    model = BGEM3FlagModel("BAAI/bge-m3", use_fp16=model_args.fp16)
-    return model
+class FakeBGEM3FlagModelColBERTV2:
+    def __init__(self):
+        from colbert.modeling.checkpoint import Checkpoint
+        from colbert.infra import ColBERTConfig
+        self.ckpt = Checkpoint("colbert-ir/colbertv2.0", colbert_config=ColBERTConfig(root="experiments"))
+        from langchain.text_splitter import RecursiveCharacterTextSplitter
+        self.text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=0, length_function=len,
+                                                            is_separator_regex=False)
+
+    def encode(self, text: list[str], batch_size: int, max_length: int, return_dense: bool, return_sparse: bool,
+               return_colbert_vecs: bool):
+        assert return_colbert_vecs is True and return_dense is False and return_sparse is False
+        colbert_result = []
+        for one_text in tqdm(text, desc="Generating ColBERT-v2.0 embeddings"):
+            chunks = self.text_splitter.create_documents([one_text])
+            subtext_tensor = [self.ckpt.docFromText([chunk.page_content], to_cpu=True)[0].numpy() for chunk in chunks]
+            # concatenate the per-chunk token embeddings into one (num_tokens, 128) array
+            concatenated = np.concatenate(subtext_tensor)
+            assert concatenated.ndim == 2 and concatenated.shape[1] == 128
+            colbert_result.append(concatenated)
+        return {'colbert_vecs': colbert_result}
+
+
+class FakeBGEM3FlagModelJinaColBERT:
+    def __init__(self):
+        from colbert.modeling.checkpoint import Checkpoint
+        from colbert.infra import ColBERTConfig
+        self.ckpt = Checkpoint("jinaai/jina-colbert-v1-en", colbert_config=ColBERTConfig(doc_maxlen=8192))
+
+    def encode(self, text: list[str], batch_size: int, max_length: int, return_dense: bool, return_sparse: bool,
+               return_colbert_vecs: bool):
+        assert return_colbert_vecs is True and return_dense is False and return_sparse is False
+        colbert_result = []
+        for one_text in tqdm(text, desc="Generating jina-colbert-v1-en embeddings"):
+            text_tensor = self.ckpt.docFromText([one_text], to_cpu=True)[0].numpy()
+            assert text_tensor.ndim == 2 and text_tensor.shape[1] == 128
+            colbert_result.append(text_tensor)
+        return {'colbert_vecs': colbert_result}
 
 
-def generate_multivec(model: BGEM3FlagModel, corpus: datasets.Dataset, max_passage_length: int, batch_size: int,
+def get_model(model_args: ModelArgs):
+    if model_args.model == "BAAI/bge-m3":
+        from FlagEmbedding import BGEM3FlagModel
+        return BGEM3FlagModel("BAAI/bge-m3", use_fp16=model_args.fp16)
+    elif model_args.model == "jina-colbert":
+        return FakeBGEM3FlagModelJinaColBERT()
+    elif model_args.model == "colbertv2.0":
+        return FakeBGEM3FlagModelColBERTV2()
+    else:
+        raise ValueError(f"Model {model_args.model} is not supported.")
+
+
+def generate_multivec(model, corpus: datasets.Dataset, max_passage_length: int, batch_size: int,
                       begin_pos: int, end_pos: int):
     result_dict = model.encode(corpus["text"][begin_pos: end_pos], batch_size=batch_size,
                                max_length=max_passage_length, return_dense=False, return_sparse=False,
                                return_colbert_vecs=True)
@@ -52,14 +100,24 @@ def main():
     eval_args: EvalArgs
 
     languages = check_languages(eval_args.languages)
+    if model_args.model == "jina-colbert":
+        if languages != ["en"]:
+            raise ValueError(f"Model {model_args.model} only supports English.")
     model = get_model(model_args=model_args)
     print("==================================================")
-    print("Start generating colbert embedding with model: BAAI/bge-m3")
-
+    print(f"Start generating colbert embeddings with model: {model_args.model} ...")
+    if model_args.model == "BAAI/bge-m3":
+        save_dir_name = "bge-m3"
+    elif model_args.model == "colbertv2.0":
+        save_dir_name = "colbertv2"
+    elif model_args.model == "jina-colbert":
+        save_dir_name = "jina-colbert"
+    else:
+        raise ValueError(f"Model {model_args.model} is not supported.")
     print('Generate embedding of following languages: ', languages)
     for lang in languages:
         print("**************************************************")
-        embedding_save_dir = os.path.join(eval_args.embedding_save_dir, 'bge-m3', lang)
+        embedding_save_dir = os.path.join(eval_args.embedding_save_dir, save_dir_name, lang)
         if not os.path.exists(embedding_save_dir):
             os.makedirs(embedding_save_dir)
         colbert_save_file = os.path.join(embedding_save_dir, f'colbert-{eval_args.begin_pos}-{eval_args.end_pos}.data')
@@ -77,7 +135,7 @@ def main():
             save_result(colbert_embeddings, colbert_save_file)
 
     print("==================================================")
-    print("Finish generating colbert embeddings with model: BAAI/bge-m3")
+    print(f"Finish generating colbert embeddings with model: {model_args.model} ...")
 
 
 if __name__ == "__main__":
diff --git a/python/benchmark/mldr_benchmark/insert_data_50000.py b/python/benchmark/mldr_benchmark/insert_data_50000.py
index 318ef56d4a..d511f8cbaf 100644
--- a/python/benchmark/mldr_benchmark/insert_data_50000.py
+++ b/python/benchmark/mldr_benchmark/insert_data_50000.py
@@ -91,10 +91,12 @@ def main(self):
                     sparse_base_name = f"sparse-{sparse_pos_part_begin}-{sparse_pos_part_end}.data"
                     sparse_data = read_mldr_sparse_embedding_yield(os.path.join(sparse_embedding_dir, sparse_base_name))
                 docid_str = docid_list[row_pos]
+                insert_dense_data = next(dense_data)
+                insert_sparse_data = next(sparse_data)
                 if int(docid_str.split('-')[-1]) >= 189796:
                     continue
                 insert_dict = {"docid_col": docid_str, "fulltext_col": corpus_text_list[row_pos],
-                               "dense_col": next(dense_data), "sparse_col": next(sparse_data)}
+                               "dense_col": insert_dense_data, "sparse_col": insert_sparse_data}
                 buffer.append(insert_dict)
             if len(buffer) > 0:
                 self.infinity_table.insert(buffer)
diff --git a/python/benchmark/mldr_benchmark/insert_data_with_colbert_50000.py b/python/benchmark/mldr_benchmark/insert_data_with_colbert_50000.py
new file mode 100644
index 0000000000..059adf94b3
--- /dev/null
+++ b/python/benchmark/mldr_benchmark/insert_data_with_colbert_50000.py
@@ -0,0 +1,162 @@
+# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import infinity
+import numpy as np
+from tqdm import tqdm
+import infinity.index as index
+from infinity.common import ConflictType, LOCAL_HOST
+from mldr_common_tools import load_corpus, fvecs_read_yield, read_mldr_sparse_embedding_yield, read_colbert_data_yield
+from infinity.errors import ErrorCode
+
+
+def get_all_part_begin_ends(total_row_count: int):
+    result = []
+    pos_now = 0
+    while pos_now < total_row_count:
+        new_pos = int(input("input part end position: "))
+        if pos_now >= new_pos or new_pos > total_row_count:
+            print("Invalid value. Input again.")
+            continue
+        result.append((pos_now, new_pos))
+        pos_now = new_pos
+    return result
+
+
+def get_bit_array(float_array: list[list]):
+    return [[1.0 if x > 0.0 else 0.0 for x in one_list] for one_list in float_array]
+
+
+# fulltext column, dense and sparse embedding columns, colbert tensor and bit-tensor columns
+class InfinityClientForInsert:
+    def __init__(self):
+        self.test_db_name = "default_db"
+        self.test_table_name_prefix = "mldr_test_table_text_dense_sparse_"
+        self.test_table_schema = {"docid_col": {"type": "varchar"}, "fulltext_col": {"type": "varchar"},
+                                  "dense_col": {"type": "vector,1024,float"},
+                                  "sparse_col": {"type": "sparse,250002,float,int"},
+                                  "colbert_col": {"type": "tensor,128,float"},
+                                  "colbert_bit_col": {"type": "tensor,128,bit"}}
+        self.infinity_obj = infinity.connect(LOCAL_HOST)
+        self.infinity_db = self.infinity_obj.create_database(self.test_db_name, ConflictType.Ignore)
+        self.infinity_table = None
+
+    def create_test_table(self, language_suffix: str):
+        table_name = self.test_table_name_prefix + language_suffix
+        self.infinity_db.drop_table(table_name, ConflictType.Ignore)
+        self.infinity_table = self.infinity_db.create_table(table_name, self.test_table_schema)
+        print("Created table successfully.")
+
+    def main(self):
+        lang = input("Input language to insert: ")
+        self.create_test_table(lang)
+        corpus = load_corpus(lang)
+        total_num = corpus.num_rows
+        docid_list = corpus["docid"]
+        corpus_text_list = corpus["text"]
+        del corpus
+        print(f"Expected total number of rows: {total_num}")
+        dense_embedding_dir = input("Input dense embedding data files dir: ")
+        print("Input begin and end position pairs of dense embedding data to insert:")
+        dense_part_begin_ends = get_all_part_begin_ends(total_num)
+        sparse_embedding_dir = input("Input sparse embedding data files dir: ")
+        print("Input begin and end position pairs of sparse embedding data to insert:")
+        sparse_part_begin_ends = get_all_part_begin_ends(total_num)
+        colbert_embedding_dir = input("Input colbert embedding data files dir: ")
+        print("Input begin and end position pairs of colbert embedding data to insert:")
+        colbert_part_begin_ends = get_all_part_begin_ends(total_num)
+        insert_num = total_num
+        batch_size = 1024
+        print("Start inserting data...")
+        dense_data = None
+        dense_pos_part_end = 0
+        dense_pair_id_next = 0
+        sparse_data = None
+        sparse_pos_part_end = 0
+        sparse_pair_id_next = 0
+        colbert_data = None
+        colbert_pos_part_end = 0
+        colbert_pair_id_next = 0
+        for begin_idx in tqdm(range(0, insert_num, batch_size)):
+            end_idx = min(begin_idx + batch_size, insert_num)
+            buffer = []
+            for row_pos in range(begin_idx, end_idx):
+                if row_pos == dense_pos_part_end:
+                    dense_pos_part_begin, dense_pos_part_end = dense_part_begin_ends[dense_pair_id_next]
+                    dense_pair_id_next += 1
+                    dense_base_name = f"dense-{dense_pos_part_begin}-{dense_pos_part_end}.fvecs"
+                    dense_data = fvecs_read_yield(os.path.join(dense_embedding_dir, dense_base_name))
+                if row_pos == sparse_pos_part_end:
+                    sparse_pos_part_begin, sparse_pos_part_end = sparse_part_begin_ends[sparse_pair_id_next]
+                    sparse_pair_id_next += 1
+                    sparse_base_name = f"sparse-{sparse_pos_part_begin}-{sparse_pos_part_end}.data"
+                    sparse_data = read_mldr_sparse_embedding_yield(os.path.join(sparse_embedding_dir, sparse_base_name))
+                if row_pos == colbert_pos_part_end:
+                    colbert_pos_part_begin, colbert_pos_part_end = colbert_part_begin_ends[colbert_pair_id_next]
+                    colbert_pair_id_next += 1
+                    colbert_base_name = f"colbert-{colbert_pos_part_begin}-{colbert_pos_part_end}.data"
+                    colbert_data = read_colbert_data_yield(os.path.join(colbert_embedding_dir, colbert_base_name))
+                docid_str = docid_list[row_pos]
+                insert_dense_data = next(dense_data)
+                insert_sparse_data = next(sparse_data)
+                colbert_list = next(colbert_data)
+                if int(docid_str.split('-')[-1]) >= 189796:
+                    continue
+                insert_dict = {"docid_col": docid_str, "fulltext_col": corpus_text_list[row_pos],
+                               "dense_col": insert_dense_data, "sparse_col": insert_sparse_data,
+                               "colbert_col": colbert_list, "colbert_bit_col": get_bit_array(colbert_list)}
+                buffer.append(insert_dict)
+            if len(buffer) > 0:
+                self.infinity_table.insert(buffer)
+            del buffer
+        print("Finish inserting data.")
+        del dense_data
+        del sparse_data
+        del docid_list
+        del corpus_text_list
+        print("Start creating fulltext index.")
+        ft_params = []
+        if lang == "zh":
+            ft_params.append(index.InitParameter("analyzer", "chinese"))
+        res = self.infinity_table.create_index("ft_index",
+                                               [index.IndexInfo("fulltext_col", index.IndexType.FullText, ft_params)],
+                                               ConflictType.Error)
+        assert res.error_code == ErrorCode.OK
+        print("Finish creating fulltext index.")
+        print("Start creating Hnsw index...")
+        res = self.infinity_table.create_index("hnsw_index", [index.IndexInfo("dense_col", index.IndexType.Hnsw,
+                                                                              [index.InitParameter("M", "16"),
+                                                                               index.InitParameter("ef_construction", "200"),
+                                                                               index.InitParameter("ef", "200"),
+                                                                               index.InitParameter("metric", "ip"),
+                                                                               index.InitParameter("encode", "lvq")])],
+                                               ConflictType.Error)
+        assert res.error_code == ErrorCode.OK
+        print("Finish creating Hnsw index.")
+        print("Start creating BMP index...")
+        res = self.infinity_table.create_index("bmp_index", [index.IndexInfo("sparse_col", index.IndexType.BMP,
+                                                                             [index.InitParameter("block_size", "16"),
+                                                                              index.InitParameter("compress_type", "compress")])],
+                                               ConflictType.Error)
+        assert res.error_code == ErrorCode.OK
+        self.infinity_table.optimize("bmp_index", {"topk": "1000"})
+        print("Finish creating BMP index.")
+
+
+if __name__ == "__main__":
+    infinity_client = InfinityClientForInsert()
+    infinity_client.main()
diff --git a/python/benchmark/mldr_benchmark/mldr_common_tools.py b/python/benchmark/mldr_benchmark/mldr_common_tools.py
index 376d17f351..3d0c01b8b0 100644
--- a/python/benchmark/mldr_benchmark/mldr_common_tools.py
+++ b/python/benchmark/mldr_benchmark/mldr_common_tools.py
@@ -197,3 +197,19 @@ def read_mldr_sparse_embedding_yield(file_path: str):
                 yield one_dict
     else:
         raise ValueError(f"Unsupported file: {file_path}")
+
+
+def read_colbert_data_yield(file_path: str):
+    with open(file_path, 'rb') as f:
+        while True:
+            try:
+                tensor_len = struct.unpack('<i', f.read(4))[0]
+            except struct.error:
+                break
+            # each record: int32 row count, then row_count 128-dim float32 token embeddings
+            one_tensor = []
+            for _ in range(tensor_len):
+                one_vec = struct.unpack('<128f', f.read(4 * 128))
+                one_tensor.append(list(one_vec))
+            yield one_tensor
diff --git a/src/common/default_values.cppm b/src/common/default_values.cppm
--- a/src/common/default_values.cppm
+++ b/src/common/default_values.cppm
@@ ... @@
     constexpr u64 ... = std::numeric_limits<u64>::max(); // Each row has one chunk.
     constexpr u64 DEFAULT_FIXLEN_CHUNK_SIZE = 65536L;
-    constexpr u64 DEFAULT_FIXLEN_TENSOR_CHUNK_SIZE = 512UL * 1024UL * 4UL;
+    constexpr u64 DEFAULT_FIXLEN_TENSOR_CHUNK_SIZE = 8192UL * 128UL * 4UL;
 
     // segment related constants
     constexpr SizeT DEFAULT_SEGMENT_CAPACITY = 1024 * 8192; // 1024 * 8192 = 8M rows
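
---

Note on the `colbert-*.data` format: `read_colbert_data_yield` above appears to assume a length-prefixed layout, paired with the `save_result` writer in `generate_colbert_embedding.py` — per document tensor, an int32 row count followed by `row_count * 128` little-endian float32 values. A minimal, self-contained round-trip sketch of that assumed layout (the `write_colbert_data`/`read_colbert_data` names are illustrative, not part of this patch):

```python
import struct
import numpy as np

# Assumed layout of a colbert-*.data file (inferred, not guaranteed by this patch):
# repeated records of [int32 row_count][row_count * 128 float32], little-endian.
FIXED_DIM = 128


def write_colbert_data(file_path: str, tensors: list[np.ndarray]):
    with open(file_path, 'wb') as f:
        for one_tensor in tensors:
            assert one_tensor.ndim == 2 and one_tensor.shape[1] == FIXED_DIM
            f.write(struct.pack('<i', one_tensor.shape[0]))  # row-count prefix
            f.write(one_tensor.astype('<f4').tobytes())      # row-major float32 payload


def read_colbert_data(file_path: str):
    with open(file_path, 'rb') as f:
        while True:
            head = f.read(4)
            if len(head) < 4:  # clean EOF between records
                break
            tensor_len = struct.unpack('<i', head)[0]
            payload = np.frombuffer(f.read(4 * FIXED_DIM * tensor_len), dtype='<f4')
            yield payload.reshape(tensor_len, FIXED_DIM)


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    demo = [rng.random((5, FIXED_DIM), dtype=np.float32),
            rng.random((3, FIXED_DIM), dtype=np.float32)]
    write_colbert_data("demo-colbert.data", demo)
    for original, loaded in zip(demo, read_colbert_data("demo-colbert.data")):
        assert np.allclose(original, loaded)
    print("round-trip OK")
```

Length-prefixing keeps the format streamable: the insert script only ever materializes one document tensor at a time, which matters because a full corpus of per-token multivector embeddings generally will not fit in memory.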