Pinecone: Namespace and field remapping support #31373

Merged Oct 26, 2023 (33 commits)

Commits
cb7a5a8
add OpenAI-compatible embedding
Oct 12, 2023
9c32526
add changelog
Oct 12, 2023
eda2bb4
Automated Commit - Formatting Changes
flash1293 Oct 12, 2023
e2bbdd5
adjust indexers
Oct 13, 2023
9843ccb
Merge branch 'flash1293/allow-openai-compatible-embedding-modes' of g…
Oct 13, 2023
68abfb9
Merge remote-tracking branch 'origin/master' into flash1293/allow-ope…
Oct 13, 2023
da17199
use GITHUB_TOKEN instead of GH_PAT_MAINTENANCE_OCTAVIA to avoid GHA r…
alafanechere Oct 13, 2023
024561e
Revert "use GITHUB_TOKEN instead of GH_PAT_MAINTENANCE_OCTAVIA to avo…
alafanechere Oct 13, 2023
5f68d7c
remove repo-token on labeler action to fallback to default
alafanechere Oct 13, 2023
844b89d
Revert "remove repo-token on labeler action to fallback to default"
alafanechere Oct 13, 2023
264308e
Merge remote-tracking branch 'origin/master' into flash1293/allow-ope…
Oct 13, 2023
e3569c1
bump cdk version
Oct 13, 2023
f1d2eeb
add support for namespaces
Oct 13, 2023
ec1bc69
Trigger CI
alafanechere Oct 13, 2023
ef86321
Automated Commit - Formatting Changes
flash1293 Oct 13, 2023
3693b27
fix tests
Oct 13, 2023
0d5ed69
Merge branch 'flash1293/allow-openai-compatible-embedding-modes' of g…
Oct 13, 2023
339d55d
Merge branch 'flash1293/allow-openai-compatible-embedding-modes' into…
Oct 13, 2023
ab92cb6
fix tests
Oct 13, 2023
3348771
Automated Commit - Formatting Changes
flash1293 Oct 13, 2023
56ec383
Merge remote-tracking branch 'origin/master' into flash1293/pinecone-…
Oct 18, 2023
b35a0b3
prepare release
Oct 18, 2023
c5151ae
Merge branch 'master' into flash1293/pinecone-namespace-support-2
Oct 18, 2023
39a7093
Merge branch 'master' into flash1293/pinecone-namespace-support-2
Oct 19, 2023
58b34b9
fix namespace clearing error
Oct 19, 2023
2b0da74
Automated Commit - Formatting Changes
flash1293 Oct 19, 2023
014c442
Merge remote-tracking branch 'origin/master' into flash1293/pinecone-…
Oct 20, 2023
a343098
fix tests
Oct 20, 2023
4a70c1a
remove dockerfile again
Oct 20, 2023
f75b047
fix tests
Oct 20, 2023
498abbf
Automated Commit - Formatting Changes
flash1293 Oct 20, 2023
ee14f88
Update destination.py
Oct 20, 2023
412216d
Merge remote-tracking branch 'origin/master' into flash1293/pinecone-…
Oct 26, 2023
@@ -8,7 +8,7 @@
import pinecone
from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD, METADATA_STREAM_FIELD
from airbyte_cdk.destinations.vector_db_based.indexer import Indexer
from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, format_exception
from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, create_stream_identifier, format_exception
from airbyte_cdk.models.airbyte_protocol import ConfiguredAirbyteCatalog, DestinationSyncMode
from destination_pinecone.config import PineconeIndexingModel

@@ -38,30 +38,32 @@ def pre_sync(self, catalog: ConfiguredAirbyteCatalog):
self._pod_type = index_description.pod_type
for stream in catalog.streams:
if stream.destination_sync_mode == DestinationSyncMode.overwrite:
self.delete_vectors(filter={METADATA_STREAM_FIELD: stream.stream.name})
self.delete_vectors(
filter={METADATA_STREAM_FIELD: create_stream_identifier(stream.stream)}, namespace=stream.stream.namespace
)

def post_sync(self):
return []

def delete_vectors(self, filter):
def delete_vectors(self, filter, namespace=None):
if self._pod_type == "starter":
# Starter pod types have a maximum of 100000 rows
top_k = 10000
self.delete_by_metadata(filter, top_k)
self.delete_by_metadata(filter, top_k, namespace)
else:
self.pinecone_index.delete(filter=filter)
self.pinecone_index.delete(filter=filter, namespace=namespace)

def delete_by_metadata(self, filter, top_k):
def delete_by_metadata(self, filter, top_k, namespace=None):
zero_vector = [0.0] * self.embedding_dimensions
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k)
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace)
while len(query_result.matches) > 0:
vector_ids = [doc.id for doc in query_result.matches]
if len(vector_ids) > 0:
# split into chunks of 1000 ids to avoid id limit
batches = create_chunks(vector_ids, batch_size=MAX_IDS_PER_DELETE)
for batch in batches:
self.pinecone_index.delete(ids=list(batch))
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k)
self.pinecone_index.delete(ids=list(batch), namespace=namespace)
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace)

def _truncate_metadata(self, metadata: dict) -> dict:
"""
@@ -92,15 +94,15 @@ def index(self, document_chunks, namespace, stream):
serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT)
for batch in serial_batches:
async_results = [
self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False)
self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=namespace)
for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE)
]
# Wait for and retrieve responses (this raises in case of error)
[async_result.result() for async_result in async_results]

def delete(self, delete_ids, namespace, stream):
if len(delete_ids) > 0:
self.delete_vectors(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}})
self.delete_vectors(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}, namespace=namespace)

def check(self) -> Optional[str]:
try:
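
The indexer changes above thread the stream namespace through every Pinecone call (upsert, query, and delete). Below is a minimal standalone sketch of that pattern against the pinecone-client API; the API key, environment, index name, and namespace are placeholder assumptions, not values from this PR.

import pinecone

# Placeholder connection settings - substitute your own API key, environment, and index.
pinecone.init(api_key="YOUR_API_KEY", environment="YOUR_ENVIRONMENT")
index = pinecone.Index("my-index")

namespace = "ns1"  # namespaces are opt-in; None targets the default namespace

# Upsert vectors into the namespace as (id, values, metadata) tuples.
index.upsert(
    vectors=[("id-1", [0.1, 0.2, 0.3], {"_ab_stream": "ns1_my_stream", "text": "hello"})],
    namespace=namespace,
)

# Delete by metadata filter within the namespace (not available on starter pods).
index.delete(filter={"_ab_stream": "ns1_my_stream"}, namespace=namespace)

# Starter-pod fallback, as in delete_by_metadata: query matching ids with a zero
# vector, then delete the returned ids, scoping both calls to the namespace.
zero_vector = [0.0, 0.0, 0.0]
result = index.query(vector=zero_vector, filter={"_ab_stream": "ns1_my_stream"}, top_k=10_000, namespace=namespace)
ids = [match.id for match in result.matches]
if ids:
    index.delete(ids=ids, namespace=namespace)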
@@ -199,6 +199,29 @@
"type": "array",
"items": { "type": "string" }
},
"field_name_mappings": {
"title": "Field name mappings",
"description": "List of fields to rename. Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.",
"default": [],
"type": "array",
"items": {
"title": "FieldNameMappingConfigModel",
"type": "object",
"properties": {
"from_field": {
"title": "From field name",
"description": "The field name in the source",
"type": "string"
},
"to_field": {
"title": "To field name",
"description": "The field name to use in the destination",
"type": "string"
}
},
"required": ["from_field", "to_field"]
}
},
"text_splitter": {
"title": "Text splitter",
"description": "Split text fields into chunks based on the specified method.",
@@ -13,7 +13,7 @@ data:
connectorSubtype: vectorstore
connectorType: destination
definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd
dockerImageTag: 0.0.17
dockerImageTag: 0.0.18
dockerRepository: airbyte/destination-pinecone
documentationUrl: https://docs.airbyte.com/integrations/destinations/pinecone
githubIssueLabel: destination-pinecone
@@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk[vector-db-based]==0.51.34",
"airbyte-cdk[vector-db-based]==0.51.41",
"pinecone-client[grpc]",
]

@@ -51,18 +51,19 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index):
Mock(page_content="test", metadata={"_ab_stream": "abc"}, embedding=[1, 2, 3]),
Mock(page_content="test2", metadata={"_ab_stream": "abc"}, embedding=[4, 5, 6]),
],
None,
"ns1",
"some_stream",
)
indexer.delete(["delete_id1", "delete_id2"], None, "some_stram")
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}})
indexer.delete(["delete_id1", "delete_id2"], "ns1", "some_stram")
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, namespace="ns1")
indexer.pinecone_index.upsert.assert_called_with(
vectors=(
(ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}),
(ANY, [4, 5, 6], {"_ab_stream": "abc", "text": "test2"}),
),
async_req=True,
show_progress=False,
namespace="ns1",
)


@@ -79,21 +80,24 @@ def test_pinecone_index_upsert_and_delete_starter(mock_describe_index):
Mock(page_content="test", metadata={"_ab_stream": "abc"}, embedding=[1, 2, 3]),
Mock(page_content="test2", metadata={"_ab_stream": "abc"}, embedding=[4, 5, 6]),
],
None,
"ns1",
"some_stream",
)
indexer.delete(["delete_id1", "delete_id2"], None, "some_stram")
indexer.delete(["delete_id1", "delete_id2"], "ns1", "some_stram")
indexer.pinecone_index.query.assert_called_with(
vector=[0, 0, 0], filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, top_k=10_000
vector=[0, 0, 0], filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, top_k=10_000, namespace="ns1"
)
indexer.pinecone_index.delete.assert_has_calls(
[call(ids=["doc_id1", "doc_id2"], namespace="ns1"), call(ids=["doc_id3"], namespace="ns1")]
)
indexer.pinecone_index.delete.assert_has_calls([call(ids=["doc_id1", "doc_id2"]), call(ids=["doc_id3"])])
indexer.pinecone_index.upsert.assert_called_with(
vectors=(
(ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}),
(ANY, [4, 5, 6], {"_ab_stream": "abc", "text": "test2"}),
),
async_req=True,
show_progress=False,
namespace="ns1",
)


@@ -104,15 +108,18 @@ def test_pinecone_index_delete_1k_limit(mock_describe_index):
MagicMock(matches=[MagicMock(id=f"doc_id_{str(i)}") for i in range(1300)]),
MagicMock(matches=[]),
]
indexer.delete(["delete_id1"], None, "some_stream")
indexer.delete(["delete_id1"], "ns1", "some_stream")
indexer.pinecone_index.delete.assert_has_calls(
[call(ids=[f"doc_id_{str(i)}" for i in range(1000)]), call(ids=[f"doc_id_{str(i+1000)}" for i in range(300)])]
[
call(ids=[f"doc_id_{str(i)}" for i in range(1000)], namespace="ns1"),
call(ids=[f"doc_id_{str(i+1000)}" for i in range(300)], namespace="ns1"),
]
)


def test_pinecone_index_empty_batch():
indexer = create_pinecone_indexer()
indexer.index([], None, "some_stream")
indexer.index([], "ns1", "some_stream")
indexer.pinecone_index.delete.assert_not_called()
indexer.pinecone_index.upsert.assert_not_called()

@@ -121,7 +128,7 @@ def test_pinecone_index_upsert_batching():
indexer = create_pinecone_indexer()
indexer.index(
[Mock(page_content=f"test {i}", metadata={"_ab_stream": "abc"}, embedding=[i, i, i]) for i in range(50)],
None,
"ns1",
"some_stream",
)
assert indexer.pinecone_index.upsert.call_count == 2
@@ -150,6 +157,7 @@ def generate_catalog():
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": False,
"default_cursor_field": ["column_name"],
"namespace": "ns1",
},
"primary_key": [["id"]],
"sync_mode": "incremental",
@@ -162,6 +170,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": False,
"default_cursor_field": ["column_name"],
"namespace": "ns2",
},
"primary_key": [["id"]],
"sync_mode": "full_refresh",
@@ -175,7 +184,7 @@
def test_pinecone_pre_sync(mock_describe_index):
indexer = create_pinecone_indexer()
indexer.pre_sync(generate_catalog())
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"})
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "ns2_example_stream2"}, namespace="ns2")
aaronsteers (Collaborator) commented on Oct 23, 2023:
Just to double check - do you think we should list this as a breaking change?

Namely, does the namespace come through from the source by default, and would this change the destination index name? Or is the namespace purely opt-in, so we can assume that wherever it was not handled previously, this is 'fixing' the prior omitted prefix?

Reply from the PR author (Contributor):
It's not a breaking change because the namespace is opt-in (can be configured on the connection level) - it changes here because I added a namespace to the test.
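
In other words, nothing changes for existing connections: the namespace only applies when a stream in the configured catalog carries one. A hypothetical configured-stream fragment, with illustrative values, might look like this:

configured_stream = {
    "stream": {
        "name": "example_stream",
        "namespace": "ns1",  # optional; omit it to keep writing to the default namespace
        "json_schema": {},
        "supported_sync_modes": ["full_refresh", "incremental"],
    },
    "sync_mode": "incremental",
    "destination_sync_mode": "append_dedup",
}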



def test_pinecone_pre_sync_starter(mock_describe_index):
@@ -186,8 +195,10 @@ def test_pinecone_pre_sync_starter(mock_describe_index):
MagicMock(matches=[]),
]
indexer.pre_sync(generate_catalog())
indexer.pinecone_index.query.assert_called_with(vector=[0, 0, 0], filter={"_ab_stream": "example_stream2"}, top_k=10_000)
indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"])
indexer.pinecone_index.query.assert_called_with(
vector=[0, 0, 0], filter={"_ab_stream": "ns2_example_stream2"}, top_k=10_000, namespace="ns2"
)
indexer.pinecone_index.delete.assert_called_with(ids=["doc_id1", "doc_id2"], namespace="ns2")


@pytest.mark.parametrize(
@@ -238,4 +249,5 @@ def test_metadata_normalization():
vectors=((ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test", "small": "a", "id": 1}),),
async_req=True,
show_progress=False,
namespace=None,
)
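
The pre-sync assertions above expect the _ab_stream filter to use a namespace-prefixed identifier (ns2_example_stream2) rather than the bare stream name. The sketch below captures the behavior the tests imply; it is an inference from the assertions, not the CDK's actual create_stream_identifier implementation.

from typing import Optional

def stream_identifier(name: str, namespace: Optional[str]) -> str:
    # Inferred behavior: prefix the stream name with its namespace when one is set.
    return f"{namespace}_{name}" if namespace else name

assert stream_identifier("example_stream2", "ns2") == "ns2_example_stream2"
assert stream_identifier("example_stream", None) == "example_stream"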
3 changes: 2 additions & 1 deletion docs/integrations/destinations/pinecone.md
@@ -30,7 +30,7 @@ You'll need the following information to configure the destination:
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Append + Deduped | Yes | Deleting records via CDC is not supported (see issue [#29827](https://github.com/airbytehq/airbyte/issues/29827)) |
| Namespaces | No | |
| Namespaces | Yes | |

## Data type mapping

@@ -74,6 +74,7 @@ OpenAI and Fake embeddings produce vectors with 1536 dimensions, and the Cohere

| Version | Date | Pull Request | Subject |
|:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.0.18 | 2023-10-20 | [#31373](https://github.com/airbytehq/airbyte/pull/31373) | Add support for namespaces and fix index cleaning when namespace is defined |
| 0.0.17 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image |
| 0.0.16 | 2023-10-15 | [#31329](https://github.com/airbytehq/airbyte/pull/31329) | Add OpenAI-compatible embedder option |
| 0.0.15 | 2023-10-04 | [#31075](https://github.com/airbytehq/airbyte/pull/31075) | Fix OpenAI embedder batch size |