diff --git a/airbyte-integrations/connectors/destination-typesense/Dockerfile b/airbyte-integrations/connectors/destination-typesense/Dockerfile index f5036e89ab0c..26fa70977d17 100644 --- a/airbyte-integrations/connectors/destination-typesense/Dockerfile +++ b/airbyte-integrations/connectors/destination-typesense/Dockerfile @@ -34,5 +34,5 @@ COPY destination_typesense ./destination_typesense ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/destination-typesense diff --git a/airbyte-integrations/connectors/destination-typesense/destination_typesense/destination.py b/airbyte-integrations/connectors/destination-typesense/destination_typesense/destination.py index 5e4de404d2af..a0d5a00e91e7 100644 --- a/airbyte-integrations/connectors/destination-typesense/destination_typesense/destination.py +++ b/airbyte-integrations/connectors/destination-typesense/destination_typesense/destination.py @@ -3,7 +3,6 @@ # -from logging import Logger from typing import Any, Iterable, Mapping from airbyte_cdk.destinations import Destination @@ -38,18 +37,19 @@ def write( pass client.collections.create({"name": steam_name, "fields": [{"name": ".*", "type": "auto"}]}) - writer = TypesenseWriter(client, steam_name, config.get("batch_size")) - for message in input_messages: - if message.type == Type.STATE: - writer.flush() - yield message - elif message.type == Type.RECORD: - writer.queue_write_operation(message.record.data) - else: - continue - writer.flush() - - def check(self, logger: Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + writer = TypesenseWriter(client, config.get("batch_size")) + for message in input_messages: + if message.type == Type.STATE: + writer.flush() + yield message + elif message.type == Type.RECORD: + record = message.record + writer.queue_write_operation(record.stream, record.data) + else: + continue + writer.flush() + + def check(self, config: Mapping[str, Any]) -> AirbyteConnectionStatus: try: client = get_client(config=config) client.collections.create({"name": "_airbyte", "fields": [{"name": "title", "type": "string"}]}) diff --git a/airbyte-integrations/connectors/destination-typesense/destination_typesense/writer.py b/airbyte-integrations/connectors/destination-typesense/destination_typesense/writer.py index fd9c0e3b5868..54e85d5512b7 100644 --- a/airbyte-integrations/connectors/destination-typesense/destination_typesense/writer.py +++ b/airbyte-integrations/connectors/destination-typesense/destination_typesense/writer.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from collections import defaultdict from collections.abc import Mapping from logging import getLogger from uuid import uuid4 @@ -12,17 +13,15 @@ class TypesenseWriter: - write_buffer = [] + write_buffer: list[tuple[str, Mapping]] = [] - def __init__(self, client: Client, steam_name: str, batch_size: int = None): + def __init__(self, client: Client, batch_size: int = None): self.client = client - self.steam_name = steam_name self.batch_size = batch_size or 10000 - def queue_write_operation(self, data: Mapping): + def queue_write_operation(self, stream_name: str, data: Mapping): random_key = str(uuid4()) - data_with_id = data if "id" in data else {**data, "id": random_key} - self.write_buffer.append(data_with_id) + self.write_buffer.append((stream_name, {**data, "id": random_key})) if len(self.write_buffer) == self.batch_size: self.flush() @@ -31,5 +30,11 @@ def flush(self): if buffer_size == 0: return logger.info(f"flushing {buffer_size} records") - self.client.collections[self.steam_name].documents.import_(self.write_buffer) + + grouped_by_stream: defaultdict[str, list[Mapping]] = defaultdict(list) + for stream, data in self.write_buffer: + grouped_by_stream[stream].append(data) + + for (stream, data) in grouped_by_stream.items(): + self.client.collections[stream].documents.import_(data) self.write_buffer.clear() diff --git a/airbyte-integrations/connectors/destination-typesense/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-typesense/integration_tests/integration_test.py index cb0d5aae3145..0b159f3eb857 100644 --- a/airbyte-integrations/connectors/destination-typesense/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/destination-typesense/integration_tests/integration_test.py @@ -3,7 +3,6 @@ # import json -from logging import getLogger from typing import Any, Dict, Mapping import pytest @@ -33,13 +32,13 @@ def config_fixture() -> Mapping[str, Any]: def configured_catalog_fixture() -> ConfiguredAirbyteCatalog: stream_schema = {"type": "object", "properties": {"col1": {"type": "str"}, "col2": {"type": "integer"}}} - overwrite_stream = ConfiguredAirbyteStream( - stream=AirbyteStream(name="_airbyte", json_schema=stream_schema, supported_sync_modes=[SyncMode.incremental]), + overwrite_stream = lambda n: ConfiguredAirbyteStream( + stream=AirbyteStream(name=f"_airbyte_{n}", json_schema=stream_schema, supported_sync_modes=[SyncMode.incremental]), sync_mode=SyncMode.incremental, destination_sync_mode=DestinationSyncMode.overwrite, ) - return ConfiguredAirbyteCatalog(streams=[overwrite_stream]) + return ConfiguredAirbyteCatalog(streams=[overwrite_stream(i) for i in range(2)]) @pytest.fixture(autouse=True) @@ -60,12 +59,12 @@ def client_fixture(config) -> Client: def test_check_valid_config(config: Mapping): - outcome = DestinationTypesense().check(getLogger("airbyte"), config) + outcome = DestinationTypesense().check(config) assert outcome.status == Status.SUCCEEDED def test_check_invalid_config(): - outcome = DestinationTypesense().check(getLogger("airbyte"), {"api_key": "not_a_real_key", "host": "https://www.fake.com"}) + outcome = DestinationTypesense().check({"api_key": "not_a_real_key", "host": "https://www.fake.com"}) assert outcome.status == Status.FAILED @@ -79,17 +78,18 @@ def _record(stream: str, str_value: str, int_value: int) -> AirbyteMessage: ) -def records_count(client: Client) -> int: - documents_results = client.index("_airbyte").get_documents() - return documents_results.total +def collection_size(client: Client, stream: str) -> int: + collection = client.collections[stream].retrieve() + return collection["num_documents"] def test_write(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog, client: Client): - overwrite_stream = configured_catalog.streams[0].stream.name + configured_streams = list(map(lambda s: s.stream.name, configured_catalog.streams)) first_state_message = _state({"state": "1"}) - first_record_chunk = [_record(overwrite_stream, str(i), i) for i in range(2)] + first_record_chunk = [_record(stream, str(i), i) for i, stream in enumerate(configured_streams)] destination = DestinationTypesense() list(destination.write(config, configured_catalog, [*first_record_chunk, first_state_message])) - collection = client.collections["_airbyte"].retrieve() - assert collection["num_documents"] == 2 + + for stream in configured_streams: + assert collection_size(client, stream) == 1 diff --git a/airbyte-integrations/connectors/destination-typesense/metadata.yaml b/airbyte-integrations/connectors/destination-typesense/metadata.yaml index b3ee53fe4fee..ac5d918b2c45 100644 --- a/airbyte-integrations/connectors/destination-typesense/metadata.yaml +++ b/airbyte-integrations/connectors/destination-typesense/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 36be8dc6-9851-49af-b776-9d4c30e4ab6a - dockerImageTag: 0.1.1 + dockerImageTag: 0.1.2 dockerRepository: airbyte/destination-typesense githubIssueLabel: destination-typesense icon: typesense.svg diff --git a/airbyte-integrations/connectors/destination-typesense/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-typesense/unit_tests/unit_test.py index ba065cb9fc02..a14d7f5b2abf 100644 --- a/airbyte-integrations/connectors/destination-typesense/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/destination-typesense/unit_tests/unit_test.py @@ -9,32 +9,32 @@ @patch("typesense.Client") def test_default_batch_size(client): - writer = TypesenseWriter(client, "steam_name") + writer = TypesenseWriter(client) assert writer.batch_size == 10000 @patch("typesense.Client") def test_empty_batch_size(client): - writer = TypesenseWriter(client, "steam_name", "") + writer = TypesenseWriter(client, "") assert writer.batch_size == 10000 @patch("typesense.Client") def test_custom_batch_size(client): - writer = TypesenseWriter(client, "steam_name", 9000) + writer = TypesenseWriter(client, 9000) assert writer.batch_size == 9000 @patch("typesense.Client") def test_queue_write_operation(client): - writer = TypesenseWriter(client, "steam_name") - writer.queue_write_operation({"a": "a"}) + writer = TypesenseWriter(client) + writer.queue_write_operation("stream_name", {"a": "a"}) assert len(writer.write_buffer) == 1 @patch("typesense.Client") def test_flush(client): - writer = TypesenseWriter(client, "steam_name") - writer.queue_write_operation({"a": "a"}) + writer = TypesenseWriter(client) + writer.queue_write_operation("stream_name", {"a": "a"}) writer.flush() - client.collections.__getitem__.assert_called_once_with("steam_name") + client.collections.__getitem__.assert_called_once_with("stream_name") diff --git a/docs/integrations/destinations/typesense.md b/docs/integrations/destinations/typesense.md index f9dfff351c60..7185a69013df 100644 --- a/docs/integrations/destinations/typesense.md +++ b/docs/integrations/destinations/typesense.md @@ -37,5 +37,6 @@ The setup only requires two fields. First is the `host` which is the address at | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :---------------------------- | +| 0.1.2 | 2023-08-25 | [29817](https://github.com/airbytehq/airbyte/pull/29817) | Fix writing multiple streams | +| 0.1.1 | 2023-08-24 | [29555](https://github.com/airbytehq/airbyte/pull/29555) | Increasing connection timeout | | 0.1.0 | 2022-10-28 | [18349](https://github.com/airbytehq/airbyte/pull/18349) | New Typesense destination | -| 0.1.1 | 2023-22-17 | [29555](https://github.com/airbytehq/airbyte/pull/29555) | Increasing connection timeout |