From d6dc857cc676cb43da24a1a4b4f22648c1f19742 Mon Sep 17 00:00:00 2001 From: Roie Schwaber-Cohen Date: Thu, 25 Apr 2024 16:30:52 -0700 Subject: [PATCH 1/3] updating to serverless --- .../destination_pinecone/indexer.py | 57 ++++++++++--- .../pinecone_integration_test.py | 8 +- .../destination-pinecone/poetry.lock | 83 ++++--------------- .../destination-pinecone/pyproject.toml | 4 +- 4 files changed, 65 insertions(+), 87 deletions(-) diff --git a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py index c09269f20268..91c9fef9d707 100644 --- a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py +++ b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py @@ -5,7 +5,7 @@ import uuid from typing import Optional -import pinecone +from pinecone import PineconeGRPC import urllib3 from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD, METADATA_STREAM_FIELD from airbyte_cdk.destinations.vector_db_based.indexer import Indexer @@ -29,29 +29,46 @@ class PineconeIndexer(Indexer): def __init__(self, config: PineconeIndexingModel, embedding_dimensions: int): super().__init__(config) - pinecone.init(api_key=config.pinecone_key, environment=config.pinecone_environment, threaded=True) - - self.pinecone_index = pinecone.GRPCIndex(config.index) + self.pc = PineconeGRPC(api_key=config.pinecone_key, threaded=True) + self.pinecone_index = self.pc.Index(config.index) self.embedding_dimensions = embedding_dimensions + + def determine_spec_type(self, index_name): + description = self.pc.describe_index(index_name) + spec_keys = description.get('spec', {}) + if 'pod' in spec_keys: + return 'pod' + elif 'serverless' in spec_keys: + return 'serverless' + else: + raise ValueError("Unknown index specification type.") def pre_sync(self, catalog: ConfiguredAirbyteCatalog): - index_description = pinecone.describe_index(self.config.index) - self._pod_type = index_description.pod_type + index_description = self.pc.describe_index(self.config.index) + self._pod_type = self.determine_spec_type(self.config.index) + stream_identifier = create_stream_identifier(stream.stream) for stream in catalog.streams: if stream.destination_sync_mode == DestinationSyncMode.overwrite: self.delete_vectors( - filter={METADATA_STREAM_FIELD: create_stream_identifier(stream.stream)}, namespace=stream.stream.namespace + filter={METADATA_STREAM_FIELD: stream_identifier}, namespace=stream.stream.namespace, prefix=stream_identifier ) def post_sync(self): return [] + - def delete_vectors(self, filter, namespace=None): + + def delete_vectors(self, filter, namespace=None, prefix=None): if self._pod_type == "starter": # Starter pod types have a maximum of 100000 rows top_k = 10000 self.delete_by_metadata(filter, top_k, namespace) + elif self._pod_type == "serverless": + if prefix == None: + raise ValueError("Prefix is required for a serverless index.") + self.delete_by_prefix(prefix=prefix, namespace=namespace) else: + # Pod spec self.pinecone_index.delete(filter=filter, namespace=namespace) def delete_by_metadata(self, filter, top_k, namespace=None): @@ -66,6 +83,10 @@ def delete_by_metadata(self, filter, top_k, namespace=None): self.pinecone_index.delete(ids=list(batch), namespace=namespace) query_result = self.pinecone_index.query(vector=zero_vector, filter=filter, top_k=top_k, namespace=namespace) + def delete_by_prefix(self, 
prefix, namespace=None): + for ids in self.pinecone_index.list(prefix=prefix, namespace=namespace): + self.pinecone_index.delete(ids=ids, namespace=namespace) + def _truncate_metadata(self, metadata: dict) -> dict: """ Normalize metadata to ensure it is within the size limit and doesn't contain complex objects. @@ -92,7 +113,8 @@ def index(self, document_chunks, namespace, stream): metadata = self._truncate_metadata(chunk.metadata) if chunk.page_content is not None: metadata["text"] = chunk.page_content - pinecone_docs.append((str(uuid.uuid4()), chunk.embedding, metadata)) + prefix = create_stream_identifier(stream.stream) + pinecone_docs.append((prefix + "#" + str(uuid.uuid4()), chunk.embedding, metadata)) serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT) for batch in serial_batches: async_results = [ @@ -103,16 +125,25 @@ def index(self, document_chunks, namespace, stream): [async_result.result() for async_result in async_results] def delete(self, delete_ids, namespace, stream): + filter = {METADATA_RECORD_ID_FIELD: {"$in": delete_ids}} if len(delete_ids) > 0: - self.delete_vectors(filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}, namespace=namespace) + if self._pod_type == "starter": + # Starter pod types have a maximum of 100000 rows + top_k = 10000 + self.delete_by_metadata(filter=filter, top_k=top_k, namespace=namespace) + elif self._pod_type == "serverless": + self.pinecone_index.delete(ids=delete_ids, namespace=namespace) + else: + # Pod spec + self.pinecone_index.delete(filter=filter, namespace=namespace) def check(self) -> Optional[str]: try: - indexes = pinecone.list_indexes() + indexes = self.pc.list_indexes() if self.config.index not in indexes: return f"Index {self.config.index} does not exist in environment {self.config.pinecone_environment}." - description = pinecone.describe_index(self.config.index) + description = self.pc.describe_index(self.config.index) actual_dimension = int(description.dimension) if actual_dimension != self.embedding_dimensions: return f"Your embedding configuration will produce vectors with dimension {self.embedding_dimensions:d}, but your index is configured with dimension {actual_dimension:d}. Make sure embedding and indexing configurations match." @@ -121,7 +152,7 @@ def check(self) -> Optional[str]: if f"Failed to resolve 'controller.{self.config.pinecone_environment}.pinecone.io'" in str(e.reason): return f"Failed to resolve environment, please check whether {self.config.pinecone_environment} is correct." 
- if isinstance(e, pinecone.exceptions.UnauthorizedException): + if isinstance(e, self.pc.exceptions.UnauthorizedException): if e.body: return e.body diff --git a/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py b/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py index b70232356dd8..2d7c7518f5e7 100644 --- a/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py +++ b/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py @@ -5,7 +5,7 @@ import json import logging -import pinecone +from pinecone import PineconeGRPC from airbyte_cdk.destinations.vector_db_based.embedder import OPEN_AI_VECTOR_SIZE from airbyte_cdk.destinations.vector_db_based.test_utils import BaseIntegrationTest from airbyte_cdk.models import DestinationSyncMode, Status @@ -16,8 +16,8 @@ class PineconeIntegrationTest(BaseIntegrationTest): def _init_pinecone(self): - pinecone.init(api_key=self.config["indexing"]["pinecone_key"], environment=self.config["indexing"]["pinecone_environment"]) - self.pinecone_index = pinecone.Index(self.config["indexing"]["index"]) + self.pc = PineconeGRPC(api_key=self.config["indexing"]["pinecone_key"]) + self.pinecone_index = self.pc.Index(self.config["indexing"]["index"]) def setUp(self): with open("secrets/config.json", "r") as f: @@ -43,7 +43,7 @@ def test_check_invalid_config(self): "mode": "pinecone", "pinecone_key": "mykey", "index": "testdata", - "pinecone_environment": "asia-southeast1-gcp-free", + "pinecone_environment": "us-west1-gcp", }, }, ) diff --git a/airbyte-integrations/connectors/destination-pinecone/poetry.lock b/airbyte-integrations/connectors/destination-pinecone/poetry.lock index a3da1fd378a4..c0dbac29db30 100644 --- a/airbyte-integrations/connectors/destination-pinecone/poetry.lock +++ b/airbyte-integrations/connectors/destination-pinecone/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. 
[[package]] name = "aiohttp" @@ -503,26 +503,6 @@ wrapt = ">=1.10,<2" [package.extras] dev = ["PyTest", "PyTest-Cov", "bump2version (<1)", "sphinx (<2)", "tox"] -[[package]] -name = "dnspython" -version = "2.6.1" -description = "DNS toolkit" -optional = false -python-versions = ">=3.8" -files = [ - {file = "dnspython-2.6.1-py3-none-any.whl", hash = "sha256:5ef3b9680161f6fa89daf8ad451b5f1a33b18ae8a1c6778cdf4b43f08c0a6e50"}, - {file = "dnspython-2.6.1.tar.gz", hash = "sha256:e8f0f9c23a7b7cb99ded64e6c3a6f3e701d78f50c55e002b839dea7225cff7cc"}, -] - -[package.extras] -dev = ["black (>=23.1.0)", "coverage (>=7.0)", "flake8 (>=7)", "mypy (>=1.8)", "pylint (>=3)", "pytest (>=7.4)", "pytest-cov (>=4.1.0)", "sphinx (>=7.2.0)", "twine (>=4.0.0)", "wheel (>=0.42.0)"] -dnssec = ["cryptography (>=41)"] -doh = ["h2 (>=4.1.0)", "httpcore (>=1.0.0)", "httpx (>=0.26.0)"] -doq = ["aioquic (>=0.9.25)"] -idna = ["idna (>=3.6)"] -trio = ["trio (>=0.23)"] -wmi = ["wmi (>=1.5.1)"] - [[package]] name = "dpath" version = "2.0.8" @@ -1321,24 +1301,6 @@ orjson = ">=3.9.14,<4.0.0" pydantic = ">=1,<3" requests = ">=2,<3" -[[package]] -name = "loguru" -version = "0.7.2" -description = "Python logging made (stupidly) simple" -optional = false -python-versions = ">=3.5" -files = [ - {file = "loguru-0.7.2-py3-none-any.whl", hash = "sha256:003d71e3d3ed35f0f8984898359d65b79e5b21943f78af86aa5491210429b8eb"}, - {file = "loguru-0.7.2.tar.gz", hash = "sha256:e671a53522515f34fd406340ee968cb9ecafbc4b36c679da03c18fd8d0bd51ac"}, -] - -[package.dependencies] -colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""} -win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""} - -[package.extras] -dev = ["Sphinx (==7.2.5)", "colorama (==0.4.5)", "colorama (==0.4.6)", "exceptiongroup (==1.1.3)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v1.4.1)", "mypy (==v1.5.1)", "pre-commit (==3.4.0)", "pytest (==6.1.2)", "pytest (==7.4.0)", "pytest-cov (==2.12.1)", "pytest-cov (==4.1.0)", "pytest-mypy-plugins (==1.9.3)", "pytest-mypy-plugins (==3.0.0)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.3.0)", "tox (==3.27.1)", "tox (==4.11.0)"] - [[package]] name = "lz4" version = "4.3.3" @@ -1852,6 +1814,7 @@ optional = false python-versions = ">=3.9" files = [ {file = "pandas-2.2.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:90c6fca2acf139569e74e8781709dccb6fe25940488755716d1d354d6bc58bce"}, + {file = "pandas-2.2.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c7adfc142dac335d8c1e0dcbd37eb8617eac386596eb9e1a1b77791cf2498238"}, {file = "pandas-2.2.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4abfe0be0d7221be4f12552995e58723c7422c80a659da13ca382697de830c08"}, {file = "pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8635c16bf3d99040fdf3ca3db669a7250ddf49c55dc4aa8fe0ae0fa8d6dcc1f0"}, {file = "pandas-2.2.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:40ae1dffb3967a52203105a077415a86044a2bea011b5f321c6aa64b379a3f51"}, @@ -1872,6 +1835,7 @@ files = [ {file = "pandas-2.2.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:43498c0bdb43d55cb162cdc8c06fac328ccb5d2eabe3cadeb3529ae6f0517c32"}, {file = "pandas-2.2.2-cp312-cp312-win_amd64.whl", hash = "sha256:d187d355ecec3629624fccb01d104da7d7f391db0311145817525281e2804d23"}, {file = "pandas-2.2.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0ca6377b8fca51815f382bd0b697a0814c8bda55115678cbc94c30aacbb6eff2"}, + {file 
= "pandas-2.2.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9057e6aa78a584bc93a13f0a9bf7e753a5e9770a30b4d758b8d5f2a62a9433cd"}, {file = "pandas-2.2.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:001910ad31abc7bf06f49dcc903755d2f7f3a9186c0c040b827e522e9cef0863"}, {file = "pandas-2.2.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:66b479b0bd07204e37583c191535505410daa8df638fd8e75ae1b383851fe921"}, {file = "pandas-2.2.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:a77e9d1c386196879aa5eb712e77461aaee433e54c68cf253053a73b7e49c33a"}, @@ -2051,33 +2015,31 @@ xmp = ["defusedxml"] [[package]] name = "pinecone-client" -version = "2.2.4" +version = "3.1.0" description = "Pinecone client and SDK" optional = false -python-versions = ">=3.8" +python-versions = ">=3.8,<4.0" files = [ - {file = "pinecone-client-2.2.4.tar.gz", hash = "sha256:2c1cc1d6648b2be66e944db2ffa59166a37b9164d1135ad525d9cd8b1e298168"}, - {file = "pinecone_client-2.2.4-py3-none-any.whl", hash = "sha256:5bf496c01c2f82f4e5c2dc977cc5062ecd7168b8ed90743b09afcc8c7eb242ec"}, + {file = "pinecone_client-3.1.0-py3-none-any.whl", hash = "sha256:66dfe9859ed5b3412c3b59c68c9706c0f522cafd1a15c5d05e28d5664c2c48a4"}, + {file = "pinecone_client-3.1.0.tar.gz", hash = "sha256:45b8206013f91a982b994f1fbaa39e7e8c99d30ef3778a9f319c43b8c992fc42"}, ] [package.dependencies] -dnspython = ">=2.0.0" +certifi = ">=2019.11.17" googleapis-common-protos = {version = ">=1.53.0", optional = true, markers = "extra == \"grpc\""} grpc-gateway-protoc-gen-openapiv2 = {version = "0.1.0", optional = true, markers = "extra == \"grpc\""} -grpcio = {version = ">=1.44.0", optional = true, markers = "extra == \"grpc\""} -loguru = ">=0.5.0" +grpcio = [ + {version = ">=1.44.0", optional = true, markers = "python_version >= \"3.8\" and python_version < \"3.11\" and extra == \"grpc\""}, + {version = ">=1.59.0", optional = true, markers = "python_version >= \"3.11\" and python_version < \"4.0\" and extra == \"grpc\""}, +] lz4 = {version = ">=3.1.3", optional = true, markers = "extra == \"grpc\""} -numpy = ">=1.22.0" protobuf = {version = ">=3.20.0,<3.21.0", optional = true, markers = "extra == \"grpc\""} -python-dateutil = ">=2.5.3" -pyyaml = ">=5.4" -requests = ">=2.19.0" tqdm = ">=4.64.1" typing-extensions = ">=3.7.4" -urllib3 = ">=1.21.1" +urllib3 = {version = ">=1.26.0", markers = "python_version >= \"3.8\" and python_version < \"3.12\""} [package.extras] -grpc = ["googleapis-common-protos (>=1.53.0)", "grpc-gateway-protoc-gen-openapiv2 (==0.1.0)", "grpcio (>=1.44.0)", "lz4 (>=3.1.3)", "protobuf (>=3.20.0,<3.21.0)"] +grpc = ["googleapis-common-protos (>=1.53.0)", "grpc-gateway-protoc-gen-openapiv2 (==0.1.0)", "grpcio (>=1.44.0)", "grpcio (>=1.59.0)", "lz4 (>=3.1.3)", "protobuf (>=3.20.0,<3.21.0)"] [[package]] name = "platformdirs" @@ -2360,7 +2322,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = 
"PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -2966,20 +2927,6 @@ files = [ [package.dependencies] bracex = ">=2.1.1" -[[package]] -name = "win32-setctime" -version = "1.1.0" -description = "A small Python utility to set file creation time on Windows" -optional = false -python-versions = ">=3.5" -files = [ - {file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"}, - {file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"}, -] - -[package.extras] -dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] - [[package]] name = "wrapt" version = "1.16.0" @@ -3180,4 +3127,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9,<3.12" -content-hash = "d889a207a4f74be7d21811863564840a0253e405684ebbc8806435fc502e563d" +content-hash = "b591b1161342e3b610bf746fd4b730f525d93ec3f073236a0472fa429ae1561c" diff --git a/airbyte-integrations/connectors/destination-pinecone/pyproject.toml b/airbyte-integrations/connectors/destination-pinecone/pyproject.toml index e5201b7aa74c..95e5e0500b45 100644 --- a/airbyte-integrations/connectors/destination-pinecone/pyproject.toml +++ b/airbyte-integrations/connectors/destination-pinecone/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "airbyte-destination-pinecone" -version = "0.0.24" +version = "0.0.25" description = "Airbyte destination implementation for Pinecone." 
authors = ["Airbyte "] license = "MIT" @@ -19,7 +19,7 @@ include = "destination_pinecone" [tool.poetry.dependencies] python = "^3.9,<3.12" airbyte-cdk = {version = "0.81.6", extras = ["vector-db-based"]} -pinecone-client = {version = "2.2.4", extras = ["grpc"]} +pinecone-client = {version = "3.1.0", extras = ["grpc"]} [tool.poetry.group.dev.dependencies] pytest = "^7.2" From c7945efbe7a5b5247d2fd21c645f0c9a15bc2fa8 Mon Sep 17 00:00:00 2001 From: Roie Schwaber-Cohen Date: Fri, 26 Apr 2024 12:27:00 -0700 Subject: [PATCH 2/3] fixed test --- .../destination_pinecone/destination.py | 52 ++++++++++++------- .../destination_pinecone/indexer.py | 36 ++++++++----- .../pinecone_integration_test.py | 33 +++++++++--- 3 files changed, 83 insertions(+), 38 deletions(-) diff --git a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/destination.py b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/destination.py index a8299e0e2710..c14a5489b7d3 100644 --- a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/destination.py +++ b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/destination.py @@ -24,28 +24,40 @@ class DestinationPinecone(Destination): embedder: Embedder def _init_indexer(self, config: ConfigModel): - self.embedder = create_from_config(config.embedding, config.processing) - self.indexer = PineconeIndexer(config.indexing, self.embedder.embedding_dimensions) - - def write( - self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] - ) -> Iterable[AirbyteMessage]: - config_model = ConfigModel.parse_obj(config) - self._init_indexer(config_model) - writer = Writer( - config_model.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=config_model.omit_raw_text - ) - yield from writer.write(configured_catalog, input_messages) + try: + self.embedder = create_from_config(config.embedding, config.processing) + self.indexer = PineconeIndexer(config.indexing, self.embedder.embedding_dimensions) + except Exception as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=str(e)) + + def write(self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]) -> Iterable[AirbyteMessage]: + try: + config_model = ConfigModel.parse_obj(config) + self._init_indexer(config_model) + writer = Writer(config_model.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=config_model.omit_raw_text) + yield from writer.write(configured_catalog, input_messages) + except Exception as e: + yield AirbyteMessage(type='LOG', log=AirbyteLogger(level='ERROR', message=str(e))) def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: - parsed_config = ConfigModel.parse_obj(config) - self._init_indexer(parsed_config) - checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)] - errors = [error for error in checks if error is not None] - if len(errors) > 0: - return AirbyteConnectionStatus(status=Status.FAILED, message="\n".join(errors)) - else: - return AirbyteConnectionStatus(status=Status.SUCCEEDED) + try: + parsed_config = ConfigModel.parse_obj(config) + init_status = self._init_indexer(parsed_config) + if init_status and init_status.status == Status.FAILED: + logger.error(f"Initialization failed with message: {init_status.message}") + return init_status # Return the 
failure status immediately if initialization fails
+            
+            checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)]
+            errors = [error for error in checks if error is not None]
+            if len(errors) > 0:
+                error_message = "\n".join(errors)
+                logger.error(f"Configuration check failed: {error_message}")
+                return AirbyteConnectionStatus(status=Status.FAILED, message=error_message)
+            else:
+                return AirbyteConnectionStatus(status=Status.SUCCEEDED)
+        except Exception as e:
+            logger.error(f"Exception during configuration check: {str(e)}")
+            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))
 
     def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
         return ConnectorSpecification(
diff --git a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py
index 91c9fef9d707..7761b7293acd 100644
--- a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py
+++ b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py
@@ -5,12 +5,16 @@
 import uuid
 from typing import Optional
 
-from pinecone import PineconeGRPC
+from pinecone.grpc import PineconeGRPC
+from pinecone import PineconeException
 import urllib3
 from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD, METADATA_STREAM_FIELD
 from airbyte_cdk.destinations.vector_db_based.indexer import Indexer
 from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, create_stream_identifier, format_exception
 from airbyte_cdk.models.airbyte_protocol import ConfiguredAirbyteCatalog, DestinationSyncMode
+from airbyte_cdk.models import AirbyteConnectionStatus, Status
+
+
 from destination_pinecone.config import PineconeIndexingModel
 
 # large enough to speed up processing, small enough to not hit pinecone request limits
@@ -29,7 +33,11 @@ class PineconeIndexer(Indexer):
 
     def __init__(self, config: PineconeIndexingModel, embedding_dimensions: int):
         super().__init__(config)
-        self.pc = PineconeGRPC(api_key=config.pinecone_key, threaded=True)
+        try:
+            self.pc = PineconeGRPC(api_key=config.pinecone_key, threaded=True)
+        except PineconeException:
+            raise  # __init__ must not return a value; let the caller surface the failure
+
         self.pinecone_index = self.pc.Index(config.index)
         self.embedding_dimensions = embedding_dimensions
 
@@ -46,8 +54,9 @@ def determine_spec_type(self, index_name):
     def pre_sync(self, catalog: ConfiguredAirbyteCatalog):
         index_description = self.pc.describe_index(self.config.index)
         self._pod_type = self.determine_spec_type(self.config.index)
-        stream_identifier = create_stream_identifier(stream.stream)
+
         for stream in catalog.streams:
+            stream_identifier = create_stream_identifier(stream.stream)
             if stream.destination_sync_mode == DestinationSyncMode.overwrite:
                 self.delete_vectors(
                     filter={METADATA_STREAM_FIELD: stream_identifier}, namespace=stream.stream.namespace, prefix=stream_identifier
@@ -106,21 +115,22 @@ def _truncate_metadata(self, metadata: dict) -> dict:
 
         return result
 
-    def index(self, document_chunks, namespace, stream):
+    def index(self, document_chunks, namespace, streamName):
        pinecone_docs = []
         for i in range(len(document_chunks)):
             chunk = document_chunks[i]
             metadata = self._truncate_metadata(chunk.metadata)
             if chunk.page_content is not None:
-                metadata["text"] = chunk.page_content
-            prefix = create_stream_identifier(stream.stream)
+                metadata["text"] = 
chunk.page_content + prefix = streamName pinecone_docs.append((prefix + "#" + str(uuid.uuid4()), chunk.embedding, metadata)) serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT) for batch in serial_batches: - async_results = [ - self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False, namespace=namespace) - for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE) - ] + print(self.pinecone_index) + async_results = [] + for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE): + async_result = self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False) + async_results.append(async_result) # Wait for and retrieve responses (this raises in case of error) [async_result.result() for async_result in async_results] @@ -139,8 +149,10 @@ def delete(self, delete_ids, namespace, stream): def check(self) -> Optional[str]: try: - indexes = self.pc.list_indexes() - if self.config.index not in indexes: + list = self.pc.list_indexes() + index_names = [index['name'] for index in list.indexes] + + if self.config.index not in index_names: return f"Index {self.config.index} does not exist in environment {self.config.pinecone_environment}." description = self.pc.describe_index(self.config.index) diff --git a/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py b/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py index 2d7c7518f5e7..5eca7d92e139 100644 --- a/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py +++ b/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py @@ -4,8 +4,11 @@ import json import logging +import time -from pinecone import PineconeGRPC +from pinecone.grpc import PineconeGRPC +from pinecone import PineconeException +from pinecone import Pinecone as PineconeREST from airbyte_cdk.destinations.vector_db_based.embedder import OPEN_AI_VECTOR_SIZE from airbyte_cdk.destinations.vector_db_based.test_utils import BaseIntegrationTest from airbyte_cdk.models import DestinationSyncMode, Status @@ -18,19 +21,29 @@ class PineconeIntegrationTest(BaseIntegrationTest): def _init_pinecone(self): self.pc = PineconeGRPC(api_key=self.config["indexing"]["pinecone_key"]) self.pinecone_index = self.pc.Index(self.config["indexing"]["index"]) - + self.pc_rest = PineconeREST(api_key=self.config["indexing"]["pinecone_key"]) + self.pinecone_index_rest = self.pc_rest.Index(name=self.config["indexing"]["index"]) + + def _wait(self): + print("Waiting for Pinecone to index the data...", end='', flush=True) + for i in range(15): + time.sleep(1) + print(".", end='', flush=True) + print() # Move to the next line after the loop + def setUp(self): with open("secrets/config.json", "r") as f: self.config = json.loads(f.read()) self._init_pinecone() def tearDown(self): - # make sure pinecone is initialized correctly before cleaning up + self._wait() + # make sure pinecone is initialized correctly before cleaning up self._init_pinecone() - self.pinecone_index.delete(delete_all=True) + # self.pinecone_index.delete(delete_all=True, namespace="") def test_check_valid_config(self): - outcome = DestinationPinecone().check(logging.getLogger("airbyte"), self.config) + outcome = DestinationPinecone().check(logging.getLogger("airbyte"), self.config) assert outcome.status == Status.SUCCEEDED def 
test_check_invalid_config(self): @@ -47,6 +60,7 @@ def test_check_invalid_config(self): }, }, ) + assert outcome.status == Status.FAILED def test_write(self): @@ -57,14 +71,21 @@ def test_write(self): # initial sync destination = DestinationPinecone() list(destination.write(self.config, catalog, [*first_record_chunk, first_state_message])) + + + self._wait() assert self.pinecone_index.describe_index_stats().total_vector_count == 5 # incrementalally update a doc incremental_catalog = self._get_configured_catalog(DestinationSyncMode.append_dedup) list(destination.write(self.config, incremental_catalog, [self._record("mystream", "Cats are nice", 2), first_state_message])) + + self._wait() + result = self.pinecone_index.query( vector=[0] * OPEN_AI_VECTOR_SIZE, top_k=10, filter={"_ab_record_id": "mystream_2"}, include_metadata=True ) + assert len(result.matches) == 1 assert ( result.matches[0].metadata["text"] == "str_col: Cats are nice" @@ -73,6 +94,6 @@ def test_write(self): # test langchain integration embeddings = OpenAIEmbeddings(openai_api_key=self.config["embedding"]["openai_key"]) self._init_pinecone() - vector_store = Pinecone(self.pinecone_index, embeddings.embed_query, "text") + vector_store = Pinecone(self.pinecone_index_rest, embeddings.embed_query, "text") result = vector_store.similarity_search("feline animals", 1) assert result[0].metadata["_ab_record_id"] == "mystream_2" From b9543c0f909480db29bc271781299e3ab1a179fc Mon Sep 17 00:00:00 2001 From: Roie Schwaber-Cohen Date: Mon, 29 Apr 2024 10:01:05 -0700 Subject: [PATCH 3/3] tests passing --- .../destination_pinecone/indexer.py | 1 - .../integration_tests/pinecone_integration_test.py | 12 ++++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py index 7761b7293acd..f0a52d75b4b9 100644 --- a/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py +++ b/airbyte-integrations/connectors/destination-pinecone/destination_pinecone/indexer.py @@ -126,7 +126,6 @@ def index(self, document_chunks, namespace, streamName): pinecone_docs.append((prefix + "#" + str(uuid.uuid4()), chunk.embedding, metadata)) serial_batches = create_chunks(pinecone_docs, batch_size=PINECONE_BATCH_SIZE * PARALLELISM_LIMIT) for batch in serial_batches: - print(self.pinecone_index) async_results = [] for ids_vectors_chunk in create_chunks(batch, batch_size=PINECONE_BATCH_SIZE): async_result = self.pinecone_index.upsert(vectors=ids_vectors_chunk, async_req=True, show_progress=False) diff --git a/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py b/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py index 5eca7d92e139..6b647ca0ad26 100644 --- a/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py +++ b/airbyte-integrations/connectors/destination-pinecone/integration_tests/pinecone_integration_test.py @@ -25,7 +25,7 @@ def _init_pinecone(self): self.pinecone_index_rest = self.pc_rest.Index(name=self.config["indexing"]["index"]) def _wait(self): - print("Waiting for Pinecone to index the data...", end='', flush=True) + print("Waiting for Pinecone...", end='', flush=True) for i in range(15): time.sleep(1) print(".", end='', flush=True) @@ -35,12 +35,20 @@ def setUp(self): with open("secrets/config.json", "r") as 
f:
             self.config = json.loads(f.read())
         self._init_pinecone()
+        # self.pinecone_index.delete(delete_all=True)
 
     def tearDown(self):
         self._wait()
         # make sure pinecone is initialized correctly before cleaning up
         self._init_pinecone()
-        # self.pinecone_index.delete(delete_all=True, namespace="")
+        try:
+            self.pinecone_index.delete(delete_all=True)
+        except PineconeException as e:
+            if "Namespace not found" not in str(e):
+                raise
+            else:
+                print("Nothing to delete. No data in the index/namespace.")
+
 
     def test_check_valid_config(self):
         outcome = DestinationPinecone().check(logging.getLogger("airbyte"), self.config)
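

A standalone sketch of the serverless deletion pattern these patches adopt: a serverless
index does not accept the metadata-filtered deletes used for pod-based indexes (which is
why delete_vectors raises a ValueError when no prefix is available), so vectors are written
with a "<stream-identifier>#<uuid>" ID and removed by listing the IDs that share the stream
prefix. The client calls below are the same ones the patches use (pinecone-client 3.1.0, as
pinned in pyproject.toml); the API key and index name are placeholders.

    from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_STREAM_FIELD
    from pinecone.grpc import PineconeGRPC

    pc = PineconeGRPC(api_key="YOUR_API_KEY")  # placeholder credentials
    index = pc.Index("your-index")             # placeholder index name

    # describe_index(...).spec carries either a "pod" or a "serverless" key;
    # determine_spec_type() inspects it to choose the deletion strategy.
    spec = pc.describe_index("your-index").get("spec", {})

    def delete_stream(stream_identifier, namespace=None):
        if "serverless" in spec:
            # list() yields batches of vector IDs sharing the prefix; each batch
            # is deleted by explicit ID, the only bulk delete a serverless index allows.
            for ids in index.list(prefix=stream_identifier, namespace=namespace):
                index.delete(ids=ids, namespace=namespace)
        else:
            # pod-based indexes still accept a metadata filter
            index.delete(filter={METADATA_STREAM_FIELD: stream_identifier}, namespace=namespace)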
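
The check() fix in the second patch hinges on pinecone-client 3.x response shapes:
list_indexes() returns an object whose .indexes entries each carry a "name" field, not a
plain list of names, and describe_index(...) exposes the index dimension. A minimal sketch
of that validation, with placeholder key and index name (1536 assumes OpenAI embeddings):

    from pinecone.grpc import PineconeGRPC

    pc = PineconeGRPC(api_key="YOUR_API_KEY")  # placeholder credentials
    index_names = [ix["name"] for ix in pc.list_indexes().indexes]
    if "your-index" not in index_names:        # placeholder index name
        raise ValueError("Index does not exist.")
    # The index dimension must match what the embedder produces.
    if int(pc.describe_index("your-index").dimension) != 1536:
        raise ValueError("Embedding and index dimensions do not match.")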