diff --git a/airbyte-integrations/connectors/destination-meilisearch/Dockerfile b/airbyte-integrations/connectors/destination-meilisearch/Dockerfile index f573460c64a2..00bbfb9c4846 100644 --- a/airbyte-integrations/connectors/destination-meilisearch/Dockerfile +++ b/airbyte-integrations/connectors/destination-meilisearch/Dockerfile @@ -34,5 +34,5 @@ COPY destination_meilisearch ./destination_meilisearch ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.0.0 +LABEL io.airbyte.version=1.0.1 LABEL io.airbyte.name=airbyte/destination-meilisearch diff --git a/airbyte-integrations/connectors/destination-meilisearch/destination_meilisearch/destination.py b/airbyte-integrations/connectors/destination-meilisearch/destination_meilisearch/destination.py index d6a44c1d5f9c..32d08b787bf1 100644 --- a/airbyte-integrations/connectors/destination-meilisearch/destination_meilisearch/destination.py +++ b/airbyte-integrations/connectors/destination-meilisearch/destination_meilisearch/destination.py @@ -3,14 +3,16 @@ # -from logging import Logger -from typing import Any, Iterable, Mapping +from logging import Logger, getLogger +from typing import Any, Dict, Iterable, Mapping from airbyte_cdk.destinations import Destination from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type from destination_meilisearch.writer import MeiliWriter from meilisearch import Client +logger = getLogger("airbyte") + def get_client(config: Mapping[str, Any]) -> Client: host = config.get("host") @@ -21,36 +23,51 @@ def get_client(config: Mapping[str, Any]) -> Client: class DestinationMeilisearch(Destination): primary_key = "_ab_pk" + def _flush_streams(self, streams: Dict[str, MeiliWriter]) -> Iterable[AirbyteMessage]: + for stream in streams: + streams[stream].flush() + def write( self, config: Mapping[str, Any], 
configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] ) -> Iterable[AirbyteMessage]: client = get_client(config=config) + # Creating Meilisearch writers + writers = {s.stream.name: MeiliWriter(client, s.stream.name, self.primary_key) for s in configured_catalog.streams} for configured_stream in configured_catalog.streams: - steam_name = configured_stream.stream.name + stream_name = configured_stream.stream.name + # Deleting index in Meilisearch if sync mode is overwrite if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: - client.delete_index(steam_name) - client.create_index(steam_name, {"primaryKey": self.primary_key}) - - writer = MeiliWriter(client, steam_name, self.primary_key) - for message in input_messages: - if message.type == Type.STATE: - writer.flush() - yield message - elif message.type == Type.RECORD: - writer.queue_write_operation(message.record.data) - else: + logger.debug(f"Deleting index: {stream_name}.") + client.delete_index(stream_name) + # Creating index in Meilisearch + client.create_index(stream_name, {"primaryKey": self.primary_key}) + logger.debug(f"Creating index: {stream_name}.") + + for message in input_messages: + if message.type == Type.STATE: + yield message + elif message.type == Type.RECORD: + data = message.record.data + stream = message.record.stream + # Skip unselected streams + if stream not in writers: + logger.debug(f"Stream {stream} was not present in configured streams, skipping") continue - writer.flush() + writers[stream].queue_write_operation(data) + else: + logger.info(f"Unhandled message type {message.type}: {message}") + + # Flush any leftover messages + self._flush_streams(writers) def check(self, logger: Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: try: client = get_client(config=config) - create_index_job = client.create_index("_airbyte", {"primaryKey": "id"}) - client.wait_for_task(create_index_job["taskUid"]) + 
client.create_index("_airbyte", {"primaryKey": "id"}) - add_documents_job = client.index("_airbyte").add_documents( + client.index("_airbyte").add_documents( [ { "id": 287947, @@ -59,9 +76,7 @@ def check(self, logger: Logger, config: Mapping[str, Any]) -> AirbyteConnectionS } ] ) - client.wait_for_task(add_documents_job.task_uid) - client.index("_airbyte").search("Shazam") client.delete_index("_airbyte") return AirbyteConnectionStatus(status=Status.SUCCEEDED) except Exception as e: diff --git a/airbyte-integrations/connectors/destination-meilisearch/destination_meilisearch/writer.py b/airbyte-integrations/connectors/destination-meilisearch/destination_meilisearch/writer.py index c2eca6a88ce9..e2450f825106 100644 --- a/airbyte-integrations/connectors/destination-meilisearch/destination_meilisearch/writer.py +++ b/airbyte-integrations/connectors/destination-meilisearch/destination_meilisearch/writer.py @@ -12,25 +12,28 @@ class MeiliWriter: - write_buffer = [] flush_interval = 50000 - def __init__(self, client: Client, steam_name: str, primary_key: str): + def __init__(self, client: Client, stream_name: str, primary_key: str): self.client = client - self.steam_name = steam_name self.primary_key = primary_key + self.stream_name: str = stream_name + self._write_buffer = [] + + logger.info(f"Creating MeiliWriter for {self.stream_name}") def queue_write_operation(self, data: Mapping): random_key = str(uuid4()) - self.write_buffer.append({**data, self.primary_key: random_key}) - if len(self.write_buffer) == self.flush_interval: + self._write_buffer.append({**data, self.primary_key: random_key}) + if len(self._write_buffer) == self.flush_interval: + logger.debug(f"Reached limit size: flushing records for {self.stream_name}") self.flush() def flush(self): - buffer_size = len(self.write_buffer) + buffer_size = len(self._write_buffer) if buffer_size == 0: return - logger.info(f"flushing {buffer_size} records") - response = 
self.client.index(self.steam_name).add_documents(self.write_buffer) + logger.info(f"Flushing {buffer_size} records") + response = self.client.index(self.stream_name).add_documents(self._write_buffer) self.client.wait_for_task(response.task_uid, 1800000, 1000) - self.write_buffer.clear() + self._write_buffer.clear() diff --git a/airbyte-integrations/connectors/destination-meilisearch/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-meilisearch/integration_tests/integration_test.py index 9e63e24dc87d..1d9687e97c7d 100644 --- a/airbyte-integrations/connectors/destination-meilisearch/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/destination-meilisearch/integration_tests/integration_test.py @@ -4,7 +4,6 @@ import json import logging -import time from typing import Any, Dict, Mapping import pytest @@ -56,12 +55,7 @@ def teardown(config: Mapping): def client_fixture(config) -> Client: client = get_client(config=config) resp = client.create_index("_airbyte", {"primaryKey": "_ab_pk"}) - while True: - time.sleep(0.2) - task = client.get_task(resp["taskUid"]) - status = task["status"] - if status == "succeeded" or status == "failed": - break + client.wait_for_task(_handle_breaking_wait_for_task(resp)) return client @@ -87,6 +81,13 @@ def _record(stream: str, str_value: str, int_value: int) -> AirbyteMessage: ) +def _handle_breaking_wait_for_task(task: Any) -> int: + if type(task) is dict: + return task["taskUid"] + else: + return task.task_uid + + def records_count(client: Client) -> int: documents_results = client.index("_airbyte").get_documents() return documents_results.total diff --git a/airbyte-integrations/connectors/destination-meilisearch/metadata.yaml b/airbyte-integrations/connectors/destination-meilisearch/metadata.yaml index 7826092b697b..79a5f5851984 100644 --- a/airbyte-integrations/connectors/destination-meilisearch/metadata.yaml +++ 
b/airbyte-integrations/connectors/destination-meilisearch/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: api connectorType: destination definitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e - dockerImageTag: 1.0.0 + dockerImageTag: 1.0.1 dockerRepository: airbyte/destination-meilisearch githubIssueLabel: destination-meilisearch icon: meilisearch.svg diff --git a/airbyte-integrations/connectors/destination-meilisearch/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-meilisearch/unit_tests/unit_test.py index df1f503df180..c09a3f7d8744 100644 --- a/airbyte-integrations/connectors/destination-meilisearch/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/destination-meilisearch/unit_tests/unit_test.py @@ -9,15 +9,21 @@ @patch("meilisearch.Client") def test_queue_write_operation(client): - writer = MeiliWriter(client, "steam_name", "primary_key") + writer = MeiliWriter(client, "stream_name", "primary_key") writer.queue_write_operation({"a": "a"}) - assert len(writer.write_buffer) == 1 + assert len(writer._write_buffer) == 1 + writer.queue_write_operation({"b": "b"}) + assert len(writer._write_buffer) == 2 + writer2 = MeiliWriter(client, "stream_name2", "primary_key") + writer2.queue_write_operation({"a": "a"}) + assert len(writer2._write_buffer) == 1 + assert len(writer._write_buffer) == 2 @patch("meilisearch.Client") def test_flush(client): - writer = MeiliWriter(client, "steam_name", "primary_key") + writer = MeiliWriter(client, "stream_name", "primary_key") writer.queue_write_operation({"a": "a"}) writer.flush() - client.index.assert_called_once_with("steam_name") + client.index.assert_called_once_with("stream_name") client.wait_for_task.assert_called_once() diff --git a/docs/integrations/destinations/meilisearch.md b/docs/integrations/destinations/meilisearch.md index d7f40201b775..f788f9613057 100644 --- a/docs/integrations/destinations/meilisearch.md +++ b/docs/integrations/destinations/meilisearch.md @@ -33,7 +33,9 @@ The setup 
only requires two fields. First is the `host` which is the address at | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :----------------------------------------------------- | +| 1.0.1 | 2023-12-19 | [27692](https://github.com/airbytehq/airbyte/pull/27692) | Fix incomplete data indexing | | 1.0.0 | 2022-10-26 | [18036](https://github.com/airbytehq/airbyte/pull/18036) | Migrate MeiliSearch to Python CDK | | 0.2.13 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors | | 0.2.12 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option | | 0.2.11 | 2021-12-28 | [9156](https://github.com/airbytehq/airbyte/pull/9156) | Update connector fields title/description | +