
Commit

Improve methods to create small and big indexes for more flexibility (#161)

create_small_index and create_big_index now support multiple input embedding directories and take an explicit output directory instead of inferring it from the input path.
nateagr authored May 4, 2023
1 parent 2b3649a commit 1e9ed10
Showing 3 changed files with 48 additions and 48 deletions.
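
For orientation, a minimal sketch of how the reworked entry points might be called after this change; the paths, the Spark session variable, and the chosen option values are illustrative, and only the argument names come from the new signatures in the diff below.

from autofaiss.indices.distributed import create_big_index, create_small_index

# Small index: several embedding folders merged into a single index, written
# directly into the given output directory (illustrative paths).
metrics = create_small_index(
    embedding_root_dirs=["/data/embeddings/part-00000", "/data/embeddings/part-00001"],
    output_root_dir="/data/indices/small",
)

# Big index: same inputs, but training and adding embeddings run through Spark.
metrics = create_big_index(
    embedding_root_dirs=["/data/embeddings/part-00000", "/data/embeddings/part-00001"],
    output_root_dir="/data/indices/big",
    ss=spark,  # an existing pyspark.sql.SparkSession, assumed to be created elsewhere
    nb_splits_per_big_index=2,
)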
84 changes: 44 additions & 40 deletions autofaiss/indices/distributed.py
@@ -6,6 +6,7 @@
import multiprocessing
import os
import logging
from uuid import uuid4
from tempfile import TemporaryDirectory
import tempfile
from typing import Dict, Optional, Iterator, Tuple, Callable, Any, Union, List
@@ -362,11 +363,9 @@ def _add_embeddings_to_index(
"""Add embeddings to index"""

# Define output folders
partition = extract_partition_name_from_path(embedding_reader.embeddings_folder)
output_dir = os.path.join(output_root_dir, partition)
index_dest_path = os.path.join(output_dir, "knn.index")
ids_dest_dir = os.path.join(output_dir, "ids")
index_infos_dest_path = os.path.join(output_dir, "index_infos.json")
index_dest_path = os.path.join(output_root_dir, "knn.index")
ids_dest_dir = os.path.join(output_root_dir, "ids")
index_infos_dest_path = os.path.join(output_root_dir, "index_infos.json")

# Compute memory available for adding embeddings to index
metadata = IndexMetadata(index_key, embedding_reader.count, embedding_reader.dimension, make_direct_map)
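
The partition subdirectory is no longer inserted automatically: the caller decides the final directory and the function writes its three artifacts straight into it. A sketch of the resulting layout, with a hypothetical output_root_dir:

# output_root_dir = "/data/indices/my_index"        (hypothetical)
# /data/indices/my_index/knn.index          <- the faiss index
# /data/indices/my_index/ids/               <- id mapping parquet files, when id_columns is given
# /data/indices/my_index/index_infos.json   <- metrics and build information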
@@ -401,7 +400,7 @@ def _add_embeddings_to_index(

def _add_embeddings_from_dir_to_index(
add_embeddings_fn: Callable,
embedding_root_dir: str,
embedding_root_dirs: Union[List[str], str],
output_root_dir: str,
index_key: str,
embedding_column_name: str,
@@ -417,7 +416,7 @@ def _add_embeddings_from_dir_to_index(
# Read embeddings
with Timeit("-> Reading embeddings", indent=2):
embedding_reader = EmbeddingReader(
embedding_root_dir, file_format="parquet", embedding_column=embedding_column_name, meta_columns=id_columns
embedding_root_dirs, file_format="parquet", embedding_column=embedding_column_name, meta_columns=id_columns
)

# Add embeddings to index
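
Passing the folders straight through only works if EmbeddingReader itself accepts a list as its first argument, which this change relies on; a sketch of both call forms, with illustrative paths:

from embedding_reader import EmbeddingReader

# single folder (previous behaviour)
reader = EmbeddingReader("/data/embeddings/part-00000", file_format="parquet", embedding_column="embedding")

# list of folders (what the function now forwards)
reader = EmbeddingReader(
    ["/data/embeddings/part-00000", "/data/embeddings/part-00001"],
    file_format="parquet",
    embedding_column="embedding",
)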
@@ -436,32 +435,32 @@


def create_big_index(
embedding_root_dir: str,
ss,
embedding_root_dirs: Union[List[str], str],
output_root_dir: str,
id_columns: Optional[List[str]],
should_be_memory_mappable: bool,
max_index_query_time_ms: float,
max_index_memory_usage: str,
min_nearest_neighbors_to_retrieve: int,
embedding_column_name: str,
index_key: str,
index_path: Optional[str],
current_memory_available: str,
nb_cores: Optional[int],
use_gpu: bool,
metric_type: str,
nb_splits_per_big_index: int,
make_direct_map: bool,
temp_root_dir: str,
ss,
id_columns: Optional[List[str]] = None,
should_be_memory_mappable: bool = False,
max_index_query_time_ms: float = 10.0,
max_index_memory_usage: str = "16G",
min_nearest_neighbors_to_retrieve: int = 20,
embedding_column_name: str = "embedding",
index_key: Optional[str] = None,
index_path: Optional[str] = None,
current_memory_available: str = "32G",
nb_cores: Optional[int] = None,
use_gpu: bool = False,
metric_type: str = "ip",
nb_splits_per_big_index: int = 1,
make_direct_map: bool = False,
temp_root_dir: str = "hdfs://root/tmp/distributed_autofaiss_indices",
) -> Optional[Dict[str, str]]:
"""
Create a big index
"""

def _create_and_train_index_from_embedding_dir() -> TrainedIndex:
trained_index = create_and_train_index_from_embedding_dir(
embedding_root_dir=embedding_root_dir,
embedding_root_dirs=embedding_root_dirs,
embedding_column_name=embedding_column_name,
index_key=index_key,
max_index_memory_usage=max_index_memory_usage,
@@ -474,15 +473,16 @@ def _create_and_train_index_from_embedding_dir() -> TrainedIndex:
id_columns=id_columns,
)

index_output_root_dir = os.path.join(temp_root_dir, "training", partition)
index_output_root_dir = os.path.join(temp_root_dir, "training", str(uuid4()))
output_index_path = save_index(trained_index.index_or_path, index_output_root_dir, "trained_index")
return TrainedIndex(output_index_path, trained_index.index_key, embedding_root_dir)

partition = extract_partition_name_from_path(embedding_root_dir)
return TrainedIndex(output_index_path, trained_index.index_key, embedding_root_dirs)

if not index_path:
# Train index
rdd = ss.sparkContext.parallelize([embedding_root_dir], 1)
# Train index. We use the value 13 below as a magic number to create a partition
# and train the big index on an executor. We don't want to train the big index
# on the driver because we are potentially training multiple big indexes in parallel
# and the driver doesn't necessarily have enough memory
rdd = ss.sparkContext.parallelize([13], 1)
trained_index_path, trained_index_key, _, = rdd.map(
lambda _: _create_and_train_index_from_embedding_dir()
).collect()[0]
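
Stripped of the surrounding details, the pattern used here is: parallelize a throwaway one-element list so that the closure runs exactly once, as a task on an executor rather than on the driver. A sketch with a made-up helper name:

def run_once_on_an_executor(ss, fn):
    # The element value is irrelevant (13 above); a single slice means a single task.
    return ss.sparkContext.parallelize([13], numSlices=1).map(lambda _: fn()).collect()[0]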
@@ -492,7 +492,7 @@ def _create_and_train_index_from_embedding_dir() -> TrainedIndex:
trained_index_key = index_key

# Add embeddings to index and compute metrics
partition_temp_root_dir = os.path.join(temp_root_dir, "add_embeddings", partition)
partition_temp_root_dir = os.path.join(temp_root_dir, "add_embeddings", str(uuid4()))
index, metrics = _add_embeddings_from_dir_to_index(
add_embeddings_fn=partial(
add_embeddings_to_index_distributed,
@@ -501,7 +501,7 @@ def _create_and_train_index_from_embedding_dir() -> TrainedIndex:
temporary_indices_folder=partition_temp_root_dir,
nb_indices_to_keep=nb_splits_per_big_index,
),
embedding_root_dir=embedding_root_dir,
embedding_root_dirs=embedding_root_dirs,
output_root_dir=output_root_dir,
index_key=trained_index_key,
embedding_column_name=embedding_column_name,
@@ -521,7 +521,7 @@ def _create_and_train_index_from_embedding_dir() -> TrainedIndex:


def create_small_index(
embedding_root_dir: str,
embedding_root_dirs: Union[List[str], str],
output_root_dir: str,
id_columns: Optional[List[str]] = None,
should_be_memory_mappable: bool = False,
@@ -542,7 +542,7 @@
"""
if not index_path:
trained_index = create_and_train_index_from_embedding_dir(
embedding_root_dir=embedding_root_dir,
embedding_root_dirs=embedding_root_dirs,
embedding_column_name=embedding_column_name,
index_key=index_key,
max_index_memory_usage=max_index_memory_usage,
@@ -558,7 +558,7 @@
assert index_key, "index key of the input index must be provided because you provided an index_path"
with tempfile.TemporaryDirectory() as tmp_dir:
embedding_reader = EmbeddingReader(
embedding_root_dir,
embedding_root_dirs,
file_format="parquet",
embedding_column=embedding_column_name,
meta_columns=id_columns,
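
For the branch above that reuses an already trained index, a hedged usage sketch; the paths and the index key are hypothetical, and index_key must describe the index stored at index_path:

metrics = create_small_index(
    embedding_root_dirs=["/data/embeddings/part-00000"],
    output_root_dir="/data/indices/small",
    index_path="/data/indices/trained/trained_index",
    index_key="OPQ16_64,IVF4096,PQ16x8",
)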
@@ -611,12 +611,17 @@ def create_partitioned_indexes(
i.e. create and train one index per parquet partition
"""

def _infer_index_output_dir(embedding_root_dir: str) -> str:
"""Infer index output directory from input embedding directory"""
partition = extract_partition_name_from_path(embedding_root_dir)
return os.path.join(output_root_dir, partition)

def _create_small_indexes(embedding_root_dirs: List[str]) -> List[Optional[Dict[str, str]]]:
rdd = ss.sparkContext.parallelize(embedding_root_dirs, len(embedding_root_dirs))
return rdd.map(
lambda embedding_root_dir: create_small_index(
embedding_root_dir=embedding_root_dir,
output_root_dir=output_root_dir,
embedding_root_dirs=embedding_root_dir,
output_root_dir=_infer_index_output_dir(embedding_root_dir),
id_columns=id_columns,
should_be_memory_mappable=should_be_memory_mappable,
max_index_query_time_ms=max_index_query_time_ms,
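
A sketch of what _infer_index_output_dir produces, assuming extract_partition_name_from_path keeps the trailing partition component of the path; the paths are hypothetical:

# embedding_root_dir        = "hdfs://root/embeddings/partition=42"
# output_root_dir           = "hdfs://root/indices"
# inferred index directory  = "hdfs://root/indices/partition=42"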
@@ -638,7 +643,6 @@ def _create_small_indexes(embedding_root_dirs: List[str]) -> List[Optional[Dict[
create_big_index_fn = partial(
create_big_index,
ss=ss,
output_root_dir=output_root_dir,
id_columns=id_columns,
should_be_memory_mappable=should_be_memory_mappable,
max_index_query_time_ms=max_index_query_time_ms,
@@ -681,7 +685,7 @@ def _create_small_indexes(embedding_root_dirs: List[str]) -> List[Optional[Dict[
small_index_metrics_future = (
p.apply_async(_create_small_indexes, (small_partitions,)) if small_partitions else None
)
for metrics in p.starmap(create_big_index_fn, [(p,) for p in big_partitions]):
for metrics in p.starmap(create_big_index_fn, [(p, _infer_index_output_dir(p)) for p in big_partitions]):
all_metrics.append(metrics)
if small_index_metrics_future:
all_metrics.extend(small_index_metrics_future.get())
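
Unrolled, and ignoring that starmap runs in a multiprocessing pool, the big-partition loop now amounts to the following sketch:

for p in big_partitions:
    # p fills embedding_root_dirs and the inferred directory fills output_root_dir;
    # everything else was already bound as keyword arguments in create_big_index_fn.
    all_metrics.append(create_big_index_fn(p, _infer_index_output_dir(p)))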
8 changes: 4 additions & 4 deletions autofaiss/indices/training.py
@@ -21,7 +21,7 @@
class TrainedIndex(NamedTuple):
index_or_path: Union[faiss.Index, str]
index_key: str
embedding_reader_or_path: Union[EmbeddingReader, str]
embedding_reader_or_path: Union[EmbeddingReader, str, List[str]]


def create_empty_index(vec_dim: int, index_key: str, metric_type: Union[str, int]) -> faiss.Index:
@@ -110,7 +110,7 @@ def create_and_train_new_index(


def create_and_train_index_from_embedding_dir(
embedding_root_dir: str,
embedding_root_dirs: Union[List[str], str],
embedding_column_name: str,
max_index_memory_usage: str,
make_direct_map: bool,
@@ -131,7 +131,7 @@
# Read embeddings
with Timeit("-> Reading embeddings", indent=2):
embedding_reader = EmbeddingReader(
embedding_root_dir, file_format="parquet", embedding_column=embedding_column_name, meta_columns=id_columns
embedding_root_dirs, file_format="parquet", embedding_column=embedding_column_name, meta_columns=id_columns
)

# Define index key
@@ -145,7 +145,7 @@
use_gpu=use_gpu,
)
if not best_index_keys:
raise RuntimeError(f"Unable to find optimal index key from embedding directory {embedding_root_dir}")
raise RuntimeError(f"Unable to find optimal index key from embedding directory {embedding_root_dirs}")
index_key = best_index_keys[0]

# Create metadata
4 changes: 0 additions & 4 deletions autofaiss/utils/cast.py
@@ -40,10 +40,6 @@ def cast_memory_to_bytes(memory_string: str) -> float:
def cast_bytes_to_memory_string(num_bytes: float) -> str:
"""
Cast a number of bytes to a readable string
>>> from autofaiss.utils.cast import cast_bytes_to_memory_string
>>> cast_bytes_to_memory_string(16.*1024*1024*1024) == "16.0GB"
True
"""

suffix = "B"
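
The dropped doctest is kept here as a plain example of the expected behaviour, assuming the conversion logic itself is unchanged by this commit:

from autofaiss.utils.cast import cast_bytes_to_memory_string
assert cast_bytes_to_memory_string(16.0 * 1024 * 1024 * 1024) == "16.0GB"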
