Skip to content

Commit

Permalink
Bug: Fix issue with Pinecone custom namespaces not being created auto…
Browse files Browse the repository at this point in the history
…matically (#38336)
  • Loading branch information
bindipankhudi authored May 18, 2024
1 parent 9e2b057 commit b7de9f1
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ def index(self, document_chunks, namespace, streamName):
for batch in serial_batches:
async_results = []
for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE):
async_result = self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False)
async_result = self.pinecone_index.upsert(
vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=namespace
)
async_results.append(async_result)
# Wait for and retrieve responses (this raises in case of error)
[async_result.result() for async_result in async_results]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@

from airbyte_cdk.destinations.vector_db_based.embedder import OPEN_AI_VECTOR_SIZE
from airbyte_cdk.destinations.vector_db_based.test_utils import BaseIntegrationTest
from airbyte_cdk.models import DestinationSyncMode, Status
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
Status,
SyncMode,
Type,
)
from destination_pinecone.destination import DestinationPinecone
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone
Expand Down Expand Up @@ -47,7 +58,14 @@ def tearDown(self):
if "Namespace not found" not in str(e):
raise(e)
else :
print("Noting to delete. No data in the index/namespace.")
print("Nothing to delete in default namespace. No data in the index/namespace.")
try:
self.pinecone_index.delete(delete_all=True, namespace="ns1")
except PineconeException as e:
if "Namespace not found" not in str(e):
raise(e)
else :
print("Nothing to delete in ns1 namespace. No data in the index/namespace.")

def test_integration_test_flag_is_set(self):
assert "PYTEST_CURRENT_TEST" in os.environ
Expand Down Expand Up @@ -107,3 +125,44 @@ def test_write(self):
vector_store = Pinecone(self.pinecone_index_rest, embeddings.embed_query, "text")
result = vector_store.similarity_search("feline animals", 1)
assert result[0].metadata["_ab_record_id"] == "mystream_2"

def test_write_with_namespace(self):
    """Write five records tagged with namespace "ns1" and verify where they land.

    Checking only the index-wide total is not enough: that count is the same
    whether the vectors were stored in "ns1" or fell back to the default
    namespace, so the per-namespace count is asserted as well.
    """
    catalog = self._get_configured_catalog_with_namespace(DestinationSyncMode.overwrite)
    state_message = self._state({"state": "1"})
    records = [self._record_with_namespace("mystream", f"Dogs are number {i}", i) for i in range(5)]

    # initial sync; write() is consumed with list() so every message is processed
    destination = DestinationPinecone()
    list(destination.write(self.config, catalog, [*records, state_message]))

    self._wait()
    stats = self.pinecone_index.describe_index_stats()
    assert stats.total_vector_count == 5
    # The records carry namespace "ns1", so all of them must be stored there.
    assert "ns1" in stats.namespaces
    assert stats.namespaces["ns1"].vector_count == 5


def _get_configured_catalog_with_namespace(self, destination_mode: DestinationSyncMode) -> ConfiguredAirbyteCatalog:
    """Build a one-stream catalog for "mystream" in namespace "ns1".

    Args:
        destination_mode: destination sync mode to configure on the stream.

    Returns:
        A ConfiguredAirbyteCatalog containing the single namespaced stream.
    """
    # JSON Schema spells the text type "string"; "str" is not a valid type name.
    stream_schema = {
        "type": "object",
        "properties": {
            "str_col": {"type": "string"},
            "int_col": {"type": "integer"},
            "random_col": {"type": "integer"},
        },
    }

    namespaced_stream = ConfiguredAirbyteStream(
        stream=AirbyteStream(
            name="mystream",
            namespace="ns1",
            json_schema=stream_schema,
            supported_sync_modes=[SyncMode.incremental, SyncMode.full_refresh],
        ),
        primary_key=[["int_col"]],
        sync_mode=SyncMode.incremental,
        destination_sync_mode=destination_mode,
    )

    return ConfiguredAirbyteCatalog(streams=[namespaced_stream])

def _record_with_namespace(self, stream: str, str_value: str, int_value: int) -> AirbyteMessage:
    """Return a RECORD message for ``stream`` in namespace "ns1".

    The record data holds ``str_value`` under "str_col" and ``int_value``
    under "int_col"; ``emitted_at`` is fixed at 0.
    """
    payload = {"str_col": str_value, "int_col": int_value}
    record = AirbyteRecordMessage(
        stream=stream,
        namespace="ns1",
        data=payload,
        emitted_at=0,
    )
    return AirbyteMessage(type=Type.RECORD, record=record)



Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ data:
connectorSubtype: vectorstore
connectorType: destination
definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
dockerRepository: airbyte/destination-pinecone
documentationUrl: https://docs.airbyte.com/integrations/destinations/pinecone
githubIssueLabel: destination-pinecone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "airbyte-destination-pinecone"
version = "0.1.1"
version = "0.1.2"
description = "Airbyte destination implementation for Pinecone."
authors = ["Airbyte <[email protected]>"]
license = "MIT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index):
(ANY, [4, 5, 6], {"_ab_stream": "abc", "text": "test2"}),
),
async_req=True,
show_progress=False
show_progress=False,
namespace="ns1",
)


Expand Down Expand Up @@ -139,6 +140,7 @@ def test_pinecone_index_upsert_and_delete_starter(mock_describe_index, mock_dete
),
async_req=True,
show_progress=False,
namespace="ns1",
)

def test_pinecone_index_upsert_and_delete_pod(mock_describe_index, mock_determine_spec_type):
Expand Down Expand Up @@ -168,6 +170,7 @@ def test_pinecone_index_upsert_and_delete_pod(mock_describe_index, mock_determin
),
async_req=True,
show_progress=False,
namespace="ns1",
)

def test_pinecone_index_upsert_and_delete_serverless(mock_describe_index, mock_determine_spec_type):
Expand Down Expand Up @@ -197,6 +200,7 @@ def test_pinecone_index_upsert_and_delete_serverless(mock_describe_index, mock_d
),
async_req=True,
show_progress=False,
namespace="ns1",
)


Expand Down Expand Up @@ -356,4 +360,5 @@ def test_metadata_normalization():
vectors=((ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test", "small": "a", "id": 1}),),
async_req=True,
show_progress=False,
namespace=None,
)
1 change: 1 addition & 0 deletions docs/integrations/destinations/pinecone.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ OpenAI and Fake embeddings produce vectors with 1536 dimensions, and the Cohere

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :-------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------- |
| 0.1.2 | 2023-05-17 | [#38336](https://github.com/airbytehq/airbyte/pull/38336) | Fix for regression: Custom namespaces not created automatically |
| 0.1.1 | 2023-05-14 | [#38151](https://github.com/airbytehq/airbyte/pull/38151) | Add airbyte source tag for attribution |
| 0.1.0 | 2023-05-06 | [#37756](https://github.com/airbytehq/airbyte/pull/37756) | Add support for Pinecone Serverless |
| 0.0.24 | 2023-04-15 | [#37333](https://github.com/airbytehq/airbyte/pull/37333) | Update CDK & pytest version to fix security vulnerabilities. |
Expand Down

0 comments on commit b7de9f1

Please sign in to comment.