From 1e9ed101613d7c7982b742a33d815737fd6c354d Mon Sep 17 00:00:00 2001
From: nateagr <41445208+nateagr@users.noreply.github.com>
Date: Thu, 4 May 2023 12:03:30 +0200
Subject: [PATCH] Improve methods to create small and big indexes for more
 flexibility (#161)

create_small_index and create_big_index now accept multiple input embedding
directories, and they take the final output directory as an explicit argument
instead of inferring it from the partition name of the input directory.
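For example, the small-index entry point can now be called as below (a minimal
sketch; the HDFS paths are illustrative placeholders, while the parameter names
are the ones introduced by this patch):

    from autofaiss.indices.distributed import create_small_index

    # One index built from several embedding directories; the output
    # directory is passed explicitly instead of being inferred from the
    # partition name.
    create_small_index(
        embedding_root_dirs=["hdfs://root/embeddings/part-a", "hdfs://root/embeddings/part-b"],
        output_root_dir="hdfs://root/indices/my_index",
        embedding_column_name="embedding",
    )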
---
 autofaiss/indices/distributed.py | 84 +++++++++++++++++---------------
 autofaiss/indices/training.py    |  8 +--
 autofaiss/utils/cast.py          |  4 --
 3 files changed, 48 insertions(+), 48 deletions(-)
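The distributed variant follows the same calling pattern; a sketch, assuming a
running SparkSession and placeholder HDFS paths (nb_splits_per_big_index and
the other keyword arguments come from the new signature below):

    from pyspark.sql import SparkSession
    from autofaiss.indices.distributed import create_big_index

    ss = SparkSession.builder.appName("autofaiss").getOrCreate()

    # Train and fill one big index from two embedding directories; the index
    # is written to an explicit output directory and split into two pieces.
    create_big_index(
        embedding_root_dirs=["hdfs://root/embeddings/2023-05-01", "hdfs://root/embeddings/2023-05-02"],
        output_root_dir="hdfs://root/indices/big",
        ss=ss,
        nb_splits_per_big_index=2,
    )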
diff --git a/autofaiss/indices/distributed.py b/autofaiss/indices/distributed.py
index bd7a0b2..d241284 100644
--- a/autofaiss/indices/distributed.py
+++ b/autofaiss/indices/distributed.py
@@ -6,6 +6,7 @@
 import multiprocessing
 import os
 import logging
+from uuid import uuid4
 from tempfile import TemporaryDirectory
 import tempfile
 from typing import Dict, Optional, Iterator, Tuple, Callable, Any, Union, List
@@ -362,11 +363,9 @@ def _add_embeddings_to_index(
     """Add embeddings to index"""
 
     # Define output folders
-    partition = extract_partition_name_from_path(embedding_reader.embeddings_folder)
-    output_dir = os.path.join(output_root_dir, partition)
-    index_dest_path = os.path.join(output_dir, "knn.index")
-    ids_dest_dir = os.path.join(output_dir, "ids")
-    index_infos_dest_path = os.path.join(output_dir, "index_infos.json")
+    index_dest_path = os.path.join(output_root_dir, "knn.index")
+    ids_dest_dir = os.path.join(output_root_dir, "ids")
+    index_infos_dest_path = os.path.join(output_root_dir, "index_infos.json")
 
     # Compute memory available for adding embeddings to index
     metadata = IndexMetadata(index_key, embedding_reader.count, embedding_reader.dimension, make_direct_map)
@@ -401,7 +400,7 @@
 
 def _add_embeddings_from_dir_to_index(
     add_embeddings_fn: Callable,
-    embedding_root_dir: str,
+    embedding_root_dirs: Union[List[str], str],
     output_root_dir: str,
     index_key: str,
     embedding_column_name: str,
@@ -417,7 +416,7 @@
     # Read embeddings
     with Timeit("-> Reading embeddings", indent=2):
         embedding_reader = EmbeddingReader(
-            embedding_root_dir, file_format="parquet", embedding_column=embedding_column_name, meta_columns=id_columns
+            embedding_root_dirs, file_format="parquet", embedding_column=embedding_column_name, meta_columns=id_columns
         )
 
     # Add embeddings to index
@@ -436,24 +435,24 @@
 
 
 def create_big_index(
-    embedding_root_dir: str,
-    ss,
+    embedding_root_dirs: Union[List[str], str],
     output_root_dir: str,
-    id_columns: Optional[List[str]],
-    should_be_memory_mappable: bool,
-    max_index_query_time_ms: float,
-    max_index_memory_usage: str,
-    min_nearest_neighbors_to_retrieve: int,
-    embedding_column_name: str,
-    index_key: str,
-    index_path: Optional[str],
-    current_memory_available: str,
-    nb_cores: Optional[int],
-    use_gpu: bool,
-    metric_type: str,
-    nb_splits_per_big_index: int,
-    make_direct_map: bool,
-    temp_root_dir: str,
+    ss,
+    id_columns: Optional[List[str]] = None,
+    should_be_memory_mappable: bool = False,
+    max_index_query_time_ms: float = 10.0,
+    max_index_memory_usage: str = "16G",
+    min_nearest_neighbors_to_retrieve: int = 20,
+    embedding_column_name: str = "embedding",
+    index_key: Optional[str] = None,
+    index_path: Optional[str] = None,
+    current_memory_available: str = "32G",
+    nb_cores: Optional[int] = None,
+    use_gpu: bool = False,
+    metric_type: str = "ip",
+    nb_splits_per_big_index: int = 1,
+    make_direct_map: bool = False,
+    temp_root_dir: str = "hdfs://root/tmp/distributed_autofaiss_indices",
 ) -> Optional[Dict[str, str]]:
     """
     Create a big index
@@ -461,7 +460,7 @@ def create_big_index(
 
     def _create_and_train_index_from_embedding_dir() -> TrainedIndex:
         trained_index = create_and_train_index_from_embedding_dir(
-            embedding_root_dir=embedding_root_dir,
+            embedding_root_dirs=embedding_root_dirs,
             embedding_column_name=embedding_column_name,
             index_key=index_key,
             max_index_memory_usage=max_index_memory_usage,
@@ -474,15 +473,16 @@ def create_big_index(
             id_columns=id_columns,
         )
 
-        index_output_root_dir = os.path.join(temp_root_dir, "training", partition)
+        index_output_root_dir = os.path.join(temp_root_dir, "training", str(uuid4()))
         output_index_path = save_index(trained_index.index_or_path, index_output_root_dir, "trained_index")
-        return TrainedIndex(output_index_path, trained_index.index_key, embedding_root_dir)
-
-    partition = extract_partition_name_from_path(embedding_root_dir)
+        return TrainedIndex(output_index_path, trained_index.index_key, embedding_root_dirs)
 
     if not index_path:
-        # Train index
-        rdd = ss.sparkContext.parallelize([embedding_root_dir], 1)
+        # Train the index. The value 13 below is an arbitrary magic number used only
+        # to create a one-element, one-partition RDD so that training runs on an
+        # executor. We avoid training on the driver: several big indexes may be
+        # trained in parallel, and the driver does not necessarily have enough memory.
+        rdd = ss.sparkContext.parallelize([13], 1)
         trained_index_path, trained_index_key, _, = rdd.map(
             lambda _: _create_and_train_index_from_embedding_dir()
         ).collect()[0]
@@ -492,7 +492,7 @@ def create_big_index(
         trained_index_key = index_key
 
     # Add embeddings to index and compute metrics
-    partition_temp_root_dir = os.path.join(temp_root_dir, "add_embeddings", partition)
+    partition_temp_root_dir = os.path.join(temp_root_dir, "add_embeddings", str(uuid4()))
     index, metrics = _add_embeddings_from_dir_to_index(
         add_embeddings_fn=partial(
             add_embeddings_to_index_distributed,
@@ -501,7 +501,7 @@ def create_big_index(
             temporary_indices_folder=partition_temp_root_dir,
             nb_indices_to_keep=nb_splits_per_big_index,
         ),
-        embedding_root_dir=embedding_root_dir,
+        embedding_root_dirs=embedding_root_dirs,
         output_root_dir=output_root_dir,
         index_key=trained_index_key,
         embedding_column_name=embedding_column_name,
@@ -521,7 +521,7 @@
 
 
 def create_small_index(
-    embedding_root_dir: str,
+    embedding_root_dirs: Union[List[str], str],
     output_root_dir: str,
     id_columns: Optional[List[str]] = None,
     should_be_memory_mappable: bool = False,
@@ -542,7 +542,7 @@ def create_small_index(
     """
     if not index_path:
        trained_index = create_and_train_index_from_embedding_dir(
-            embedding_root_dir=embedding_root_dir,
+            embedding_root_dirs=embedding_root_dirs,
            embedding_column_name=embedding_column_name,
            index_key=index_key,
            max_index_memory_usage=max_index_memory_usage,
@@ -558,7 +558,7 @@ def create_small_index(
         assert index_key, "index key of the input index must be provided because you provided an index_path"
         with tempfile.TemporaryDirectory() as tmp_dir:
             embedding_reader = EmbeddingReader(
-                embedding_root_dir,
+                embedding_root_dirs,
                 file_format="parquet",
                 embedding_column=embedding_column_name,
                 meta_columns=id_columns,
@@ -611,12 +611,17 @@ def create_partitioned_indexes(
     i.e. create and train one index per parquet partition
     """
 
+    def _infer_index_output_dir(embedding_root_dir: str) -> str:
+        """Infer index output directory from input embedding directory"""
+        partition = extract_partition_name_from_path(embedding_root_dir)
+        return os.path.join(output_root_dir, partition)
+
     def _create_small_indexes(embedding_root_dirs: List[str]) -> List[Optional[Dict[str, str]]]:
         rdd = ss.sparkContext.parallelize(embedding_root_dirs, len(embedding_root_dirs))
         return rdd.map(
             lambda embedding_root_dir: create_small_index(
-                embedding_root_dir=embedding_root_dir,
-                output_root_dir=output_root_dir,
+                embedding_root_dirs=embedding_root_dir,
+                output_root_dir=_infer_index_output_dir(embedding_root_dir),
                 id_columns=id_columns,
                 should_be_memory_mappable=should_be_memory_mappable,
                 max_index_query_time_ms=max_index_query_time_ms,
@@ -638,7 +643,6 @@ def _create_small_indexes(embedding_root_dirs: List[str]) -> List[Optional[Dict[str, str]]]:
     create_big_index_fn = partial(
         create_big_index,
         ss=ss,
-        output_root_dir=output_root_dir,
         id_columns=id_columns,
         should_be_memory_mappable=should_be_memory_mappable,
         max_index_query_time_ms=max_index_query_time_ms,
@@ -681,7 +685,7 @@ def _create_small_indexes(embedding_root_dirs: List[str]) -> List[Optional[Dict[str, str]]]:
         small_index_metrics_future = (
             p.apply_async(_create_small_indexes, (small_partitions,)) if small_partitions else None
         )
-        for metrics in p.starmap(create_big_index_fn, [(p,) for p in big_partitions]):
+        for metrics in p.starmap(create_big_index_fn, [(p, _infer_index_output_dir(p)) for p in big_partitions]):
             all_metrics.append(metrics)
         if small_index_metrics_future:
             all_metrics.extend(small_index_metrics_future.get())
diff --git a/autofaiss/indices/training.py b/autofaiss/indices/training.py
index 108604c..826c9f5 100644
--- a/autofaiss/indices/training.py
+++ b/autofaiss/indices/training.py
@@ -21,7 +21,7 @@ class TrainedIndex(NamedTuple):
 
     index_or_path: Union[faiss.Index, str]
     index_key: str
-    embedding_reader_or_path: Union[EmbeddingReader, str]
+    embedding_reader_or_path: Union[EmbeddingReader, str, List[str]]
 
 
 def create_empty_index(vec_dim: int, index_key: str, metric_type: Union[str, int]) -> faiss.Index:
@@ -110,7 +110,7 @@ def create_and_train_new_index(
 
 
 def create_and_train_index_from_embedding_dir(
-    embedding_root_dir: str,
+    embedding_root_dirs: Union[List[str], str],
     embedding_column_name: str,
     max_index_memory_usage: str,
     make_direct_map: bool,
@@ -131,7 +131,7 @@ def create_and_train_index_from_embedding_dir(
     # Read embeddings
     with Timeit("-> Reading embeddings", indent=2):
         embedding_reader = EmbeddingReader(
-            embedding_root_dir, file_format="parquet", embedding_column=embedding_column_name, meta_columns=id_columns
+            embedding_root_dirs, file_format="parquet", embedding_column=embedding_column_name, meta_columns=id_columns
         )
 
     # Define index key
@@ -145,7 +145,7 @@ def create_and_train_index_from_embedding_dir(
         use_gpu=use_gpu,
     )
     if not best_index_keys:
-        raise RuntimeError(f"Unable to find optimal index key from embedding directory {embedding_root_dir}")
+        raise RuntimeError(f"Unable to find optimal index key from embedding directories {embedding_root_dirs}")
     index_key = best_index_keys[0]
 
     # Create metadata
diff --git a/autofaiss/utils/cast.py b/autofaiss/utils/cast.py
index 4dae426..214e021 100644
--- a/autofaiss/utils/cast.py
+++ b/autofaiss/utils/cast.py
@@ -40,10 +40,6 @@ def cast_memory_to_bytes(memory_string: str) -> float:
 def cast_bytes_to_memory_string(num_bytes: float) -> str:
     """
     Cast a number of bytes to a readable string
-
-    >>> from autofaiss.utils.cast import cast_bytes_to_memory_string
-    >>> cast_bytes_to_memory_string(16.*1024*1024*1024) == "16.0GB"
-    True
     """
 
     suffix = "B"
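-- 
The single-element RDD trick added in create_big_index above generalizes to any
heavy closure that should run on an executor rather than the driver. A minimal
sketch, assuming a running SparkSession; train_fn is a hypothetical zero-argument
training closure:

    from pyspark.sql import SparkSession

    ss = SparkSession.builder.getOrCreate()

    def run_on_executor(train_fn):
        # A one-element, one-partition RDD pushes the closure onto a single
        # executor, keeping the driver's memory free; the element's value
        # (13, as in the patch) is never used.
        rdd = ss.sparkContext.parallelize([13], 1)
        return rdd.map(lambda _: train_fn()).collect()[0]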