Skip to content

Commit

Permalink
add namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Reuter committed Sep 27, 2023
1 parent 8186e80 commit bc37daf
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ COPY destination_pinecone ./destination_pinecone
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.0.13
LABEL io.airbyte.version=0.0.14

LABEL io.airbyte.name=airbyte/destination-pinecone
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Union
from typing import Literal, Optional, Union

import dpath.util
from airbyte_cdk.destinations.vector_db_based.config import (
Expand All @@ -16,6 +16,15 @@
from pydantic import BaseModel, Field


class NamespaceModel(BaseModel):
    """Config model selecting a single, constant Pinecone namespace for all records.

    The indexer treats an unset model or an empty ``value`` as "no namespace"
    (it passes ``namespace=None`` to the Pinecone client in that case).
    """

    # Discriminator for the (currently single) namespace mode. const=True is the
    # pydantic-v1 way to pin the value, so the generated JSON spec emits both
    # "const": "constant" and "enum": ["constant"] for this field.
    mode: Literal["constant"] = Field("constant", const=True)
    # Namespace applied to every upsert/delete. Defaults to "" which downstream
    # code normalizes to None. Starter pods reject namespaces, hence the warning
    # in the user-facing description.
    value: Optional[str] = Field(
        title="Namespace",
        default="",
        description="Namespace to use for all records. This is not supported on starter pods",
    )


class PineconeIndexingModel(BaseModel):
pinecone_key: str = Field(
...,
Expand All @@ -27,6 +36,7 @@ class PineconeIndexingModel(BaseModel):
..., title="Pinecone Environment", description="Pinecone Cloud environment to use", examples=["us-west1-gcp", "gcp-starter"]
)
index: str = Field(..., title="Index", description="Pinecone index in your project to load data into")
namespace: Optional[NamespaceModel]

class Config:
title = "Indexing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, config: PineconeIndexingModel, embedding_dimensions: int):

self.pinecone_index = pinecone.GRPCIndex(config.index)
self.embedding_dimensions = embedding_dimensions
self._namespace = config.namespace.value if config.namespace and config.namespace.value != "" else None

def pre_sync(self, catalog: ConfiguredAirbyteCatalog):
index_description = pinecone.describe_index(self.config.index)
Expand All @@ -49,7 +50,7 @@ def delete_vectors(self, filter):
top_k = 10000
self.delete_by_metadata(filter, top_k)
else:
self.pinecone_index.delete(filter=filter)
self.pinecone_index.delete(filter=filter, namespace=self._namespace)

def delete_by_metadata(self, filter, top_k):
zero_vector = [0.0] * self.embedding_dimensions
Expand Down Expand Up @@ -94,7 +95,7 @@ def index(self, document_chunks, delete_ids):
serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT)
for batch in serial_batches:
async_results = [
self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False)
self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=self._namespace)
for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE)
]
# Wait for and retrieve responses (this raises in case of error)
Expand All @@ -106,6 +107,8 @@ def check(self) -> Optional[str]:
actual_dimension = int(description.dimension)
if actual_dimension != self.embedding_dimensions:
return f"Your embedding configuration will produce vectors with dimension {self.embedding_dimensions:d}, but your index is configured with dimension {actual_dimension:d}. Make sure embedding and indexing configurations match."
if description.pod_type == "starter" and self._namespace is not None:
return "Namespaces are not supported for starter pods."
except Exception as e:
return format_exception(e)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@
"title": "Index",
"description": "Pinecone index in your project to load data into",
"type": "string"
},
"namespace": {
"title": "NamespaceModel",
"type": "object",
"properties": {
"mode": {
"title": "Mode",
"default": "constant",
"const": "constant",
"enum": ["constant"],
"type": "string"
},
"value": {
"title": "Namespace",
"description": "Namespace to use for all records. This is not supported on starter pods",
"default": "",
"type": "string"
}
}
}
},
"required": ["pinecone_key", "pinecone_environment", "index"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd
dockerImageTag: 0.0.13
dockerImageTag: 0.0.14
dockerRepository: airbyte/destination-pinecone
githubIssueLabel: destination-pinecone
icon: pinecone.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from destination_pinecone.config import PineconeIndexingModel
from destination_pinecone.config import NamespaceModel, PineconeIndexingModel
from destination_pinecone.indexer import PineconeIndexer
from pinecone import IndexDescription


def create_pinecone_indexer():
config = PineconeIndexingModel(mode="pinecone", pinecone_environment="myenv", pinecone_key="mykey", index="myindex")
def create_pinecone_indexer(namespace=None):
config = PineconeIndexingModel(mode="pinecone", pinecone_environment="myenv", pinecone_key="mykey", index="myindex", namespace=NamespaceModel(mode="constant", value=namespace) if namespace is not None else None)
indexer = PineconeIndexer(config, 3)

indexer.pinecone_index.delete = MagicMock()
Expand Down Expand Up @@ -43,8 +43,16 @@ def mock_describe_index():
yield mock


def test_pinecone_index_upsert_and_delete(mock_describe_index):
indexer = create_pinecone_indexer()
@pytest.mark.parametrize(
"namespace, called_with",
[
(None, None),
("", None),
("my_namespace", "my_namespace"),
]
)
def test_pinecone_index_upsert_and_delete(mock_describe_index, namespace, called_with):
indexer = create_pinecone_indexer(namespace)
indexer._pod_type = "p1"
indexer.index(
[
Expand All @@ -53,14 +61,15 @@ def test_pinecone_index_upsert_and_delete(mock_describe_index):
],
["delete_id1", "delete_id2"],
)
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}})
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}}, namespace=called_with)
indexer.pinecone_index.upsert.assert_called_with(
vectors=(
(ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test"}),
(ANY, [4, 5, 6], {"_ab_stream": "abc", "text": "test2"}),
),
async_req=True,
show_progress=False,
namespace=called_with,
)


Expand All @@ -86,6 +95,7 @@ def test_pinecone_index_upsert_and_delete_starter(mock_describe_index):
),
async_req=True,
show_progress=False,
namespace=None,
)


