diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py index fd0bfd7..47056e1 100644 --- a/langchain_postgres/v2/async_vectorstore.py +++ b/langchain_postgres/v2/async_vectorstore.py @@ -935,15 +935,15 @@ async def aapply_vector_index( text(f"CREATE EXTENSION IF NOT EXISTS {index.extension_name}") ) await conn.commit() - function = index.get_index_function() + operator_class = index.operator_class() filter = f"WHERE ({index.partial_indexes})" if index.partial_indexes else "" params = "WITH " + index.index_options() if name is None: if index.name is None: index.name = self.table_name + DEFAULT_INDEX_NAME_SUFFIX name = index.name - stmt = f'CREATE INDEX {"CONCURRENTLY" if concurrently else ""} "{name}" ON "{self.schema_name}"."{self.table_name}" USING {index.index_type} ({self.embedding_column} {function}) {params} {filter};' + stmt = f'CREATE INDEX {"CONCURRENTLY" if concurrently else ""} "{name}" ON "{self.schema_name}"."{self.table_name}" USING {index.index_type} ({self.embedding_column} {operator_class}) {params} {filter};' if concurrently: async with self.engine.connect() as conn: diff --git a/langchain_postgres/v2/engine.py b/langchain_postgres/v2/engine.py index 219717e..3d505fc 100644 --- a/langchain_postgres/v2/engine.py +++ b/langchain_postgres/v2/engine.py @@ -10,6 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from .hybrid_search_config import HybridSearchConfig +from .indexes import DEFAULT_VECTOR_TYPE, VectorType T = TypeVar("T") @@ -150,6 +151,7 @@ async def _ainit_vectorstore_table( table_name: str, vector_size: int, *, + vector_type: VectorType = DEFAULT_VECTOR_TYPE, schema_name: str = "public", content_column: str = "content", embedding_column: str = "embedding", @@ -166,6 +168,8 @@ async def _ainit_vectorstore_table( Args: table_name (str): The database table name. vector_size (int): Vector size for the embedding model to be used. + vector_type (VectorType): Type of the vector column to store embeddings. + Default: VectorType.VECTOR. schema_name (str): The schema name. Default: "public". content_column (str): Name of the column to store document content. @@ -194,6 +198,8 @@ async def _ainit_vectorstore_table( hybrid_search_default_column_name = content_column + "_tsv" content_column = self._escape_postgres_identifier(content_column) embedding_column = self._escape_postgres_identifier(embedding_column) + embedding_column_type = f"{vector_type.value}({vector_size})" + if metadata_columns is None: metadata_columns = [] else: @@ -246,7 +252,7 @@ async def _ainit_vectorstore_table( query = f"""CREATE TABLE "{schema_name}"."{table_name}"( "{id_column_name}" {id_data_type} PRIMARY KEY, "{content_column}" TEXT NOT NULL, - "{embedding_column}" vector({vector_size}) NOT NULL + "{embedding_column}" {embedding_column_type} NOT NULL {hybrid_search_column}""" for column in metadata_columns: if isinstance(column, Column): @@ -268,6 +274,7 @@ async def ainit_vectorstore_table( table_name: str, vector_size: int, *, + vector_type: VectorType = DEFAULT_VECTOR_TYPE, schema_name: str = "public", content_column: str = "content", embedding_column: str = "embedding", @@ -284,6 +291,8 @@ async def ainit_vectorstore_table( Args: table_name (str): The database table name. vector_size (int): Vector size for the embedding model to be used. + vector_type (VectorType): Type of the vector column to store embeddings. + Default: VectorType.VECTOR. schema_name (str): The schema name. Default: "public". content_column (str): Name of the column to store document content. @@ -308,6 +317,7 @@ async def ainit_vectorstore_table( self._ainit_vectorstore_table( table_name, vector_size, + vector_type=vector_type, schema_name=schema_name, content_column=content_column, embedding_column=embedding_column, @@ -325,6 +335,7 @@ def init_vectorstore_table( table_name: str, vector_size: int, *, + vector_type: VectorType = DEFAULT_VECTOR_TYPE, schema_name: str = "public", content_column: str = "content", embedding_column: str = "embedding", @@ -341,6 +352,8 @@ def init_vectorstore_table( Args: table_name (str): The database table name. vector_size (int): Vector size for the embedding model to be used. + vector_type (VectorType): Type of the vector column to store embeddings. + Default: VectorType.VECTOR. schema_name (str): The schema name. Default: "public". content_column (str): Name of the column to store document content. @@ -365,6 +378,7 @@ def init_vectorstore_table( self._ainit_vectorstore_table( table_name, vector_size, + vector_type=vector_type, schema_name=schema_name, content_column=content_column, embedding_column=embedding_column, diff --git a/langchain_postgres/v2/indexes.py b/langchain_postgres/v2/indexes.py index 8dbcc4f..da27e6a 100644 --- a/langchain_postgres/v2/indexes.py +++ b/langchain_postgres/v2/indexes.py @@ -15,21 +15,30 @@ class StrategyMixin: operator: str search_function: str - index_function: str + operator_class_suffix: str class DistanceStrategy(StrategyMixin, enum.Enum): """Enumerator of the Distance strategies.""" - EUCLIDEAN = "<->", "l2_distance", "vector_l2_ops" - COSINE_DISTANCE = "<=>", "cosine_distance", "vector_cosine_ops" - INNER_PRODUCT = "<#>", "inner_product", "vector_ip_ops" + EUCLIDEAN = "<->", "l2_distance", "l2_ops" + COSINE_DISTANCE = "<=>", "cosine_distance", "cosine_ops" + INNER_PRODUCT = "<#>", "inner_product", "ip_ops" DEFAULT_DISTANCE_STRATEGY: DistanceStrategy = DistanceStrategy.COSINE_DISTANCE DEFAULT_INDEX_NAME_SUFFIX: str = "langchainvectorindex" +class VectorType(enum.Enum): + VECTOR = "vector" + HALFVEC = "halfvec" + SPARSEVEC = "sparsevec" + + +DEFAULT_VECTOR_TYPE: VectorType = VectorType.VECTOR + + def validate_identifier(identifier: str) -> None: if re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", identifier) is None: raise ValueError( @@ -47,6 +56,8 @@ class BaseIndex(ABC): index_type (str): A string identifying the type of index. Defaults to "base". distance_strategy (DistanceStrategy): The strategy used to calculate distances between vectors in the index. Defaults to DistanceStrategy.COSINE_DISTANCE. + vector_type (VectorType): The type of vector column, + on which the index will be created. Defaults to VectorType.VECTOR partial_indexes (Optional[list[str]]): A list of names of partial indexes. Defaults to None. extension_name (Optional[str]): The name of the extension to be created for the index, if any. Defaults to None. """ @@ -56,6 +67,7 @@ class BaseIndex(ABC): distance_strategy: DistanceStrategy = field( default_factory=lambda: DistanceStrategy.COSINE_DISTANCE ) + vector_type: VectorType = DEFAULT_VECTOR_TYPE partial_indexes: Optional[list[str]] = None extension_name: Optional[str] = None @@ -66,8 +78,11 @@ def index_options(self) -> str: "index_options method must be implemented by subclass" ) - def get_index_function(self) -> str: - return self.distance_strategy.index_function + def operator_class(self) -> str: + """Returns index operator class, based on vector type and distance strategy.""" + return ( + f"{self.vector_type.value}_{self.distance_strategy.operator_class_suffix}" + ) def __post_init__(self) -> None: """Check if initialization parameters are valid. @@ -133,6 +148,19 @@ class IVFFlatIndex(BaseIndex): index_type: str = "ivfflat" lists: int = 100 + def __post_init__(self) -> None: + """Check if vector_type is valid. + + Raises: + ValueError: if vector_type is SPARSEVEC + """ + super().__post_init__() + + if self.vector_type is VectorType.SPARSEVEC: + raise ValueError( + "IVFFlatIndex does not support sparsevec, use VECTOR or HALFVEC instead" + ) + def index_options(self) -> str: """Set index query options for vector store initialization.""" return f"(lists = {self.lists})" diff --git a/tests/unit_tests/v2/test_async_pg_vectorstore_index.py b/tests/unit_tests/v2/test_async_pg_vectorstore_index.py index 8585bcd..c50fa5a 100644 --- a/tests/unit_tests/v2/test_async_pg_vectorstore_index.py +++ b/tests/unit_tests/v2/test_async_pg_vectorstore_index.py @@ -11,15 +11,22 @@ from langchain_postgres import PGEngine from langchain_postgres.v2.async_vectorstore import AsyncPGVectorStore from langchain_postgres.v2.hybrid_search_config import HybridSearchConfig -from langchain_postgres.v2.indexes import DistanceStrategy, HNSWIndex, IVFFlatIndex +from langchain_postgres.v2.indexes import ( + DistanceStrategy, + HNSWIndex, + IVFFlatIndex, + VectorType, +) from tests.utils import VECTORSTORE_CONNECTION_STRING as CONNECTION_STRING uuid_str = str(uuid.uuid4()).replace("-", "_") DEFAULT_TABLE = "default" + uuid_str DEFAULT_HYBRID_TABLE = "hybrid" + uuid_str +SIMPLE_TABLE = "simple" + uuid_str +HALFVEC_TABLE = "halfvec" + uuid_str + DEFAULT_INDEX_NAME = "index" + uuid_str VECTOR_SIZE = 768 -SIMPLE_TABLE = "default_table" embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) @@ -57,6 +64,7 @@ async def engine(self) -> AsyncIterator[PGEngine]: await engine._adrop_table(DEFAULT_TABLE) await engine._adrop_table(DEFAULT_HYBRID_TABLE) await engine._adrop_table(SIMPLE_TABLE) + await engine._adrop_table(HALFVEC_TABLE) await engine.close() @pytest_asyncio.fixture(scope="class") @@ -94,6 +102,25 @@ async def test_aapply_vector_index(self, vs: AsyncPGVectorStore) -> None: assert await vs.is_valid_index(DEFAULT_INDEX_NAME) await vs.adrop_vector_index(DEFAULT_INDEX_NAME) + async def test_aapply_vector_index_halfvec(self, engine: PGEngine) -> None: + await engine._ainit_vectorstore_table( + HALFVEC_TABLE, + VECTOR_SIZE, + vector_type=VectorType.HALFVEC, + overwrite_existing=True, + ) + vs = await AsyncPGVectorStore.create( + engine, + embedding_service=embeddings_service, + table_name=HALFVEC_TABLE, + ) + await vs.aadd_texts(texts, ids=ids) + await vs.adrop_vector_index() + index = HNSWIndex(name=DEFAULT_INDEX_NAME, vector_type=VectorType.HALFVEC) + await vs.aapply_vector_index(index) + assert await vs.is_valid_index(DEFAULT_INDEX_NAME) + await vs.adrop_vector_index(DEFAULT_INDEX_NAME) + async def test_aapply_vector_index_non_hybrid_search_vs( self, vs: AsyncPGVectorStore ) -> None: diff --git a/tests/unit_tests/v2/test_engine.py b/tests/unit_tests/v2/test_engine.py index 66f299a..87b1531 100644 --- a/tests/unit_tests/v2/test_engine.py +++ b/tests/unit_tests/v2/test_engine.py @@ -12,6 +12,7 @@ from langchain_postgres import Column, PGEngine from langchain_postgres.v2.hybrid_search_config import HybridSearchConfig +from langchain_postgres.v2.indexes import VectorType from tests.utils import VECTORSTORE_CONNECTION_STRING as CONNECTION_STRING DEFAULT_TABLE = "default" + str(uuid.uuid4()).replace("-", "_") @@ -19,11 +20,13 @@ HYBRID_SEARCH_TABLE = "hybrid" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TYPEDDICT_TABLE = "custom_td" + str(uuid.uuid4()).replace("-", "_") INT_ID_CUSTOM_TABLE = "custom_int_id" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_VECTOR_TYPE_TABLE = "custom_vt" + str(uuid.uuid4()).replace("-", "_") DEFAULT_TABLE_SYNC = "default_sync" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TABLE_SYNC = "custom_sync" + str(uuid.uuid4()).replace("-", "_") HYBRID_SEARCH_TABLE_SYNC = "hybrid_sync" + str(uuid.uuid4()).replace("-", "_") CUSTOM_TYPEDDICT_TABLE_SYNC = "custom_td_sync" + str(uuid.uuid4()).replace("-", "_") INT_ID_CUSTOM_TABLE_SYNC = "custom_int_id_sync" + str(uuid.uuid4()).replace("-", "_") +CUSTOM_VECTOR_TYPE_TABLE_SYNC = "custom_vt_sync" + str(uuid.uuid4()).replace("-", "_") VECTOR_SIZE = 768 embeddings_service = DeterministicFakeEmbedding(size=VECTOR_SIZE) @@ -76,6 +79,7 @@ async def engine(self) -> AsyncIterator[PGEngine]: await aexecute(engine, f'DROP TABLE IF EXISTS "{CUSTOM_TYPEDDICT_TABLE}"') await aexecute(engine, f'DROP TABLE IF EXISTS "{DEFAULT_TABLE}"') await aexecute(engine, f'DROP TABLE IF EXISTS "{INT_ID_CUSTOM_TABLE}"') + await aexecute(engine, f'DROP TABLE IF EXISTS "{CUSTOM_VECTOR_TYPE_TABLE}"') await engine.close() async def test_init_table(self, engine: PGEngine) -> None: @@ -219,6 +223,22 @@ async def test_init_table_with_int_id(self, engine: PGEngine) -> None: for row in results: assert row in expected + async def test_init_table_custom_vector_type(self, engine: PGEngine) -> None: + await engine.ainit_vectorstore_table( + CUSTOM_VECTOR_TYPE_TABLE, + VECTOR_SIZE, + vector_type=VectorType.HALFVEC, + embedding_column="my_embedding", + ) + stmt = ( + "SELECT column_name, udt_name " + f"FROM information_schema.columns " + f"WHERE table_name = '{CUSTOM_VECTOR_TYPE_TABLE}' AND column_name = 'my_embedding';" + ) + + results = await afetch(engine, stmt) + assert results == [{"column_name": "my_embedding", "udt_name": "halfvec"}] + async def test_from_engine(self) -> None: engine = create_async_engine( CONNECTION_STRING, @@ -264,6 +284,9 @@ async def engine(self) -> AsyncIterator[PGEngine]: await aexecute(engine, f'DROP TABLE IF EXISTS "{DEFAULT_TABLE_SYNC}"') await aexecute(engine, f'DROP TABLE IF EXISTS "{INT_ID_CUSTOM_TABLE_SYNC}"') await aexecute(engine, f'DROP TABLE IF EXISTS "{CUSTOM_TYPEDDICT_TABLE_SYNC}"') + await aexecute( + engine, f'DROP TABLE IF EXISTS "{CUSTOM_VECTOR_TYPE_TABLE_SYNC}"' + ) await engine.close() async def test_init_table(self, engine: PGEngine) -> None: @@ -403,6 +426,22 @@ async def test_init_table_with_int_id(self, engine: PGEngine) -> None: for row in results: assert row in expected + async def test_init_table_custom_vector_type(self, engine: PGEngine) -> None: + engine.init_vectorstore_table( + CUSTOM_VECTOR_TYPE_TABLE_SYNC, + VECTOR_SIZE, + vector_type=VectorType.HALFVEC, + embedding_column="my_embedding", + ) + stmt = ( + "SELECT column_name, udt_name " + f"FROM information_schema.columns " + f"WHERE table_name = '{CUSTOM_VECTOR_TYPE_TABLE_SYNC}' AND column_name = 'my_embedding';" + ) + + results = await afetch(engine, stmt) + assert results == [{"column_name": "my_embedding", "udt_name": "halfvec"}] + async def test_engine_constructor_key( self, engine: PGEngine, diff --git a/tests/unit_tests/v2/test_indexes.py b/tests/unit_tests/v2/test_indexes.py index 8359b48..4359ff2 100644 --- a/tests/unit_tests/v2/test_indexes.py +++ b/tests/unit_tests/v2/test_indexes.py @@ -8,6 +8,7 @@ HNSWQueryOptions, IVFFlatIndex, IVFFlatQueryOptions, + VectorType, ) @@ -16,15 +17,50 @@ class TestPGIndex: def test_distance_strategy(self) -> None: assert DistanceStrategy.EUCLIDEAN.operator == "<->" assert DistanceStrategy.EUCLIDEAN.search_function == "l2_distance" - assert DistanceStrategy.EUCLIDEAN.index_function == "vector_l2_ops" + assert DistanceStrategy.EUCLIDEAN.operator_class_suffix == "l2_ops" assert DistanceStrategy.COSINE_DISTANCE.operator == "<=>" assert DistanceStrategy.COSINE_DISTANCE.search_function == "cosine_distance" - assert DistanceStrategy.COSINE_DISTANCE.index_function == "vector_cosine_ops" + assert DistanceStrategy.COSINE_DISTANCE.operator_class_suffix == "cosine_ops" assert DistanceStrategy.INNER_PRODUCT.operator == "<#>" assert DistanceStrategy.INNER_PRODUCT.search_function == "inner_product" - assert DistanceStrategy.INNER_PRODUCT.index_function == "vector_ip_ops" + assert DistanceStrategy.INNER_PRODUCT.operator_class_suffix == "ip_ops" + + @pytest.mark.parametrize( + "vector_type, distance_strategy, expected", + [ + ( + VectorType.VECTOR, + DistanceStrategy.COSINE_DISTANCE, + "vector_cosine_ops", + ), + (VectorType.VECTOR, DistanceStrategy.EUCLIDEAN, "vector_l2_ops"), + (VectorType.VECTOR, DistanceStrategy.INNER_PRODUCT, "vector_ip_ops"), + ( + VectorType.HALFVEC, + DistanceStrategy.COSINE_DISTANCE, + "halfvec_cosine_ops", + ), + (VectorType.HALFVEC, DistanceStrategy.EUCLIDEAN, "halfvec_l2_ops"), + (VectorType.HALFVEC, DistanceStrategy.INNER_PRODUCT, "halfvec_ip_ops"), + ( + VectorType.SPARSEVEC, + DistanceStrategy.COSINE_DISTANCE, + "sparsevec_cosine_ops", + ), + (VectorType.SPARSEVEC, DistanceStrategy.EUCLIDEAN, "sparsevec_l2_ops"), + (VectorType.SPARSEVEC, DistanceStrategy.INNER_PRODUCT, "sparsevec_ip_ops"), + ], + ) + def test_operator_class_by_vector_type( + self, + vector_type: VectorType, + distance_strategy: DistanceStrategy, + expected: str, + ) -> None: + idx = HNSWIndex(vector_type=vector_type, distance_strategy=distance_strategy) + assert idx.operator_class() == expected def test_hnsw_index(self) -> None: index = HNSWIndex(name="test_index", m=32, ef_construction=128) @@ -32,6 +68,7 @@ def test_hnsw_index(self) -> None: assert index.m == 32 assert index.ef_construction == 128 assert index.index_options() == "(m = 32, ef_construction = 128)" + assert index.operator_class() == "vector_cosine_ops" def test_hnsw_query_options(self) -> None: options = HNSWQueryOptions(ef_search=80) @@ -50,6 +87,7 @@ def test_ivfflat_index(self) -> None: assert index.index_type == "ivfflat" assert index.lists == 200 assert index.index_options() == "(lists = 200)" + assert index.operator_class() == "vector_cosine_ops" def test_ivfflat_query_options(self) -> None: options = IVFFlatQueryOptions(probes=2)