Skip to content

Commit

Permalink
add dedicated methods for create trace and log messages
Browse files Browse the repository at this point in the history
  • Loading branch information
abrahamy committed Dec 17, 2024
1 parent adae6d6 commit 84bbae2
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@
from __future__ import annotations

import logging
import traceback
from collections.abc import Iterable, Mapping
from typing import Any

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteLogMessage, AirbyteMessage, ConfiguredAirbyteCatalog, Level, Status, Type
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
from destination_deepset import util
from destination_deepset.api import APIError, DeepsetCloudApi
from destination_deepset.models import DeepsetCloudConfig, DeepsetCloudFile, Filetypes
from destination_deepset.api import APIError
from destination_deepset.models import DeepsetCloudFile, Filetypes
from destination_deepset.writer import DeepsetCloudFileWriter, WriterError

logger = logging.getLogger("airbyte")


class DestinationDeepset(Destination):
def get_deepset_cloud_api(self, config: Mapping[str, Any]) -> DeepsetCloudApi:
deepset_cloud_config = DeepsetCloudConfig.parse_obj(config)
return DeepsetCloudApi(deepset_cloud_config)

def write(
self,
config: Mapping[str, Any],
Expand Down Expand Up @@ -66,15 +61,8 @@ def write(

try:
file = DeepsetCloudFile.from_record(message.record)
except ValueError:
yield AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.WARN,
message="Failed to parse data into deepset cloud file instance.",
stack_trace=traceback.format_exc(),
),
)
except ValueError as ex:
yield util.get_trace_message("Failed to parse data into deepset cloud file instance.", exception=ex)
else:
yield writer.write(file=file)
case _:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,21 @@

from __future__ import annotations

from time import time
from traceback import format_exc
from typing import TYPE_CHECKING, Any

from airbyte_cdk.models import (
AirbyteErrorTraceMessage,
AirbyteLogMessage,
AirbyteMessage,
AirbyteTraceMessage,
FailureType,
Level,
TraceType,
Type,
)

if TYPE_CHECKING:
from pydantic import BaseModel

Expand Down Expand Up @@ -31,3 +44,40 @@ def get(obj: dict[str, Any] | BaseModel, key_path: str, default: Any = None) ->
return default

return current


def get_trace_message(message: str, exception: Exception | None = None) -> AirbyteMessage:
"""Return a the message formatted as an `AirbyteMessage` of type `TRACE`.
Args:
message (str): The message to be formatted.
exception (Exception | None, optional): An optional `Exception` object. Defaults to None.
Returns:
AirbyteMessage: An `AirbyteMessage` instance.
"""
return AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=time(),
error=AirbyteErrorTraceMessage(
message=message,
internal_message=str(exception) if exception else None,
stack_trace=format_exc(),
failure_type=FailureType.transient_error.value,
),
),
)


def get_log_message(message: str) -> AirbyteMessage:
"""Return the message formatted as an `AirbyteMessage` of type `LOG`.
Args:
message (str): The message to be formatted.
Returns:
AirbyteMessage: An `AirbyteMessage` instance.
"""
return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=message))
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

from __future__ import annotations

import traceback
from time import time
from typing import TYPE_CHECKING, Any

from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteLogMessage, AirbyteMessage, AirbyteTraceMessage, Level, TraceType, Type
from airbyte_cdk.models import AirbyteMessage
from destination_deepset import util
from destination_deepset.api import APIError, DeepsetCloudApi
from destination_deepset.models import DeepsetCloudConfig, DeepsetCloudFile
from pipelines.airbyte_ci.connectors.migrate_to_manifest_only.declarative_component_schema import FailureType

if TYPE_CHECKING:
from collections.abc import Mapping
Expand Down Expand Up @@ -53,26 +51,11 @@ def write(self, file: DeepsetCloudFile) -> AirbyteMessage:
try:
file_id = self.client.upload(file)
except APIError as ex:
workspace = self.client.config.workspace
return AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=time(),
error=AirbyteErrorTraceMessage(
message=f"Failed to upload a record to deepset cloud workspace, {workspace = }.",
internal_message=str(ex),
stack_trace=traceback.format_exc(),
failure_type=FailureType.transient_error.value,
),
),
return util.get_trace_message(
f"Failed to upload a record to deepset cloud workspace, workspace = {self.client.config.workspace}.",
exception=ex,
)
else:
workspace = self.client.config.workspace
return AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message=f"File uploaded, file_name = {file.name}, {file_id = }, {workspace = }.",
),
return util.get_log_message(
f"File uploaded, file_name = {file.name}, {file_id = }, workspace = {self.client.config.workspace}."
)

0 comments on commit 84bbae2

Please sign in to comment.