Expand Down Expand Up @@ -164,10 +174,18 @@ def generate_catalog():
)


def test_pinecone_pre_sync(mock_describe_index):
indexer = create_pinecone_indexer()
@pytest.mark.parametrize(
"namespace, called_with",
[
(None, None),
("", None),
("my_namespace", "my_namespace"),
]
)
def test_pinecone_pre_sync(mock_describe_index, namespace, called_with):
indexer = create_pinecone_indexer(namespace)
indexer.pre_sync(generate_catalog())
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"})
indexer.pinecone_index.delete.assert_called_with(filter={"_ab_stream": "example_stream2"}, namespace=called_with)


def test_pinecone_pre_sync_starter(mock_describe_index):
Expand All @@ -180,21 +198,24 @@ def test_pinecone_pre_sync_starter(mock_describe_index):


@pytest.mark.parametrize(
"describe_throws,reported_dimensions,check_succeeds",
"describe_throws,reported_dimensions,pod_type,namespace,check_succeeds",
[
(False, 3, True),
(False, 4, False),
(True, 3, False),
(True, 4, False),
(False, 3, "p1", None, True),
(False, 4, "p1", None, False),
(True, 3, "p1", None, False),
(True, 4, "p1", None, False),
(False, 3, "starter", None, True),
(False, 3, "starter", "", True),
(False, 3, "starter", "my_namespace", False),
],
)
@patch("pinecone.describe_index")
def test_pinecone_check(describe_mock, describe_throws, reported_dimensions, check_succeeds):
indexer = create_pinecone_indexer()
def test_pinecone_check(describe_mock, describe_throws, reported_dimensions, pod_type, namespace, check_succeeds):
indexer = create_pinecone_indexer(namespace)
indexer.embedding_dimensions = 3
if describe_throws:
describe_mock.side_effect = Exception("describe failed")
describe_mock.return_value = create_index_description(dimensions=reported_dimensions)
describe_mock.return_value = create_index_description(dimensions=reported_dimensions, pod_type=pod_type)
result = indexer.check()
if check_succeeds:
assert result is None
Expand Down Expand Up @@ -226,4 +247,5 @@ def test_metadata_normalization():
vectors=((ANY, [1, 2, 3], {"_ab_stream": "abc", "text": "test", "small": "a", "id": 1}),),
async_req=True,
show_progress=False,
namespace=None
)
1 change: 1 addition & 0 deletions docs/integrations/destinations/pinecone.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ OpenAI and Fake embeddings produce vectors with 1536 dimensions, and the Cohere

| Version | Date | Pull Request | Subject |
|:--------| :--------- |:--------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.0.14 | 2023-09-28 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Support namespaces |
| 0.0.13 | 2023-09-26 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Allow more text splitting options |
| 0.0.12 | 2023-09-25 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Fix bug with stale documents left on starter pods |
| 0.0.11 | 2023-09-22 | [#30649](https://github.com/airbytehq/airbyte/pull/30649) | Set visible certified flag |
Expand Down

0 comments on commit bc37daf

Please sign in to comment.