Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌟 [destination-pinecone]: Add support for Pinecone Serverless #37756

Merged
merged 15 commits into from
May 6, 2024
Merged
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -24,28 +24,44 @@ class DestinationPinecone(Destination):
embedder: Embedder

def _init_indexer(self, config: ConfigModel):
    """Instantiate the embedder and Pinecone indexer from a parsed config.

    Sets ``self.embedder`` and ``self.indexer`` on success and returns
    ``None``.  On failure returns an ``AirbyteConnectionStatus`` with
    ``status=FAILED`` describing the error instead of raising, so that
    ``check()`` can surface a clean failure status to the platform.
    """
    try:
        self.embedder = create_from_config(config.embedding, config.processing)
        self.indexer = PineconeIndexer(config.indexing, self.embedder.embedding_dimensions)
    except Exception as e:
        return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))
    # Explicit None makes the success/failure contract visible to callers.
    return None

def write(
    self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
    """Consume input messages and upsert their embedded chunks into Pinecone.

    Parses the raw ``config`` mapping, initializes embedder/indexer, and
    delegates batching and state-message passthrough to ``Writer``.  Any
    exception is converted into a yielded log message instead of crashing
    the sync process.
    """
    try:
        config_model = ConfigModel.parse_obj(config)
        # NOTE(review): _init_indexer returns a FAILED status (rather than
        # raising) when construction fails; that return value is ignored here,
        # so a failed init only surfaces later as an AttributeError — confirm
        # whether the status should be checked before constructing Writer.
        self._init_indexer(config_model)
        writer = Writer(
            config_model.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=config_model.omit_raw_text
        )
        yield from writer.write(configured_catalog, input_messages)
    except Exception as e:
        # NOTE(review): AirbyteLogger is the CDK logging helper, not a protocol
        # model; this likely should be AirbyteLogMessage(level=Level.ERROR, ...)
        # with type=Type.LOG — verify against the Airbyte protocol models.
        yield AirbyteMessage(type="LOG", log=AirbyteLogger(level="ERROR", message=str(e)))

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """Validate the destination configuration.

    Returns SUCCEEDED only when the embedder, the indexer, and the
    processing configuration all pass their individual checks; otherwise
    returns FAILED with the collected error messages joined by newlines.
    Never raises: unexpected exceptions are logged and converted to a
    FAILED status.
    """
    try:
        parsed_config = ConfigModel.parse_obj(config)
        # _init_indexer reports construction failures as a FAILED status
        # instead of raising; bail out immediately in that case.
        init_status = self._init_indexer(parsed_config)
        if init_status and init_status.status == Status.FAILED:
            logger.error(f"Initialization failed with message: {init_status.message}")
            return init_status

        checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)]
        errors = [error for error in checks if error is not None]
        if errors:
            error_message = "\n".join(errors)
            logger.error(f"Configuration check failed: {error_message}")
            return AirbyteConnectionStatus(status=Status.FAILED, message=error_message)
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
    except Exception as e:
        logger.error(f"Exception during configuration check: {str(e)}")
        return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
