Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pinecone destination: Add namespace setting #30789

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ COPY destination_pinecone ./destination_pinecone
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.0.13
LABEL io.airbyte.version=0.0.14

LABEL io.airbyte.name=airbyte/destination-pinecone
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Union
from typing import Literal, Optional, Union

import dpath.util
from airbyte_cdk.destinations.vector_db_based.config import (
Expand All @@ -16,6 +16,15 @@
from pydantic import BaseModel, Field


class NamespaceModel(BaseModel):
mode: Literal["constant"] = Field("constant", const=True)
value: Optional[str] = Field(
title="Namespace",
default="",
description="Namespace to use for all records. This is not supported on starter pods",
)


class PineconeIndexingModel(BaseModel):
pinecone_key: str = Field(
...,
Expand All @@ -27,6 +36,7 @@ class PineconeIndexingModel(BaseModel):
..., title="Pinecone Environment", description="Pinecone Cloud environment to use", examples=["us-west1-gcp", "gcp-starter"]
)
index: str = Field(..., title="Index", description="Pinecone index in your project to load data into")
namespace: Optional[NamespaceModel]

class Config:
title = "Indexing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, config: PineconeIndexingModel, embedding_dimensions: int):

self.pinecone_index = pinecone.GRPCIndex(config.index)
self.embedding_dimensions = embedding_dimensions
self._namespace = config.namespace.value if config.namespace and config.namespace.value != "" else None

def pre_sync(self, catalog: ConfiguredAirbyteCatalog):
index_description = pinecone.describe_index(self.config.index)
Expand All @@ -49,7 +50,7 @@ def delete_vectors(self, filter):
top_k = 10000
self.delete_by_metadata(filter, top_k)
else:
self.pinecone_index.delete(filter=filter)
self.pinecone_index.delete(filter=filter, namespace=self._namespace)

def delete_by_metadata(self, filter, top_k):
zero_vector = [0.0] * self.embedding_dimensions
Expand Down Expand Up @@ -94,7 +95,7 @@ def index(self, document_chunks, delete_ids):
serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT)
for batch in serial_batches:
async_results = [
self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False)
self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=self._namespace)
for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE)
]
# Wait for and retrieve responses (this raises in case of error)
Expand All @@ -106,6 +107,8 @@ def check(self) -> Optional[str]:
actual_dimension = int(description.dimension)
if actual_dimension != self.embedding_dimensions:
return f"Your embedding configuration will produce vectors with dimension {self.embedding_dimensions:d}, but your index is configured with dimension {actual_dimension:d}. Make sure embedding and indexing configurations match."
if description.pod_type == "starter" and self._namespace is not None:
return "Namespaces are not supported for starter pods."
except Exception as e:
return format_exception(e)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@
"title": "Index",
"description": "Pinecone index in your project to load data into",
"type": "string"
},
"namespace": {
"title": "NamespaceModel",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this title show up anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's just internal.

"type": "object",
"properties": {
"mode": {
"title": "Mode",
"default": "constant",
"const": "constant",
"enum": ["constant"],
"type": "string"
},
"value": {
"title": "Namespace",
"description": "Namespace to use for all records. This is not supported on starter pods",
"default": "",
"type": "string"
}
}
}
},
"required": ["pinecone_key", "pinecone_environment", "index"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd
dockerImageTag: 0.0.13
dockerImageTag: 0.0.14
dockerRepository: airbyte/destination-pinecone
githubIssueLabel: destination-pinecone
icon: pinecone.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from destination_pinecone.config import PineconeIndexingModel
from destination_pinecone.config import NamespaceModel, PineconeIndexingModel
from destination_pinecone.indexer import PineconeIndexer
from pinecone import IndexDescription


def create_pinecone_indexer():
config = PineconeIndexingModel(mode="pinecone", pinecone_environment="myenv", pinecone_key="mykey", index="myindex")
def create_pinecone_indexer(namespace=None):
config = PineconeIndexingModel(mode="pinecone", pinecone_environment="myenv", pinecone_key="mykey", index="myindex", namespace=NamespaceModel(mode="constant", value=namespace) if namespace is not None else None)
indexer = PineconeIndexer(config, 3)

indexer.pinecone_index.delete = MagicMock()
Expand Down Expand Up @@ -43,8 +43,16 @@ def mock_describe_index():
yield mock


def test_pinecone_index_upsert_and_delete(mock_describe_index):
indexer = create_pinecone_indexer()
@pytest.mark.parametrize(
"namespace, called_with",
[
(None, None),
("", None),
("my_namespace", "my_namespace"),
]
)
def test_pinecone_index_upsert_and_delete(mock_describe_index, namespace, called_with):
indexer = create_pinecone_indexer(namespace)
indexer._pod_type = "p1"
indexer.index(
[
Expand All @@ -53,14 +61,15 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index):
],
["delete_id1", "delete_id2"],
)
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}})
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, namespace=called_with)
indexer.pinecone_index.upsert.assert_called_with(
vectors=(
(ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}),
(ANY, [4, 5, 6], {"_ab_stream": "abc", "text": "test2"}),
),
async_req=True,
show_progress=False,
namespace=called_with,
)


