Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Marcos/test pr 29817 #33624

Closed
wants to merge 6 commits
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY destination_typesense ./destination_typesense
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-typesense
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#


from logging import Logger
from typing import Any, Iterable, Mapping

from airbyte_cdk.destinations import Destination
Expand Down Expand Up @@ -38,18 +37,19 @@ def write(
pass
client.collections.create({"name": steam_name, "fields": [{"name": ".*", "type": "auto"}]})

writer = TypesenseWriter(client, steam_name, config.get("batch_size"))
for message in input_messages:
if message.type == Type.STATE:
writer.flush()
yield message
elif message.type == Type.RECORD:
writer.queue_write_operation(message.record.data)
else:
continue
writer.flush()

def check(self, logger: Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
writer = TypesenseWriter(client, config.get("batch_size"))
for message in input_messages:
if message.type == Type.STATE:
writer.flush()
yield message
elif message.type == Type.RECORD:
record = message.record
writer.queue_write_operation(record.stream, record.data)
else:
continue
writer.flush()

def check(self, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
try:
client = get_client(config=config)
client.collections.create({"name": "_airbyte", "fields": [{"name": "title", "type": "string"}]})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from collections import defaultdict
from collections.abc import Mapping
from logging import getLogger
from uuid import uuid4
Expand All @@ -12,17 +13,15 @@


class TypesenseWriter:
write_buffer = []
write_buffer: list[tuple[str, Mapping]] = []

def __init__(self, client: Client, steam_name: str, batch_size: int = None):
def __init__(self, client: Client, batch_size: int = None):
self.client = client
self.steam_name = steam_name
self.batch_size = batch_size or 10000

def queue_write_operation(self, data: Mapping):
def queue_write_operation(self, stream_name: str, data: Mapping):
random_key = str(uuid4())
data_with_id = data if "id" in data else {**data, "id": random_key}
self.write_buffer.append(data_with_id)
self.write_buffer.append((stream_name, {**data, "id": random_key}))
if len(self.write_buffer) == self.batch_size:
self.flush()

Expand All @@ -31,5 +30,11 @@ def flush(self):
if buffer_size == 0:
return
logger.info(f"flushing {buffer_size} records")
self.client.collections[self.steam_name].documents.import_(self.write_buffer)

grouped_by_stream: defaultdict[str, list[Mapping]] = defaultdict(list)
for stream, data in self.write_buffer:
grouped_by_stream[stream].append(data)

for (stream, data) in grouped_by_stream.items():
self.client.collections[stream].documents.import_(data)
self.write_buffer.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import json
from logging import getLogger
from typing import Any, Dict, Mapping

import pytest
Expand Down Expand Up @@ -33,13 +32,13 @@ def config_fixture() -> Mapping[str, Any]:
def configured_catalog_fixture() -> ConfiguredAirbyteCatalog:
stream_schema = {"type": "object", "properties": {"col1": {"type": "str"}, "col2": {"type": "integer"}}}

overwrite_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name="_airbyte", json_schema=stream_schema, supported_sync_modes=[SyncMode.incremental]),
overwrite_stream = lambda n: ConfiguredAirbyteStream(
stream=AirbyteStream(name=f"_airbyte_{n}", json_schema=stream_schema, supported_sync_modes=[SyncMode.incremental]),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.overwrite,
)

return ConfiguredAirbyteCatalog(streams=[overwrite_stream])
return ConfiguredAirbyteCatalog(streams=[overwrite_stream(i) for i in range(2)])


@pytest.fixture(autouse=True)
Expand All @@ -60,12 +59,12 @@ def client_fixture(config) -> Client:


def test_check_valid_config(config: Mapping):
outcome = DestinationTypesense().check(getLogger("airbyte"), config)
outcome = DestinationTypesense().check(config)
assert outcome.status == Status.SUCCEEDED


def test_check_invalid_config():
outcome = DestinationTypesense().check(getLogger("airbyte"), {"api_key": "not_a_real_key", "host": "https://www.fake.com"})
outcome = DestinationTypesense().check({"api_key": "not_a_real_key", "host": "https://www.fake.com"})
assert outcome.status == Status.FAILED


Expand All @@ -79,17 +78,18 @@ def _record(stream: str, str_value: str, int_value: int) -> AirbyteMessage:
)


def records_count(client: Client) -> int:
documents_results = client.index("_airbyte").get_documents()
return documents_results.total
def collection_size(client: Client, stream: str) -> int:
collection = client.collections[stream].retrieve()
return collection["num_documents"]


def test_write(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog, client: Client):
overwrite_stream = configured_catalog.streams[0].stream.name
configured_streams = list(map(lambda s: s.stream.name, configured_catalog.streams))
first_state_message = _state({"state": "1"})
first_record_chunk = [_record(overwrite_stream, str(i), i) for i in range(2)]
first_record_chunk = [_record(stream, str(i), i) for i, stream in enumerate(configured_streams)]

destination = DestinationTypesense()
list(destination.write(config, configured_catalog, [*first_record_chunk, first_state_message]))
collection = client.collections["_airbyte"].retrieve()
assert collection["num_documents"] == 2

for stream in configured_streams:
assert collection_size(client, stream) == 1
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 36be8dc6-9851-49af-b776-9d4c30e4ab6a
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
dockerRepository: airbyte/destination-typesense
githubIssueLabel: destination-typesense
icon: typesense.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,32 @@

@patch("typesense.Client")
def test_default_batch_size(client):
writer = TypesenseWriter(client, "steam_name")
writer = TypesenseWriter(client)
assert writer.batch_size == 10000


@patch("typesense.Client")
def test_empty_batch_size(client):
writer = TypesenseWriter(client, "steam_name", "")
writer = TypesenseWriter(client, "")
assert writer.batch_size == 10000


@patch("typesense.Client")
def test_custom_batch_size(client):
writer = TypesenseWriter(client, "steam_name", 9000)
writer = TypesenseWriter(client, 9000)
assert writer.batch_size == 9000


@patch("typesense.Client")
def test_queue_write_operation(client):
writer = TypesenseWriter(client, "steam_name")
writer.queue_write_operation({"a": "a"})
writer = TypesenseWriter(client)
writer.queue_write_operation("stream_name", {"a": "a"})
assert len(writer.write_buffer) == 1


@patch("typesense.Client")
def test_flush(client):
writer = TypesenseWriter(client, "steam_name")
writer.queue_write_operation({"a": "a"})
writer = TypesenseWriter(client)
writer.queue_write_operation("stream_name", {"a": "a"})
writer.flush()
client.collections.__getitem__.assert_called_once_with("steam_name")
client.collections.__getitem__.assert_called_once_with("stream_name")
3 changes: 2 additions & 1 deletion docs/integrations/destinations/typesense.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ The setup only requires two fields. First is the `host` which is the address at

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :---------------------------- |
| 0.1.2 | 2023-08-25 | [29817](https://github.com/airbytehq/airbyte/pull/29817) | Fix writing multiple streams |
| 0.1.1 | 2023-08-24 | [29555](https://github.com/airbytehq/airbyte/pull/29555) | Increasing connection timeout |
| 0.1.0 | 2022-10-28 | [18349](https://github.com/airbytehq/airbyte/pull/18349) | New Typesense destination |
| 0.1.1 | 2023-22-17 | [29555](https://github.com/airbytehq/airbyte/pull/29555) | Increasing connection timeout |
Loading