return ConnectorSpecification(
Original file line number Diff line number Diff line change
@@ -5,13 +5,15 @@
import uuid
from typing import Optional

import pinecone
import urllib3
from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD, METADATA_STREAM_FIELD
from airbyte_cdk.destinations.vector_db_based.indexer import Indexer
from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, create_stream_identifier, format_exception
from airbyte_cdk.models import AirbyteConnectionStatus, Status
from airbyte_cdk.models.airbyte_protocol import ConfiguredAirbyteCatalog, DestinationSyncMode
from destination_pinecone.config import PineconeIndexingModel
from pinecone import PineconeException
from pinecone.grpc import PineconeGRPC

# large enough to speed up processing, small enough to not hit pinecone request limits
PINECONE_BATCH_SIZE = 40
@@ -29,29 +31,48 @@ class PineconeIndexer(Indexer):

def __init__(self, config: PineconeIndexingModel, embedding_dimensions: int):
    """Create the gRPC Pinecone client and a handle to the configured index.

    Args:
        config: Indexing configuration (API key, index name, ...).
        embedding_dimensions: Dimension the embedder will produce; checked
            against the index dimension in ``check()``.

    Raises:
        PineconeException: if the client cannot be constructed.  Note that
        returning a value (e.g. an AirbyteConnectionStatus) from __init__
        would itself raise TypeError, so failures must propagate as
        exceptions; DestinationPinecone._init_indexer converts them into a
        FAILED connection status.
    """
    super().__init__(config)
    self.pc = PineconeGRPC(api_key=config.pinecone_key, threaded=True)
    self.pinecone_index = self.pc.Index(config.index)
    self.embedding_dimensions = embedding_dimensions

def determine_spec_type(self, index_name):
    """Return the spec type of *index_name*: ``"pod"`` or ``"serverless"``.

    Raises:
        ValueError: if the index description contains neither spec key.
    """
    spec = self.pc.describe_index(index_name).get("spec", {})
    for candidate in ("pod", "serverless"):
        if candidate in spec:
            return candidate
    raise ValueError("Unknown index specification type.")

def pre_sync(self, catalog: ConfiguredAirbyteCatalog):
    """Prepare the index before a sync.

    Determines the index spec type ("pod" vs "serverless") once and caches
    it on ``self._pod_type``, then for every stream configured with
    overwrite mode deletes that stream's previously written vectors.  The
    stream identifier doubles as the id prefix used by serverless deletes.
    """
    self._pod_type = self.determine_spec_type(self.config.index)

    for stream in catalog.streams:
        stream_identifier = create_stream_identifier(stream.stream)
        if stream.destination_sync_mode == DestinationSyncMode.overwrite:
            self.delete_vectors(
                filter={METADATA_STREAM_FIELD: stream_identifier},
                namespace=stream.stream.namespace,
                prefix=stream_identifier,
            )

def post_sync(self):
    """No post-sync work is required; emit no follow-up messages."""
    no_messages = []
    return no_messages

def delete_vectors(self, filter, namespace=None, prefix=None):
    """Delete vectors matching *filter*, using a strategy suited to the index type.

    Args:
        filter: Metadata filter selecting the vectors to delete.
        namespace: Optional Pinecone namespace.
        prefix: Vector-id prefix; required for serverless indexes, which
            delete by id prefix rather than by metadata filter.

    Raises:
        ValueError: serverless index without a ``prefix``.
    """
    if self._pod_type == "starter":
        # Starter pod types have a maximum of 100000 rows
        top_k = 10000
        self.delete_by_metadata(filter, top_k, namespace)
    elif self._pod_type == "serverless":
        # `is None`, not `== None` (PEP 8 identity comparison).
        if prefix is None:
            raise ValueError("Prefix is required for a serverless index.")
        self.delete_by_prefix(prefix=prefix, namespace=namespace)
    else:
        # Pod spec
        self.pinecone_index.delete(filter=filter, namespace=namespace)

def delete_by_metadata(self, filter, top_k, namespace=None):
@@ -66,6 +87,10 @@ def delete_by_metadata(self, filter, top_k, namespace=None):
self.pinecone_index.delete(ids=list(batch), namespace=namespace)
query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace)

def delete_by_prefix(self, prefix, namespace=None):
    """Delete every vector whose id starts with *prefix*, batch by batch."""
    index = self.pinecone_index
    for id_batch in index.list(prefix=prefix, namespace=namespace):
        index.delete(ids=id_batch, namespace=namespace)

