Skip to content

Commit

Permalink
Add tests for writer
Browse files Browse the repository at this point in the history
  • Loading branch information
abrahamy committed Dec 17, 2024
1 parent 5056560 commit adae6d6
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from __future__ import annotations

import traceback
from time import time
from typing import TYPE_CHECKING, Any

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteLogMessage, AirbyteMessage, AirbyteTraceMessage, Level, TraceType, Type
from destination_deepset.api import APIError, DeepsetCloudApi
from destination_deepset.models import DeepsetCloudConfig, DeepsetCloudFile
from pipelines.airbyte_ci.connectors.migrate_to_manifest_only.declarative_component_schema import FailureType

if TYPE_CHECKING:
from collections.abc import Mapping
Expand Down Expand Up @@ -49,15 +51,20 @@ def write(self, file: DeepsetCloudFile) -> AirbyteMessage:
AirbyteMessage: Returns an Airbyte message with a suitable status.
"""
try:
file_id = self.client.upload(file=file)
except APIError:
file_id = self.client.upload(file)
except APIError as ex:
workspace = self.client.config.workspace
return AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.ERROR,
message=f"Failed to upload a record to deepset cloud workspace, {workspace = }.",
stack_trace=traceback.format_exc(),
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=time(),
error=AirbyteErrorTraceMessage(
message=f"Failed to upload a record to deepset cloud workspace, {workspace = }.",
internal_message=str(ex),
stack_trace=traceback.format_exc(),
failure_type=FailureType.transient_error.value,
),
),
)
else:
Expand All @@ -66,6 +73,6 @@ def write(self, file: DeepsetCloudFile) -> AirbyteMessage:
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message=("Successfully uploaded a record to a deepset cloud workspace. " f"Uploaded {file_id = !s}, {workspace = }."),
message=f"File uploaded, file_name = {file.name}, {file_id = }, {workspace = }.",
),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from __future__ import annotations

import secrets
from typing import TYPE_CHECKING, Any
from uuid import UUID, uuid4

import pytest
from airbyte_cdk.models import AirbyteLogMessage, AirbyteTraceMessage, Level, TraceType, Type
from destination_deepset.api import APIError, DeepsetCloudApi
from destination_deepset.models import DeepsetCloudConfig, DeepsetCloudFile
from destination_deepset.writer import DeepsetCloudFileWriter

if TYPE_CHECKING:
from collections.abc import Mapping


@pytest.fixture()
def config_dict() -> Mapping[str, Any]:
return {
"api_key": secrets.token_urlsafe(16),
"base_url": "https://api.dev.cloud.dpst.dev",
"workspace": "airbyte-test",
"retries": 5,
}


def test_factory(config_dict: Mapping[str, Any]) -> None:
writer = DeepsetCloudFileWriter.factory(config_dict)

assert isinstance(writer, DeepsetCloudFileWriter)
assert isinstance(writer.client, DeepsetCloudApi)
assert isinstance(writer.client.config, DeepsetCloudConfig)

config = writer.client.config
assert config.api_key == config_dict["api_key"]
assert config.base_url == config_dict["base_url"]
assert config.workspace == config_dict["workspace"]
assert config.retries == config_dict["retries"]


def test_write_happy_path(
monkeypatch: pytest.MonkeyPatch, config_dict: Mapping[str, Any], file: DeepsetCloudFile
) -> None:
writer = DeepsetCloudFileWriter.factory(config_dict)

def upload(file: DeepsetCloudFile) -> UUID:
return uuid4()

monkeypatch.setattr(writer.client, "upload", upload)

message = writer.write(file)
assert message.type == Type.LOG
assert isinstance(message.log, AirbyteLogMessage)
assert message.log.level == Level.INFO


def test_write_returns_trace_message_if_error(
monkeypatch: pytest.MonkeyPatch, config_dict: Mapping[str, Any], file: DeepsetCloudFile
) -> None:
writer = DeepsetCloudFileWriter.factory(config_dict)

def upload(_: ...) -> UUID:
raise APIError

monkeypatch.setattr(writer.client, "upload", upload)

message = writer.write(file)
assert message.type == Type.TRACE
assert isinstance(message.trace, AirbyteTraceMessage)
assert message.trace.type == TraceType.ERROR

0 comments on commit adae6d6

Please sign in to comment.