From bee749b04bf83b114dc1f176c6f6191544ec5a2d Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Thu, 26 Oct 2023 16:10:23 +0200 Subject: [PATCH] Pinecone: Namespace and field remapping support (#31373) Co-authored-by: flash1293 Co-authored-by: alafanechere --- .../destination_pinecone/indexer.py | 24 ++++++----- .../integration_tests/spec.json | 23 +++++++++++ .../destination-pinecone/metadata.yaml | 2 +- .../connectors/destination-pinecone/setup.py | 2 +- .../unit_tests/pinecone_indexer_test.py | 40 ++++++++++++------- docs/integrations/destinations/pinecone.md | 3 +- 6 files changed, 66 insertions(+), 28 deletions(-) diff --git a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py index 5a2266971089..92c64d676b0e 100644 --- a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py +++ b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py @@ -8,7 +8,7 @@ import pinecone from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD, METADATA_STREAM_FIELD from airbyte_cdk.destinations.vector_db_based.indexer import Indexer -from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, format_exception +from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, create_stream_identifier, format_exception from airbyte_cdk.models.airbyte_protocol import ConfiguredAirbyteCatalog, DestinationSyncMode from destination_pinecone.config import PineconeIndexingModel @@ -38,30 +38,32 @@ def pre_sync(self, catalog: ConfiguredAirbyteCatalog): self._pod_type = index_description.pod_type for stream in catalog.streams: if stream.destination_sync_mode == DestinationSyncMode.overwrite: - self.delete_vectors(filter={METADATA_STREAM_FIELD: stream.stream.name}) + self.delete_vectors( + filter={METADATA_STREAM_FIELD: create_stream_identifier(stream.stream)}, namespace=stream.stream.namespace + ) def post_sync(self): return [] - def delete_vectors(self, filter): + def delete_vectors(self, filter, namespace=None): if self._pod_type == "starter": # Starter pod types have a maximum of 100000 rows top_k = 10000 - self.delete_by_metadata(filter, top_k) + self.delete_by_metadata(filter, top_k, namespace) else: - self.pinecone_index.delete(filter=filter) + self.pinecone_index.delete(filter=filter, namespace=namespace) - def delete_by_metadata(self, filter, top_k): + def delete_by_metadata(self, filter, top_k, namespace=None): zero_vector = [0.0] * self.embedding_dimensions - query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k) + query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace) while len(query_result.matches) > 0: vector_ids = [doc.id for doc in query_result.matches] if len(vector_ids) > 0: # split into chunks of 1000 ids to avoid id limit batches = create_chunks(vector_ids, batch_size=MAX_IDS_PER_DELETE) for batch in batches: - self.pinecone_index.delete(ids=list(batch)) - query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k) + self.pinecone_index.delete(ids=list(batch), namespace=namespace) + query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace) def _truncate_metadata(self, metadata: dict) -> dict: """ @@ -92,7 +94,7 @@ def index(self, document_chunks, namespace, stream): serial_batches = 
create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT) for batch in serial_batches: async_results = [ - self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False) + self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=namespace) for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE) ] # Wait for and retrieve responses (this raises in case of error) @@ -100,7 +102,7 @@ def index(self, document_chunks, namespace, stream): def delete(self, delete_ids, namespace, stream): if len(delete_ids) > 0: - self.delete_vectors(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}) + self.delete_vectors(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}, namespace=namespace) def check(self) -> Optional[str]: try: diff --git a/airbyte-integrations/connectors/destination-pinecone/integration_tests/spec.json b/airbyte-integrations/connectors/destination-pinecone/integration_tests/spec.json index 31e76e8d43b3..22873f556522 100644 --- a/airbyte-integrations/connectors/destination-pinecone/integration_tests/spec.json +++ b/airbyte-integrations/connectors/destination-pinecone/integration_tests/spec.json @@ -199,6 +199,29 @@ "type": "array", "items": { "type": "string" } }, + "field_name_mappings": { + "title": "Field name mappings", + "description": "List of fields to rename. Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.", + "default": [], + "type": "array", + "items": { + "title": "FieldNameMappingConfigModel", + "type": "object", + "properties": { + "from_field": { + "title": "From field name", + "description": "The field name in the source", + "type": "string" + }, + "to_field": { + "title": "To field name", + "description": "The field name to use in the destination", + "type": "string" + } + }, + "required": ["from_field", "to_field"] + } + }, "text_splitter": { "title": "Text splitter", "description": "Split text fields into chunks based on the specified method.", diff --git a/airbyte-integrations/connectors/destination-pinecone/metadata.yaml b/airbyte-integrations/connectors/destination-pinecone/metadata.yaml index 848a73b36f60..a3cf0121ac1f 100644 --- a/airbyte-integrations/connectors/destination-pinecone/metadata.yaml +++ b/airbyte-integrations/connectors/destination-pinecone/metadata.yaml @@ -13,7 +13,7 @@ data: connectorSubtype: vectorstore connectorType: destination definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd - dockerImageTag: 0.0.17 + dockerImageTag: 0.0.18 dockerRepository: airbyte/destination-pinecone documentationUrl: https://docs.airbyte.com/integrations/destinations/pinecone githubIssueLabel: destination-pinecone diff --git a/airbyte-integrations/connectors/destination-pinecone/setup.py b/airbyte-integrations/connectors/destination-pinecone/setup.py index 8c2727e0c6dc..8d8f8fc00019 100644 --- a/airbyte-integrations/connectors/destination-pinecone/setup.py +++ b/airbyte-integrations/connectors/destination-pinecone/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk[vector-db-based]==0.51.34", + "airbyte-cdk[vector-db-based]==0.51.41", "pinecone-client[grpc]", ] diff --git a/airbyte-integrations/connectors/destination-pinecone/unit_tests/pinecone_indexer_test.py b/airbyte-integrations/connectors/destination-pinecone/unit_tests/pinecone_indexer_test.py index 86abf10208bc..c6f50424ade6 100644 --- 
a/airbyte-integrations/connectors/destination-pinecone/unit_tests/pinecone_indexer_test.py +++ b/airbyte-integrations/connectors/destination-pinecone/unit_tests/pinecone_indexer_test.py @@ -51,11 +51,11 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index): Mock(page_content="test", metadata={"_ab_stream": "abc"}, embedding=[1, 2, 3]), Mock(page_content="test2", metadata={"_ab_stream": "abc"}, embedding=[4, 5, 6]), ], - None, + "ns1", "some_stream", ) - indexer.delete(["delete_id1", "delete_id2"], None, "some_stram") - indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}) + indexer.delete(["delete_id1", "delete_id2"], "ns1", "some_stram") + indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, namespace="ns1") indexer.pinecone_index.upsert.assert_called_with( vectors=( (ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}), @@ -63,6 +63,7 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index): ), async_req=True, show_progress=False, + namespace="ns1", ) @@ -79,14 +80,16 @@ def test_pinecone_index_upsert_and_delete_starter(mock_describe_index): Mock(page_content="test", metadata={"_ab_stream": "abc"}, embedding=[1, 2, 3]), Mock(page_content="test2", metadata={"_ab_stream": "abc"}, embedding=[4, 5, 6]), ], - None, + "ns1", "some_stream", ) - indexer.delete(["delete_id1", "delete_id2"], None, "some_stram") + indexer.delete(["delete_id1", "delete_id2"], "ns1", "some_stram") indexer.pinecone_index.query.assert_called_with( - vector=[0, 0, 0], filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, top_k=10_000 + vector=[0, 0, 0], filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, top_k=10_000, namespace="ns1" + ) + indexer.pinecone_index.delete.assert_has_calls( + [call(ids=["doc_id1", "doc_id2"], namespace="ns1"), call(ids=["doc_id3"], namespace="ns1")] ) - indexer.pinecone_index.delete.assert_has_calls([call(ids=["doc_id1", "doc_id2"]), call(ids=["doc_id3"])]) indexer.pinecone_index.upsert.assert_called_with( vectors=( (ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}), @@ -94,6 +97,7 @@ def test_pinecone_index_upsert_and_delete_starter(mock_describe_index): ), async_req=True, show_progress=False, + namespace="ns1", ) @@ -104,15 +108,18 @@ def test_pinecone_index_delete_1k_limit(mock_describe_index): MagicMock(matches=[MagicMock(id=f"doc_id_{str(i)}") for i in range(1300)]), MagicMock(matches=[]), ] - indexer.delete(["delete_id1"], None, "some_stream") + indexer.delete(["delete_id1"], "ns1", "some_stream") indexer.pinecone_index.delete.assert_has_calls( - [call(ids=[f"doc_id_{str(i)}" for i in range(1000)]), call(ids=[f"doc_id_{str(i+1000)}" for i in range(300)])] + [ + call(ids=[f"doc_id_{str(i)}" for i in range(1000)], namespace="ns1"), + call(ids=[f"doc_id_{str(i+1000)}" for i in range(300)], namespace="ns1"), + ] ) def test_pinecone_index_empty_batch(): indexer = create_pinecone_indexer() - indexer.index([], None, "some_stream") + indexer.index([], "ns1", "some_stream") indexer.pinecone_index.delete.assert_not_called() indexer.pinecone_index.upsert.assert_not_called() @@ -121,7 +128,7 @@ def test_pinecone_index_upsert_batching(): indexer = create_pinecone_indexer() indexer.index( [Mock(page_content=f"test {i}", metadata={"_ab_stream": "abc"}, embedding=[i, i, i]) for i in range(50)], - None, + "ns1", "some_stream", ) assert indexer.pinecone_index.upsert.call_count == 2 @@ -150,6 +157,7 @@ def generate_catalog(): 
"supported_sync_modes": ["full_refresh", "incremental"], "source_defined_cursor": False, "default_cursor_field": ["column_name"], + "namespace": "ns1", }, "primary_key": [["id"]], "sync_mode": "incremental", @@ -162,6 +170,7 @@ def generate_catalog(): "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_cursor": False, "default_cursor_field": ["column_name"], + "namespace": "ns2", }, "primary_key": [["id"]], "sync_mode": "full_refresh", @@ -175,7 +184,7 @@ def generate_catalog(): def test_pinecone_pre_sync(mock_describe_index): indexer = create_pinecone_indexer() indexer.pre_sync(generate_catalog()) - indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"}) + indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "ns2_example_stream2"}, namespace="ns2") def test_pinecone_pre_sync_starter(mock_describe_index): @@ -186,8 +195,10 @@ def test_pinecone_pre_sync_starter(mock_describe_index): MagicMock(matches=[]), ] indexer.pre_sync(generate_catalog()) - indexer.pinecone_index.query.assert_called_with(vector=[0, 0, 0], filter={"_ab_stream": "example_stream2"}, top_k=10_000) - indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"]) + indexer.pinecone_index.query.assert_called_with( + vector=[0, 0, 0], filter={"_ab_stream": "ns2_example_stream2"}, top_k=10_000, namespace="ns2" + ) + indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"], namespace="ns2") @pytest.mark.parametrize( @@ -238,4 +249,5 @@ def test_metadata_normalization(): vectors=((ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test", "small": "a", "id": 1}),), async_req=True, show_progress=False, + namespace=None, ) diff --git a/docs/integrations/destinations/pinecone.md b/docs/integrations/destinations/pinecone.md index 10965767273b..7337329bff84 100644 --- a/docs/integrations/destinations/pinecone.md +++ b/docs/integrations/destinations/pinecone.md @@ -30,7 +30,7 @@ You'll need the following information to configure the destination: | Full Refresh Sync | Yes | | | Incremental - Append Sync | Yes | | | Incremental - Append + Deduped | Yes | Deleting records via CDC is not supported (see issue [#29827](https://github.com/airbytehq/airbyte/issues/29827)) | -| Namespaces | No | | +| Namespaces | Yes | | ## Data type mapping @@ -74,6 +74,7 @@ OpenAI and Fake embeddings produce vectors with 1536 dimensions, and the Cohere | Version | Date | Pull Request | Subject | |:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.0.18 | 2023-10-20 | [#31329](https://github.com/airbytehq/airbyte/pull/31373) | Add support for namespaces and fix index cleaning when namespace is defined | | 0.0.17 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image | | 0.0.16 | 2023-10-15 | [#31329](https://github.com/airbytehq/airbyte/pull/31329) | Add OpenAI-compatible embedder option | | 0.0.15 | 2023-10-04 | [#31075](https://github.com/airbytehq/airbyte/pull/31075) | Fix OpenAI embedder batch size |