diff --git a/airbyte-integrations/connectors/destination-pinecone/Dockerfile b/airbyte-integrations/connectors/destination-pinecone/Dockerfile index fe132dc1c6ae..a81ecf1972bf 100644 --- a/airbyte-integrations/connectors/destination-pinecone/Dockerfile +++ b/airbyte-integrations/connectors/destination-pinecone/Dockerfile @@ -38,6 +38,6 @@ COPY destination_pinecone ./destination_pinecone ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.0.13 +LABEL io.airbyte.version=0.0.14 LABEL io.airbyte.name=airbyte/destination-pinecone diff --git a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/config.py b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/config.py index 5b0ee1797116..ffa741adebeb 100644 --- a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/config.py +++ b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/config.py @@ -2,7 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from typing import Union +from typing import Literal, Optional, Union import dpath.util from airbyte_cdk.destinations.vector_db_based.config import ( @@ -16,6 +16,15 @@ from pydantic import BaseModel, Field +class NamespaceModel(BaseModel): + mode: Literal["constant"] = Field("constant", const=True) + value: Optional[str] = Field( + title="Namespace", + default="", + description="Namespace to use for all records. This is not supported on starter pods", + ) + + class PineconeIndexingModel(BaseModel): pinecone_key: str = Field( ..., @@ -27,6 +36,7 @@ class PineconeIndexingModel(BaseModel): ..., title="Pinecone Environment", description="Pinecone Cloud environment to use", examples=["us-west1-gcp", "gcp-starter"] ) index: str = Field(..., title="Index", description="Pinecone index in your project to load data into") + namespace: Optional[NamespaceModel] class Config: title = "Indexing" diff --git a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py index 521504a9856f..8d1a7a7ea10c 100644 --- a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py +++ b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py @@ -32,6 +32,7 @@ def __init__(self, config: PineconeIndexingModel, embedding_dimensions: int): self.pinecone_index = pinecone.GRPCIndex(config.index) self.embedding_dimensions = embedding_dimensions + self._namespace = config.namespace.value if config.namespace and config.namespace.value != "" else None def pre_sync(self, catalog: ConfiguredAirbyteCatalog): index_description = pinecone.describe_index(self.config.index) @@ -49,7 +50,7 @@ def delete_vectors(self, filter): top_k = 10000 self.delete_by_metadata(filter, top_k) else: - self.pinecone_index.delete(filter=filter) + self.pinecone_index.delete(filter=filter, namespace=self._namespace) def delete_by_metadata(self, filter, top_k): zero_vector = [0.0] * self.embedding_dimensions @@ -94,7 +95,7 @@ def index(self, document_chunks, delete_ids): serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT) for batch in serial_batches: async_results = [ - self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False) + self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=self._namespace) for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE) ] # Wait for and retrieve responses (this raises in case of error) @@ -106,6 +107,8 @@ def check(self) -> Optional[str]: actual_dimension = int(description.dimension) if actual_dimension != self.embedding_dimensions: return f"Your embedding configuration will produce vectors with dimension {self.embedding_dimensions:d}, but your index is configured with dimension {actual_dimension:d}. Make sure embedding and indexing configurations match." + if description.pod_type == "starter" and self._namespace is not None: + return "Namespaces are not supported for starter pods." except Exception as e: return format_exception(e) return None diff --git a/airbyte-integrations/connectors/destination-pinecone/integration_tests/spec.json b/airbyte-integrations/connectors/destination-pinecone/integration_tests/spec.json index 37e5719e223f..eb1d9595550e 100644 --- a/airbyte-integrations/connectors/destination-pinecone/integration_tests/spec.json +++ b/airbyte-integrations/connectors/destination-pinecone/integration_tests/spec.json @@ -24,6 +24,25 @@ "title": "Index", "description": "Pinecone index in your project to load data into", "type": "string" + }, + "namespace": { + "title": "NamespaceModel", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "constant", + "const": "constant", + "enum": ["constant"], + "type": "string" + }, + "value": { + "title": "Namespace", + "description": "Namespace to use for all records. This is not supported on starter pods", + "default": "", + "type": "string" + } + } } }, "required": ["pinecone_key", "pinecone_environment", "index"], diff --git a/airbyte-integrations/connectors/destination-pinecone/metadata.yaml b/airbyte-integrations/connectors/destination-pinecone/metadata.yaml index 8da4b15203a1..8554f936e449 100644 --- a/airbyte-integrations/connectors/destination-pinecone/metadata.yaml +++ b/airbyte-integrations/connectors/destination-pinecone/metadata.yaml @@ -20,7 +20,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd - dockerImageTag: 0.0.13 + dockerImageTag: 0.0.14 dockerRepository: airbyte/destination-pinecone githubIssueLabel: destination-pinecone icon: pinecone.svg diff --git a/airbyte-integrations/connectors/destination-pinecone/unit_tests/pinecone_indexer_test.py b/airbyte-integrations/connectors/destination-pinecone/unit_tests/pinecone_indexer_test.py index 848cb9d4d830..4bce12d16434 100644 --- a/airbyte-integrations/connectors/destination-pinecone/unit_tests/pinecone_indexer_test.py +++ b/airbyte-integrations/connectors/destination-pinecone/unit_tests/pinecone_indexer_test.py @@ -6,13 +6,13 @@ import pytest from airbyte_cdk.models import ConfiguredAirbyteCatalog -from destination_pinecone.config import PineconeIndexingModel +from destination_pinecone.config import NamespaceModel, PineconeIndexingModel from destination_pinecone.indexer import PineconeIndexer from pinecone import IndexDescription -def create_pinecone_indexer(): - config = PineconeIndexingModel(mode="pinecone", pinecone_environment="myenv", pinecone_key="mykey", index="myindex") +def create_pinecone_indexer(namespace=None): + config = PineconeIndexingModel(mode="pinecone", pinecone_environment="myenv", pinecone_key="mykey", index="myindex", namespace=NamespaceModel(mode="constant", value=namespace) if namespace is not None else None) indexer = PineconeIndexer(config, 3) indexer.pinecone_index.delete = MagicMock() @@ -43,8 +43,16 @@ def mock_describe_index(): yield mock -def test_pinecone_index_upsert_and_delete(mock_describe_index): - indexer = create_pinecone_indexer() +@pytest.mark.parametrize( + "namespace, called_with", + [ + (None, None), + ("", None), + ("my_namespace", "my_namespace"), + ] +) +def test_pinecone_index_upsert_and_delete(mock_describe_index, namespace, called_with): + indexer = create_pinecone_indexer(namespace) indexer._pod_type = "p1" indexer.index( [ @@ -53,7 +61,7 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index): ], ["delete_id1", "delete_id2"], ) - indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}) + indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, namespace=called_with) indexer.pinecone_index.upsert.assert_called_with( vectors=( (ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}), @@ -61,6 +69,7 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index): ), async_req=True, show_progress=False, + namespace=called_with, ) @@ -86,6 +95,7 @@ def test_pinecone_index_upsert_and_delete_starter(mock_describe_index): ), async_req=True, show_progress=False, + namespace=None, ) @@ -164,10 +174,18 @@ def generate_catalog(): ) -def test_pinecone_pre_sync(mock_describe_index): - indexer = create_pinecone_indexer() +@pytest.mark.parametrize( + "namespace, called_with", + [ + (None, None), + ("", None), + ("my_namespace", "my_namespace"), + ] +) +def test_pinecone_pre_sync(mock_describe_index, namespace, called_with): + indexer = create_pinecone_indexer(namespace) indexer.pre_sync(generate_catalog()) - indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"}) + indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"}, namespace=called_with) def test_pinecone_pre_sync_starter(mock_describe_index): @@ -180,21 +198,24 @@ def test_pinecone_pre_sync_starter(mock_describe_index): @pytest.mark.parametrize( - "describe_throws,reported_dimensions,check_succeeds", + "describe_throws,reported_dimensions,pod_type,namespace,check_succeeds", [ - (False, 3, True), - (False, 4, False), - (True, 3, False), - (True, 4, False), + (False, 3, "p1", None, True), + (False, 4, "p1", None, False), + (True, 3, "p1", None, False), + (True, 4, "p1", None, False), + (False, 3, "starter", None, True), + (False, 3, "starter", "", True), + (False, 3, "starter", "my_namespace", False), ], ) @patch("pinecone.describe_index") -def test_pinecone_check(describe_mock, describe_throws, reported_dimensions, check_succeeds): - indexer = create_pinecone_indexer() +def test_pinecone_check(describe_mock, describe_throws, reported_dimensions, pod_type, namespace, check_succeeds): + indexer = create_pinecone_indexer(namespace) indexer.embedding_dimensions = 3 if describe_throws: describe_mock.side_effect = Exception("describe failed") - describe_mock.return_value = create_index_description(dimensions=reported_dimensions) + describe_mock.return_value = create_index_description(dimensions=reported_dimensions, pod_type=pod_type) result = indexer.check() if check_succeeds: assert result is None @@ -226,4 +247,5 @@ def test_metadata_normalization(): vectors=((ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test", "small": "a", "id": 1}),), async_req=True, show_progress=False, + namespace=None ) diff --git a/docs/integrations/destinations/pinecone.md b/docs/integrations/destinations/pinecone.md index 986cca6566a8..b4c72b4964d5 100644 --- a/docs/integrations/destinations/pinecone.md +++ b/docs/integrations/destinations/pinecone.md @@ -74,6 +74,7 @@ OpenAI and Fake embeddings produce vectors with 1536 dimensions, and the Cohere | Version | Date | Pull Request | Subject | |:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.0.14 | 2023-09-28 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Support namespaces | | 0.0.13 | 2023-09-26 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Allow more text splitting options | | 0.0.12 | 2023-09-25 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Fix bug with stale documents left on starter pods | | 0.0.11 | 2023-09-22 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Set visible certified flag |