def _truncate_metadata(self, metadata: dict) -> dict:
"""
Normalize metadata to ensure it is within the size limit and doesn't contain complex objects.
@@ -85,34 +110,45 @@ def _truncate_metadata(self, metadata: dict) -> dict:

return result

def index(self, document_chunks, namespace, streamName):
    """Upsert embedded document chunks into the Pinecone index.

    Each vector id is "<streamName>#<uuid4>" so that serverless indexes can
    later delete a whole stream by id prefix (see delete_by_prefix).
    Chunks are upserted asynchronously in batches of PINECONE_BATCH_SIZE,
    at most PARALLELISM_LIMIT batches in flight at once.
    """
    pinecone_docs = []
    for chunk in document_chunks:
        metadata = self._truncate_metadata(chunk.metadata)
        if chunk.page_content is not None:
            metadata["text"] = chunk.page_content
        pinecone_docs.append((streamName + "#" + str(uuid.uuid4()), chunk.embedding, metadata))
    serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT)
    for batch in serial_batches:
        async_results = []
        for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE):
            # Bug fix: pass namespace through to upsert — omitting it wrote
            # all vectors into the default namespace while pre_sync/delete
            # still target the configured namespace.
            async_result = self.pinecone_index.upsert(
                vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=namespace
            )
            async_results.append(async_result)
        # Wait for and retrieve responses (this raises in case of error)
        [async_result.result() for async_result in async_results]

def delete(self, delete_ids, namespace, stream):
    """Delete the vectors belonging to the given record ids.

    No-op for an empty id list.  Starter pods must delete via repeated
    metadata queries, serverless indexes delete directly by vector id,
    and regular pods delete by metadata filter.
    """
    if not delete_ids:
        return
    # Renamed from `filter` to avoid shadowing the builtin.
    delete_filter = {METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}
    if self._pod_type == "starter":
        # Starter pod types have a maximum of 100000 rows
        top_k = 10000
        self.delete_by_metadata(filter=delete_filter, top_k=top_k, namespace=namespace)
    elif self._pod_type == "serverless":
        self.pinecone_index.delete(ids=delete_ids, namespace=namespace)
    else:
        # Pod spec
        self.pinecone_index.delete(filter=delete_filter, namespace=namespace)

def check(self) -> Optional[str]:
try:
indexes = pinecone.list_indexes()
if self.config.index not in indexes:
list = self.pc.list_indexes()
index_names = [index["name"] for index in list.indexes]
if self.config.index not in index_names:
return f"Index {self.config.index} does not exist in environment {self.config.pinecone_environment}."

description = pinecone.describe_index(self.config.index)
description = self.pc.describe_index(self.config.index)
actual_dimension = int(description.dimension)
if actual_dimension != self.embedding_dimensions:
return f"Your embedding configuration will produce vectors with dimension {self.embedding_dimensions:d}, but your index is configured with dimension {actual_dimension:d}. Make sure embedding and indexing configurations match."
@@ -121,7 +157,7 @@ def check(self) -> Optional[str]:
if f"Failed to resolve 'controller.{self.config.pinecone_environment}.pinecone.io'" in str(e.reason):
return f"Failed to resolve environment, please check whether {self.config.pinecone_environment} is correct."

if isinstance(e, pinecone.exceptions.UnauthorizedException):
if isinstance(e, PineconeException):
if e.body:
return e.body

Original file line number Diff line number Diff line change
@@ -4,33 +4,54 @@

import json
import logging
import time

import pinecone
from airbyte_cdk.destinations.vector_db_based.embedder import OPEN_AI_VECTOR_SIZE
from airbyte_cdk.destinations.vector_db_based.test_utils import BaseIntegrationTest
from airbyte_cdk.models import DestinationSyncMode, Status
from destination_pinecone.destination import DestinationPinecone
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone
from pinecone import Pinecone as PineconeREST
from pinecone import PineconeException
from pinecone.grpc import PineconeGRPC


class PineconeIntegrationTest(BaseIntegrationTest):
def _init_pinecone(self):
    """Create both gRPC and REST Pinecone clients plus index handles.

    The gRPC client/index is used for data-plane operations in the tests;
    the REST client/index is kept because the langchain ``Pinecone``
    vector store in test_write is constructed with the REST index type.
    """
    self.pc = PineconeGRPC(api_key=self.config["indexing"]["pinecone_key"])
    self.pinecone_index = self.pc.Index(self.config["indexing"]["index"])
    self.pc_rest = PineconeREST(api_key=self.config["indexing"]["pinecone_key"])
    self.pinecone_index_rest = self.pc_rest.Index(name=self.config["indexing"]["index"])

def _wait(self):
print("Waiting for Pinecone...", end='', flush=True)
for i in range(15):
time.sleep(1)
print(".", end='', flush=True)
print() # Move to the next line after the loop

def setUp(self):
    """Load the connector config from secrets/ and connect to Pinecone."""
    with open("secrets/config.json", "r") as f:
        # json.load reads directly from the file object (idiomatic form
        # of json.loads(f.read())).
        self.config = json.load(f)
    self._init_pinecone()

def tearDown(self):
    # Give Pinecone time to settle so cleanup sees a consistent index.
    self._wait()
    # make sure pinecone is initialized correctly before cleaning up
    self._init_pinecone()
    try:
        self.pinecone_index.delete(delete_all=True)
    except PineconeException as e:
        # A missing namespace just means there is nothing to clean up;
        # anything else is a real failure.
        if "Namespace not found" not in str(e):
            raise  # bare raise preserves the original traceback
        print("Nothing to delete. No data in the index/namespace.")


def test_check_valid_config(self):
    """check() must report SUCCEEDED for the real config in secrets/."""
    outcome = DestinationPinecone().check(logging.getLogger("airbyte"), self.config)
    assert outcome.status == Status.SUCCEEDED

def test_check_invalid_config(self):
@@ -43,10 +64,11 @@ def test_check_invalid_config(self):
"mode": "pinecone",
"pinecone_key": "mykey",
"index": "testdata",
"pinecone_environment": "asia-southeast1-gcp-free",
"pinecone_environment": "us-west1-gcp",
},
},
)

assert outcome.status == Status.FAILED

def test_write(self):
@@ -57,14 +79,21 @@ def test_write(self):
# initial sync
destination = DestinationPinecone()
list(destination.write(self.config, catalog, [*first_record_chunk, first_state_message]))


self._wait()
assert self.pinecone_index.describe_index_stats().total_vector_count == 5

# incrementalally update a doc
incremental_catalog = self._get_configured_catalog(DestinationSyncMode.append_dedup)
list(destination.write(self.config, incremental_catalog, [self._record("mystream", "Cats are nice", 2), first_state_message]))

self._wait()

result = self.pinecone_index.query(
vector=[0] * OPEN_AI_VECTOR_SIZE, top_k=10, filter={"_ab_record_id": "mystream_2"}, include_metadata=True
)

assert len(result.matches) == 1
assert (
result.matches[0].metadata["text"] == "str_col: Cats are nice"
@@ -73,6 +102,6 @@ def test_write(self):
# test langchain integration
embeddings = OpenAIEmbeddings(openai_api_key=self.config["embedding"]["openai_key"])
self._init_pinecone()
vector_store = Pinecone(self.pinecone_index, embeddings.embed_query, "text")
vector_store = Pinecone(self.pinecone_index_rest, embeddings.embed_query, "text")
result = vector_store.similarity_search("feline animals", 1)
assert result[0].metadata["_ab_record_id"] == "mystream_2"
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ data:
connectorSubtype: vectorstore
connectorType: destination
definitionId: 3d2b6f84-7f0d-4e3f-a5e5-7c7d4b50eabd
dockerImageTag: 0.0.24
dockerImageTag: 0.0.25
bindipankhudi marked this conversation as resolved.
Show resolved Hide resolved
dockerRepository: airbyte/destination-pinecone
documentationUrl: https://docs.airbyte.com/integrations/destinations/pinecone
githubIssueLabel: destination-pinecone
Loading