Expand All @@ -86,6 +95,7 @@ def test_pinecone_index_upsert_and_delete_starter(mock_describe_index):
),
async_req=True,
show_progress=False,
namespace=None,
)


Expand Down Expand Up @@ -164,10 +174,18 @@ def generate_catalog():
)


def test_pinecone_pre_sync(mock_describe_index):
indexer = create_pinecone_indexer()
@pytest.mark.parametrize(
"namespace, called_with",
[
(None, None),
("", None),
("my_namespace", "my_namespace"),
]
)
def test_pinecone_pre_sync(mock_describe_index, namespace, called_with):
indexer = create_pinecone_indexer(namespace)
indexer.pre_sync(generate_catalog())
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"})
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"}, namespace=called_with)


def test_pinecone_pre_sync_starter(mock_describe_index):
Expand All @@ -180,21 +198,24 @@ def test_pinecone_pre_sync_starter(mock_describe_index):


@pytest.mark.parametrize(
"describe_throws,reported_dimensions,check_succeeds",
"describe_throws,reported_dimensions,pod_type,namespace,check_succeeds",
[
(False, 3, True),
(False, 4, False),
(True, 3, False),
(True, 4, False),
(False, 3, "p1", None, True),
(False, 4, "p1", None, False),
(True, 3, "p1", None, False),
(True, 4, "p1", None, False),
(False, 3, "starter", None, True),
(False, 3, "starter", "", True),
(False, 3, "starter", "my_namespace", False),
],
)
@patch("pinecone.describe_index")
def test_pinecone_check(describe_mock, describe_throws, reported_dimensions, check_succeeds):
indexer = create_pinecone_indexer()
def test_pinecone_check(describe_mock, describe_throws, reported_dimensions, pod_type, namespace, check_succeeds):
indexer = create_pinecone_indexer(namespace)
indexer.embedding_dimensions = 3
if describe_throws:
describe_mock.side_effect = Exception("describe failed")
describe_mock.return_value = create_index_description(dimensions=reported_dimensions)
describe_mock.return_value = create_index_description(dimensions=reported_dimensions, pod_type=pod_type)
result = indexer.check()
if check_succeeds:
assert result is None
Expand Down Expand Up @@ -226,4 +247,5 @@ def test_metadata_normalization():
vectors=((ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test", "small": "a", "id": 1}),),
async_req=True,
show_progress=False,
namespace=None
)
1 change: 1 addition & 0 deletions docs/integrations/destinations/pinecone.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ OpenAI and Fake embeddings produce vectors with 1536 dimensions, and the Cohere

| Version | Date | Pull Request | Subject |
|:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.0.14 | 2023-09-28 | [#30789](https://github.com/airbytehq/airbyte/pull/30789) | Support namespaces |
| 0.0.13 | 2023-09-26 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Allow more text splitting options |
| 0.0.12 | 2023-09-25 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Fix bug with stale documents left on starter pods |
| 0.0.11 | 2023-09-22 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Set visible certified flag |
Expand Down
Loading