Add script for generating colbert embeddings for MLDR benchmark (#1434)
### What problem does this PR solve?

Add script for generating colbert embeddings for MLDR benchmark

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
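
A hypothetical invocation, for orientation only; the flag names mirror the ModelArgs and EvalArgs dataclass fields the script parses via HfArgumentParser (EvalArgs comes from mldr_common_tools):

    python generate_colbert_embedding.py --model jina-colbert --languages en \
        --embedding_save_dir ./embeddings --begin_pos 0 --end_pos 10000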
yangzq50 authored Jul 4, 2024
1 parent 4c892f8 commit 3eddb24
Showing 5 changed files with 249 additions and 11 deletions.
76 changes: 67 additions & 9 deletions python/benchmark/mldr_benchmark/generate_colbert_embedding.py
@@ -10,25 +10,73 @@
"""
import os
import struct
import numpy as np
import datasets
from tqdm import tqdm
from FlagEmbedding import BGEM3FlagModel
from dataclasses import dataclass, field
from transformers import HfArgumentParser
from mldr_common_tools import EvalArgs, check_languages, load_corpus


@dataclass
class ModelArgs:
    model: str = field(default="jina-colbert", metadata={'help': 'Name of the model to use.'})
fp16: bool = field(default=True, metadata={'help': 'Use fp16 in inference?'})


def get_model(model_args: ModelArgs):
model = BGEM3FlagModel("BAAI/bge-m3", use_fp16=model_args.fp16)
return model
class FakeBGEM3FlagModelColBERTV2:
def __init__(self):
from colbert.modeling.checkpoint import Checkpoint
from colbert.infra import ColBERTConfig
self.ckpt = Checkpoint("colbert-ir/colbertv2.0", colbert_config=ColBERTConfig(root="experiments"))
from langchain.text_splitter import RecursiveCharacterTextSplitter
self.text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=0, length_function=len,
is_separator_regex=False)

def encode(self, text: list[str], batch_size: int, max_length: int, return_dense: bool, return_sparse: bool,
return_colbert_vecs: bool):
assert return_colbert_vecs is True and return_dense is False and return_sparse is False
colbert_result = []
for one_text in tqdm(text, desc="Generating ColBERT-v2.0 embeddings"):
chunks = self.text_splitter.create_documents([one_text])
subtext_tensor = [self.ckpt.docFromText([chunk.page_content], to_cpu=True)[0].numpy() for chunk in chunks]
# now concatenate them together
concatenated = np.concatenate(subtext_tensor)
assert concatenated.ndim == 2 and concatenated.shape[1] == 128
colbert_result.append(concatenated)
return {'colbert_vecs': colbert_result}


class FakeBGEM3FlagModelJinaColBERT:
def __init__(self):
from colbert.modeling.checkpoint import Checkpoint
from colbert.infra import ColBERTConfig
self.ckpt = Checkpoint("jinaai/jina-colbert-v1-en", colbert_config=ColBERTConfig(doc_maxlen=8192))

def encode(self, text: list[str], batch_size: int, max_length: int, return_dense: bool, return_sparse: bool,
return_colbert_vecs: bool):
assert return_colbert_vecs is True and return_dense is False and return_sparse is False
colbert_result = []
for one_text in tqdm(text, desc="Generating jina-colbert-v1-en embeddings"):
text_tensor = self.ckpt.docFromText([one_text], to_cpu=True)[0].numpy()
assert text_tensor.ndim == 2 and text_tensor.shape[1] == 128
colbert_result.append(text_tensor)
return {'colbert_vecs': colbert_result}


def generate_multivec(model: BGEM3FlagModel, corpus: datasets.Dataset, max_passage_length: int, batch_size: int,
def get_model(model_args: ModelArgs):
if model_args.model == "BAAI/bge-m3":
from FlagEmbedding import BGEM3FlagModel
return BGEM3FlagModel("BAAI/bge-m3", use_fp16=model_args.fp16)
elif model_args.model == "jina-colbert":
return FakeBGEM3FlagModelJinaColBERT()
elif model_args.model == "colbertv2.0":
return FakeBGEM3FlagModelColBERTV2()
else:
raise ValueError(f"Model {model_args.model} is not supported.")


def generate_multivec(model, corpus: datasets.Dataset, max_passage_length: int, batch_size: int,
begin_pos: int, end_pos: int):
result_dict = model.encode(corpus["text"][begin_pos: end_pos], batch_size=batch_size, max_length=max_passage_length,
return_dense=False, return_sparse=False, return_colbert_vecs=True)
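
The two Fake* classes above duck-type BGEM3FlagModel.encode, so generate_multivec can treat every backend uniformly: colbertv2.0 input is first split into 512-character chunks whose token embeddings are concatenated, while jina-colbert-v1-en encodes each document whole with doc_maxlen=8192. A minimal sketch of how the adapter is consumed, mirroring the calls in this file:

    model = get_model(ModelArgs(model="colbertv2.0"))
    result = model.encode(["some document text"], batch_size=1, max_length=8192,
                          return_dense=False, return_sparse=False,
                          return_colbert_vecs=True)
    per_doc = result['colbert_vecs'][0]  # numpy array of shape (num_tokens, 128)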
@@ -52,14 +100,24 @@ def main():
eval_args: EvalArgs

languages = check_languages(eval_args.languages)
if model_args.model == "jina-colbert":
if languages != ["en"]:
raise ValueError(f"Model {model_args.model} only supports English language.")
model = get_model(model_args=model_args)
print("==================================================")
print("Start generating colbert embedding with model: BAAI/bge-m3")

print(f"Start generating colbert embedding with model: {model_args.model} ...")
if model_args.model == "BAAI/bge-m3":
save_dir_name = "bge-m3"
elif model_args.model == "colbertv2.0":
save_dir_name = "colbertv2"
elif model_args.model == "jina-colbert":
save_dir_name = "jina-colbert"
else:
raise ValueError(f"Model {model_args.model} is not supported.")
    print('Generate embeddings for the following languages:', languages)
for lang in languages:
print("**************************************************")
embedding_save_dir = os.path.join(eval_args.embedding_save_dir, 'bge-m3', lang)
embedding_save_dir = os.path.join(eval_args.embedding_save_dir, save_dir_name, lang)
if not os.path.exists(embedding_save_dir):
os.makedirs(embedding_save_dir)
colbert_save_file = os.path.join(embedding_save_dir, f'colbert-{eval_args.begin_pos}-{eval_args.end_pos}.data')
@@ -77,7 +135,7 @@ def main():
save_result(colbert_embeddings, colbert_save_file)

print("==================================================")
print("Finish generating colbert embeddings with model: BAAI/bge-m3")
print(f"Finish generating colbert embeddings with model: {model_args.model} ...")


if __name__ == "__main__":
4 changes: 3 additions & 1 deletion python/benchmark/mldr_benchmark/insert_data_50000.py
@@ -91,10 +91,12 @@ def main(self):
sparse_base_name = f"sparse-{sparse_pos_part_begin}-{sparse_pos_part_end}.data"
sparse_data = read_mldr_sparse_embedding_yield(os.path.join(sparse_embedding_dir, sparse_base_name))
docid_str = docid_list[row_pos]
                # Advance both generators before the skip check below so they
                # stay in step with row_pos even when a row is filtered out.
                insert_dense_data = next(dense_data)
                insert_sparse_data = next(sparse_data)
if int(docid_str.split('-')[-1]) >= 189796:
continue
insert_dict = {"docid_col": docid_str, "fulltext_col": corpus_text_list[row_pos],
"dense_col": next(dense_data), "sparse_col": next(sparse_data)}
"dense_col": insert_dense_data, "sparse_col": insert_sparse_data}
buffer.append(insert_dict)
if len(buffer) > 0:
self.infinity_table.insert(buffer)
162 changes: 162 additions & 0 deletions python/benchmark/mldr_benchmark/insert_data_with_colbert_50000.py
@@ -0,0 +1,162 @@
# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import infinity
import numpy as np
from tqdm import tqdm
import infinity.index as index
from infinity.common import ConflictType, LOCAL_HOST
from mldr_common_tools import load_corpus, fvecs_read_yield, read_mldr_sparse_embedding_yield, read_colbert_data_yield
from infinity.errors import ErrorCode


def get_all_part_begin_ends(total_row_count: int):
result = []
pos_now = 0
while pos_now < total_row_count:
new_pos = int(input("input part end position: "))
if pos_now >= new_pos or new_pos > total_row_count:
print("Invalid value. Input again.")
continue
result.append((pos_now, new_pos))
pos_now = new_pos
return result
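# A hypothetical session, for illustration: with total_row_count = 200000,
# entering 100000 and then 200000 returns [(0, 100000), (100000, 200000)];
# each pair must match an embedding file name such as dense-0-100000.fvecs.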


def get_bit_array(float_array: list[list]):
return [[1.0 if x > 0.0 else 0.0 for x in one_list] for one_list in float_array]
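# Illustration with made-up values: get_bit_array([[0.5, -0.2, 0.0]]) returns
# [[1.0, 0.0, 0.0]], the binarised form stored in colbert_bit_col below.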


# fulltext, dense embedding, sparse embedding, and colbert tensor/bit columns
class InfinityClientForInsert:
def __init__(self):
self.test_db_name = "default_db"
self.test_table_name_prefix = "mldr_test_table_text_dense_sparse_"
self.test_table_schema = {"docid_col": {"type": "varchar"}, "fulltext_col": {"type": "varchar"},
"dense_col": {"type": "vector,1024,float"},
"sparse_col": {"type": "sparse,250002,float,int"},
"colbert_col": {"type": "tensor,128,float"},
"colbert_bit_col": {"type": "tensor,128,bit"}}
self.infinity_obj = infinity.connect(LOCAL_HOST)
self.infinity_db = self.infinity_obj.create_database(self.test_db_name, ConflictType.Ignore)
self.infinity_table = None

def create_test_table(self, language_suffix: str):
table_name = self.test_table_name_prefix + language_suffix
self.infinity_db.drop_table(table_name, ConflictType.Ignore)
self.infinity_table = self.infinity_db.create_table(table_name, self.test_table_schema)
print("Create table successfully.")

def main(self):
lang = input("Input language to insert: ")
self.create_test_table(lang)
corpus = load_corpus(lang)
total_num = corpus.num_rows
docid_list = corpus["docid"]
corpus_text_list = corpus["text"]
del corpus
print(f"Expect total number of rows: {total_num}")
dense_embedding_dir = input("Input dense embedding data files dir: ")
print("Input begin and end position pairs of dense embedding data to insert:")
dense_part_begin_ends = get_all_part_begin_ends(total_num)
sparse_embedding_dir = input("Input sparse embedding data files dir: ")
print("Input begin and end position pairs of sparse embedding data to insert:")
sparse_part_begin_ends = get_all_part_begin_ends(total_num)
colbert_embedding_dir = input("Input colbert embedding data files dir: ")
print("Input begin and end position pairs of colbert embedding data to insert:")
colbert_part_begin_ends = get_all_part_begin_ends(total_num)
insert_num = total_num
batch_size = 1024
print("Start inserting data...")
dense_data = None
dense_pos_part_end = 0
dense_pair_id_next = 0
sparse_data = None
sparse_pos_part_end = 0
sparse_pair_id_next = 0
colbert_data = None
colbert_pos_part_end = 0
colbert_pair_id_next = 0
for begin_idx in tqdm(range(0, insert_num, batch_size)):
end_idx = min(begin_idx + batch_size, insert_num)
buffer = []
for row_pos in range(begin_idx, end_idx):
if row_pos == dense_pos_part_end:
dense_pos_part_begin, dense_pos_part_end = dense_part_begin_ends[dense_pair_id_next]
dense_pair_id_next += 1
dense_base_name = f"dense-{dense_pos_part_begin}-{dense_pos_part_end}.fvecs"
dense_data = fvecs_read_yield(os.path.join(dense_embedding_dir, dense_base_name))
if row_pos == sparse_pos_part_end:
sparse_pos_part_begin, sparse_pos_part_end = sparse_part_begin_ends[sparse_pair_id_next]
sparse_pair_id_next += 1
sparse_base_name = f"sparse-{sparse_pos_part_begin}-{sparse_pos_part_end}.data"
sparse_data = read_mldr_sparse_embedding_yield(os.path.join(sparse_embedding_dir, sparse_base_name))
if row_pos == colbert_pos_part_end:
colbert_pos_part_begin, colbert_pos_part_end = colbert_part_begin_ends[colbert_pair_id_next]
colbert_pair_id_next += 1
colbert_base_name = f"colbert-{colbert_pos_part_begin}-{colbert_pos_part_end}.data"
colbert_data = read_colbert_data_yield(os.path.join(colbert_embedding_dir, colbert_base_name))
docid_str = docid_list[row_pos]
                # Advance all three generators before the skip check so they
                # stay in step with row_pos even when a row is filtered out.
                insert_dense_data = next(dense_data)
                insert_sparse_data = next(sparse_data)
                colbert_list = next(colbert_data)
if int(docid_str.split('-')[-1]) >= 189796:
continue
insert_dict = {"docid_col": docid_str, "fulltext_col": corpus_text_list[row_pos],
"dense_col": insert_dense_data, "sparse_col": insert_sparse_data,
"colbert_col": colbert_list, "colbert_bit_col": get_bit_array(colbert_list)}
buffer.append(insert_dict)
if len(buffer) > 0:
self.infinity_table.insert(buffer)
del buffer
print("Finish inserting data.")
del dense_data
del sparse_data
del docid_list
del corpus_text_list
print("Start creating fulltext index.")
ft_params = []
if lang == "zh":
ft_params.append(index.InitParameter("analyzer", "chinese"))
res = self.infinity_table.create_index("ft_index",
[index.IndexInfo("fulltext_col", index.IndexType.FullText, ft_params)],
ConflictType.Error)
assert res.error_code == ErrorCode.OK
print("Finish creating fulltext index.")
print("Start creating Hnsw index...")
res = self.infinity_table.create_index("hnsw_index", [index.IndexInfo("dense_col", index.IndexType.Hnsw,
[index.InitParameter("M", "16"),
index.InitParameter("ef_construction",
"200"),
index.InitParameter("ef", "200"),
index.InitParameter("metric", "ip"),
index.InitParameter("encode", "lvq")])],
ConflictType.Error)
assert res.error_code == ErrorCode.OK
print("Finish creating Hnsw index.")
print("Start creating BMP index...")
res = self.infinity_table.create_index("bmp_index", [index.IndexInfo("sparse_col", index.IndexType.BMP,
[index.InitParameter("block_size", "16"),
index.InitParameter("compress_type",
"compress")])],
ConflictType.Error)
assert res.error_code == ErrorCode.OK
self.infinity_table.optimize("bmp_index", {"topk": "1000"})
print("Finish creating BMP index.")


if __name__ == "__main__":
infinity_client = InfinityClientForInsert()
infinity_client.main()
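
The script is fully interactive; a hypothetical session (the paths and positions are illustrative, the prompts are the ones printed above) would begin:

    $ python insert_data_with_colbert_50000.py
    Input language to insert: en
    Input dense embedding data files dir: ./embeddings/bge-m3/en
    Input begin and end position pairs of dense embedding data to insert:
    input part end position: 200000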
16 changes: 16 additions & 0 deletions python/benchmark/mldr_benchmark/mldr_common_tools.py
@@ -197,3 +197,19 @@ def read_mldr_sparse_embedding_yield(file_path: str):
yield one_dict
else:
raise ValueError(f"Unsupported file: {file_path}")


def read_colbert_data_yield(file_path: str):
    # Stream tensors from a file laid out as: little-endian int32 row count,
    # then per row an int32 dimension followed by that many float32 values.
    with open(file_path, 'rb') as f:
        while True:
            try:
                tensor_len = struct.unpack('<i', f.read(4))[0]
                one_tensor = []
                for _ in range(tensor_len):
                    dim = struct.unpack('<i', f.read(4))[0]
                    vec = struct.unpack('{}f'.format(dim), f.read(4 * dim))
                    assert dim == len(vec)
                    one_tensor.append(list(vec))
                yield one_tensor
            except struct.error:
                # An empty read at end of file makes struct.unpack raise
                # struct.error, which terminates the generator.
                break
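
For reference, a minimal sketch of a writer producing the layout this reader expects. This is a hypothetical helper, not the save_result used by generate_colbert_embedding.py, which this diff does not show:

    def write_colbert_data(file_path: str, tensors: list[list[list[float]]]):
        # Per tensor: little-endian int32 row count, then for each row an
        # int32 dimension followed by that many float32 values.
        with open(file_path, 'wb') as f:
            for one_tensor in tensors:
                f.write(struct.pack('<i', len(one_tensor)))
                for vec in one_tensor:
                    f.write(struct.pack('<i', len(vec)))
                    f.write(struct.pack('<{}f'.format(len(vec)), *vec))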
2 changes: 1 addition & 1 deletion src/common/default_values.cppm
@@ -59,7 +59,7 @@ export {
constexpr u64 MAX_VECTOR_CHUNK_COUNT = std::numeric_limits<u64>::max();
// Each row has one chunk.
constexpr u64 DEFAULT_FIXLEN_CHUNK_SIZE = 65536L;
constexpr u64 DEFAULT_FIXLEN_TENSOR_CHUNK_SIZE = 512UL * 1024UL * 4UL;
constexpr u64 DEFAULT_FIXLEN_TENSOR_CHUNK_SIZE = 8192UL * 128UL * 4UL;

// segment related constants
constexpr SizeT DEFAULT_SEGMENT_CAPACITY = 1024 * 8192; // 1024 * 8192 = 8M rows
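
The new tensor chunk size works out to 8192 × 128 × 4 bytes = 4 MiB (previously 512 × 1024 × 4 = 2 MiB), presumably so that one maximum-length jina-colbert document (doc_maxlen=8192, 128-dim float32 vectors) fits a single chunk, consistent with the "Each row has one chunk" comment above.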
