Pinecone: Namespace and field remapping support (#31373)
Co-authored-by: flash1293 <[email protected]>
Co-authored-by: alafanechere <[email protected]>
3 people authored Oct 26, 2023
1 parent 1d4d372 commit bee749b
Showing 6 changed files with 66 additions and 28 deletions.
@@ -8,7 +8,7 @@
import pinecone
from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD, METADATA_STREAM_FIELD
from airbyte_cdk.destinations.vector_db_based.indexer import Indexer
from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, format_exception
from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, create_stream_identifier, format_exception
from airbyte_cdk.models.airbyte_protocol import ConfiguredAirbyteCatalog, DestinationSyncMode
from destination_pinecone.config import PineconeIndexingModel

@@ -38,30 +38,32 @@ def pre_sync(self, catalog: ConfiguredAirbyteCatalog):
self._pod_type = index_description.pod_type
for stream in catalog.streams:
if stream.destination_sync_mode == DestinationSyncMode.overwrite:
self.delete_vectors(filter={METADATA_STREAM_FIELD: stream.stream.name})
self.delete_vectors(
filter={METADATA_STREAM_FIELD: create_stream_identifier(stream.stream)}, namespace=stream.stream.namespace
)

def post_sync(self):
return []

def delete_vectors(self, filter):
def delete_vectors(self, filter, namespace=None):
if self._pod_type == "starter":
# Starter pod types have a maximum of 100000 rows
top_k = 10000
self.delete_by_metadata(filter, top_k)
self.delete_by_metadata(filter, top_k, namespace)
else:
self.pinecone_index.delete(filter=filter)
self.pinecone_index.delete(filter=filter, namespace=namespace)

def delete_by_metadata(self, filter, top_k):
def delete_by_metadata(self, filter, top_k, namespace=None):
zero_vector = [0.0] * self.embedding_dimensions
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k)
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace)
while len(query_result.matches) > 0:
vector_ids = [doc.id for doc in query_result.matches]
if len(vector_ids) > 0:
# split into chunks of 1000 ids to avoid id limit
batches = create_chunks(vector_ids, batch_size=MAX_IDS_PER_DELETE)
for batch in batches:
self.pinecone_index.delete(ids=list(batch))
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k)
self.pinecone_index.delete(ids=list(batch), namespace=namespace)
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace)

def _truncate_metadata(self, metadata: dict) -> dict:
"""
@@ -92,15 +94,15 @@ def index(self, document_chunks, namespace, stream):
serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT)
for batch in serial_batches:
async_results = [
self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False)
self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=namespace)
for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE)
]
# Wait for and retrieve responses (this raises in case of error)
[async_result.result() for async_result in async_results]

def delete(self, delete_ids, namespace, stream):
if len(delete_ids) > 0:
self.delete_vectors(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}})
self.delete_vectors(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}, namespace=namespace)

def check(self) -> Optional[str]:
try:
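To make the indexer change above easier to follow: on the starter pod type the connector does not delete by metadata filter directly; it looks up matching IDs with a zero-vector query and deletes them in batches, and with this commit the stream's namespace is now threaded through every query and delete call. Below is a condensed sketch of that pattern, assuming a pinecone.Index-like handle and a known embedding dimension; the helper name and default limits are illustrative, not the connector's actual API.

# Condensed sketch of the namespace-aware delete path shown above (starter pod type).
# `index` is assumed to be a pinecone.Index; names and defaults are illustrative only.
def delete_matching_vectors(index, metadata_filter, dimensions, namespace=None,
                            top_k=10_000, max_ids_per_delete=1_000):
    zero_vector = [0.0] * dimensions
    result = index.query(vector=zero_vector, filter=metadata_filter, top_k=top_k, namespace=namespace)
    while len(result.matches) > 0:
        ids = [match.id for match in result.matches]
        # Delete in slices to stay under the per-request ID limit.
        for start in range(0, len(ids), max_ids_per_delete):
            index.delete(ids=ids[start:start + max_ids_per_delete], namespace=namespace)
        # Re-query until no matches remain for this filter in the namespace.
        result = index.query(vector=zero_vector, filter=metadata_filter, top_k=top_k, namespace=namespace)

On non-starter pods the path stays a single index.delete(filter=..., namespace=...) call, as in the diff.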
@@ -199,6 +199,29 @@
"type": "array",
"items": { "type": "string" }
},
"field_name_mappings": {
"title": "Field name mappings",
"description": "List of fields to rename. Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.",
"default": [],
"type": "array",
"items": {
"title": "FieldNameMappingConfigModel",
"type": "object",
"properties": {
"from_field": {
"title": "From field name",
"description": "The field name in the source",
"type": "string"
},
"to_field": {
"title": "To field name",
"description": "The field name to use in the destination",
"type": "string"
}
},
"required": ["from_field", "to_field"]
}
},
"text_splitter": {
"title": "Text splitter",
"description": "Split text fields into chunks based on the specified method.",
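For context on the new spec option above, a destination configuration supplies field_name_mappings as a list of from_field/to_field pairs. A hypothetical fragment, written as a Python dict to mirror the connector code; the surrounding "processing" key and the field names are assumptions for illustration, not taken from the spec:

# Hypothetical configuration fragment exercising the new field_name_mappings option.
config_fragment = {
    "processing": {
        "field_name_mappings": [
            # Rename already-flattened source fields before records are written to Pinecone.
            {"from_field": "user.first_name", "to_field": "first_name"},
            {"from_field": "created_at", "to_field": "createdAt"},
        ]
    }
}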
@@ -13,7 +13,7 @@ data:
connectorSubtype: vectorstore
connectorType: destination
definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd
dockerImageTag: 0.0.17
dockerImageTag: 0.0.18
dockerRepository: airbyte/destination-pinecone
documentationUrl: https://docs.airbyte.com/integrations/destinations/pinecone
githubIssueLabel: destination-pinecone
@@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk[vector-db-based]==0.51.34",
"airbyte-cdk[vector-db-based]==0.51.41",
"pinecone-client[grpc]",
]

@@ -51,18 +51,19 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index):
Mock(page_content="test", metadata={"_ab_stream": "abc"}, embedding=[1, 2, 3]),
Mock(page_content="test2", metadata={"_ab_stream": "abc"}, embedding=[4, 5, 6]),
],
None,
"ns1",
"some_stream",
)
indexer.delete(["delete_id1", "delete_id2"], None, "some_stram")
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}})
indexer.delete(["delete_id1", "delete_id2"], "ns1", "some_stram")
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, namespace="ns1")
indexer.pinecone_index.upsert.assert_called_with(
vectors=(
(ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}),
(ANY, [4, 5, 6], {"_ab_stream": "abc", "text": "test2"}),
),
async_req=True,
show_progress=False,
namespace="ns1",
)


@@ -79,21 +80,24 @@ def test_pinecone_index_upsert_and_delete_starter(mock_describe_index):
Mock(page_content="test", metadata={"_ab_stream": "abc"}, embedding=[1, 2, 3]),
Mock(page_content="test2", metadata={"_ab_stream": "abc"}, embedding=[4, 5, 6]),
],
None,
"ns1",
"some_stream",
)
indexer.delete(["delete_id1", "delete_id2"], None, "some_stram")
indexer.delete(["delete_id1", "delete_id2"], "ns1", "some_stram")
indexer.pinecone_index.query.assert_called_with(
vector=[0, 0, 0], filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, top_k=10_000
vector=[0, 0, 0], filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, top_k=10_000, namespace="ns1"
)
indexer.pinecone_index.delete.assert_has_calls(
[call(ids=["doc_id1", "doc_id2"], namespace="ns1"), call(ids=["doc_id3"], namespace="ns1")]
)
indexer.pinecone_index.delete.assert_has_calls([call(ids=["doc_id1", "doc_id2"]), call(ids=["doc_id3"])])
indexer.pinecone_index.upsert.assert_called_with(
vectors=(
(ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}),
(ANY, [4, 5, 6], {"_ab_stream": "abc", "text": "test2"}),
),
async_req=True,
show_progress=False,
namespace="ns1",
)


@@ -104,15 +108,18 @@ def test_pinecone_index_delete_1k_limit(mock_describe_index):
MagicMock(matches=[MagicMock(id=f"doc_id_{str(i)}") for i in range(1300)]),
MagicMock(matches=[]),
]
indexer.delete(["delete_id1"], None, "some_stream")
indexer.delete(["delete_id1"], "ns1", "some_stream")
indexer.pinecone_index.delete.assert_has_calls(
[call(ids=[f"doc_id_{str(i)}" for i in range(1000)]), call(ids=[f"doc_id_{str(i+1000)}" for i in range(300)])]
[
call(ids=[f"doc_id_{str(i)}" for i in range(1000)], namespace="ns1"),
call(ids=[f"doc_id_{str(i+1000)}" for i in range(300)], namespace="ns1"),
]
)


def test_pinecone_index_empty_batch():
indexer = create_pinecone_indexer()
indexer.index([], None, "some_stream")
indexer.index([], "ns1", "some_stream")
indexer.pinecone_index.delete.assert_not_called()
indexer.pinecone_index.upsert.assert_not_called()

@@ -121,7 +128,7 @@ def test_pinecone_index_upsert_batching():
indexer = create_pinecone_indexer()
indexer.index(
[Mock(page_content=f"test {i}", metadata={"_ab_stream": "abc"}, embedding=[i, i, i]) for i in range(50)],
None,
"ns1",
"some_stream",
)
assert indexer.pinecone_index.upsert.call_count == 2
@@ -150,6 +157,7 @@ def generate_catalog():
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": False,
"default_cursor_field": ["column_name"],
"namespace": "ns1",
},
"primary_key": [["id"]],
"sync_mode": "incremental",
@@ -162,6 +170,7 @@ def generate_catalog():
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": False,
"default_cursor_field": ["column_name"],
"namespace": "ns2",
},
"primary_key": [["id"]],
"sync_mode": "full_refresh",
@@ -175,7 +184,7 @@ def generate_catalog():
def test_pinecone_pre_sync(mock_describe_index):
indexer = create_pinecone_indexer()
indexer.pre_sync(generate_catalog())
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"})
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "ns2_example_stream2"}, namespace="ns2")


def test_pinecone_pre_sync_starter(mock_describe_index):
Expand All @@ -186,8 +195,10 @@ def test_pinecone_pre_sync_starter(mock_describe_index):
MagicMock(matches=[]),
]
indexer.pre_sync(generate_catalog())
indexer.pinecone_index.query.assert_called_with(vector=[0, 0, 0], filter={"_ab_stream": "example_stream2"}, top_k=10_000)
indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"])
indexer.pinecone_index.query.assert_called_with(
vector=[0, 0, 0], filter={"_ab_stream": "ns2_example_stream2"}, top_k=10_000, namespace="ns2"
)
indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"], namespace="ns2")


@pytest.mark.parametrize(
Expand Down Expand Up @@ -238,4 +249,5 @@ def test_metadata_normalization():
vectors=((ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test", "small": "a", "id": 1}),),
async_req=True,
show_progress=False,
namespace=None,
)
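The pre_sync tests above expect the `_ab_stream` filter value "ns2_example_stream2" for a stream named "example_stream2" in namespace "ns2", which suggests create_stream_identifier prefixes the stream name with its namespace when one is set. A rough sketch of that behaviour, offered as an assumption rather than the airbyte_cdk implementation:

def stream_identifier(name, namespace=None):
    # Inferred only from the expected test values above ("ns2_example_stream2");
    # an assumption, not the actual airbyte_cdk create_stream_identifier.
    return f"{namespace}_{name}" if namespace else name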
3 changes: 2 additions & 1 deletion docs/integrations/destinations/pinecone.md
@@ -30,7 +30,7 @@ You'll need the following information to configure the destination:
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Append + Deduped | Yes | Deleting records via CDC is not supported (see issue [#29827](https://github.com/airbytehq/airbyte/issues/29827)) |
| Namespaces | No | |
| Namespaces | Yes | |

## Data type mapping

@@ -74,6 +74,7 @@ OpenAI and Fake embeddings produce vectors with 1536 dimensions, and the Cohere

| Version | Date | Pull Request | Subject |
|:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.0.18 | 2023-10-20 | [#31373](https://github.com/airbytehq/airbyte/pull/31373) | Add support for namespaces and fix index cleaning when namespace is defined |
| 0.0.17 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image |
| 0.0.16 | 2023-10-15 | [#31329](https://github.com/airbytehq/airbyte/pull/31329) | Add OpenAI-compatible embedder option |
| 0.0.15 | 2023-10-04 | [#31075](https://github.com/airbytehq/airbyte/pull/31075) | Fix OpenAI embedder batch size |
