Skip to content

Commit

Permalink
chore: format code
Browse files Browse the repository at this point in the history
  • Loading branch information
pnilan committed May 8, 2024
1 parent 1d1578b commit a0cc4f5
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@


class BackoffStrategy(ABC):

@abstractmethod
def backoff_time(self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]]) -> Optional[float]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

from .backoff_strategy import BackoffStrategy


class DefaultBackoffStrategy(BackoffStrategy):
def __init__(
self,
max_retries: int = 5,
max_time: int = 60 * 10,
retry_factor: float = 5,

):
self.max_retries = max_retries
self.max_time = max_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Optional, Union

import requests

from .response_models import ErrorResolution


Expand All @@ -15,9 +16,7 @@ class ErrorHandler(ABC):
"""

@abstractmethod
def interpret_response(
self, response: Optional[Union[requests.Response, Exception]]
) -> ErrorResolution:
def interpret_response(self, response: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
"""
Interpret the response or exception and return the corresponding response action, failure type, and error message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from typing import Any, Mapping, Optional, Tuple, Union

import requests
from requests import RequestException
from airbyte_cdk.models import FailureType
from requests import RequestException

from .error_handler import ErrorHandler
from .response_models import ResponseAction, ErrorResolution
from .response_models import ErrorResolution, ResponseAction


class HttpStatusErrorHandler(ErrorHandler):
Expand Down Expand Up @@ -79,9 +79,7 @@ def __init__(
self._logger = logger
self._error_mapping = error_mapping or self._DEFAULT_ERROR_MAPPING

def interpret_response(
self, response_or_exception: Optional[Union[requests.Response, Exception]] = None
) -> ErrorResolution:
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
"""
Interpret the response and return the corresponding response action, failure type, and error message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@


class JsonErrorMessageParser(ErrorMessageParser):

def _try_get_error(self, value: Optional[JsonType]) -> Optional[str]:
if isinstance(value, str):
return value
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from enum import Enum
from typing import Optional
from dataclasses import dataclass

from airbyte_cdk.models import FailureType


Expand All @@ -12,6 +13,7 @@ class ResponseAction(Enum):
FAIL = "FAIL"
IGNORE = "IGNORE"


@dataclass
class ErrorResolution:
response_action: Optional[ResponseAction] = None
Expand Down
20 changes: 16 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@
#


from typing import Union, Optional
from typing import Optional, Union

import requests


class BaseBackoffException(requests.exceptions.HTTPError):
def __init__(self, request: requests.PreparedRequest, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], error_message: str = ""):
def __init__(
self,
request: requests.PreparedRequest,
response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
error_message: str = "",
):

if isinstance(response_or_exception, requests.Response):
error_message = (
error_message or f"Request URL: {request.url}, Response Code: {response_or_exception.status_code}, Response Text: {response_or_exception.text}"
error_message
or f"Request URL: {request.url}, Response Code: {response_or_exception.status_code}, Response Text: {response_or_exception.text}"
)
else:
error_message = error_message or f"Request URL: {request.url}, Exception: {response_or_exception}"
Expand All @@ -32,7 +38,13 @@ class UserDefinedBackoffException(BaseBackoffException):
An exception that exposes how long it attempted to backoff
"""

def __init__(self, backoff: Union[int, float], request: requests.PreparedRequest, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], error_message: str = ""):
def __init__(
self,
backoff: Union[int, float],
request: requests.PreparedRequest,
response_or_exception: Optional[Union[requests.Response, requests.RequestException]],
error_message: str = "",
):
"""
:param backoff: how long to backoff in seconds
:param request: the request that triggered this backoff exception
Expand Down
26 changes: 20 additions & 6 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import logging
import os
import urllib
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Mapping, Optional, Tuple, Union
from dataclasses import dataclass

import requests
import requests_cache
Expand All @@ -17,14 +17,23 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests.auth import AuthBase

from .error_handlers import DefaultBackoffStrategy, HttpStatusErrorHandler, JsonErrorMessageParser, ResponseAction, ErrorResolution, ErrorHandler, BackoffStrategy, ErrorMessageParser
from .error_handlers import (
BackoffStrategy,
DefaultBackoffStrategy,
ErrorHandler,
ErrorMessageParser,
ErrorResolution,
HttpStatusErrorHandler,
JsonErrorMessageParser,
ResponseAction,
)
from .exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from .rate_limiting import default_backoff_handler, user_defined_backoff_handler

BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH")

class HttpClient:

class HttpClient:
def __init__(
self,
name: str,
Expand Down Expand Up @@ -179,7 +188,9 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,

elif error_resolution.response_action == ResponseAction.IGNORE:
if response:
log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with response code '{response.status_code}'"
log_message = (
f"Ignoring response for '{request.method}' request to '{request.url}' with response code '{response.status_code}'"
)
else:
log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with error '{exc}'"

Expand All @@ -188,7 +199,10 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
# TODO: Consider dynamic retry count depending on subsequent error codes
elif error_resolution.response_action == ResponseAction.RETRY:
custom_backoff_time = self._backoff_strategy.backoff_time(response or exc)
error_message = error_resolution.error_message or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
error_message = (
error_resolution.error_message
or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
)
if custom_backoff_time:
raise UserDefinedBackoffException(
backoff=custom_backoff_time, request=request, response_or_exception=(response or exc), error_message=error_message
Expand All @@ -203,7 +217,7 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
self._logger.error(response.text)
raise e

return response # type: ignore # will either return a valid response of type requests.Response or raise an exception
return response # type: ignore # will either return a valid response of type requests.Response or raise an exception

def send_request(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import requests
from typing import Optional, Union
import pytest

import pytest
import requests
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, DefaultBackoffStrategy



def test_given_no_arguments_default_backoff_strategy_returns_default_values():
response = requests.Response()
backoff_strategy = DefaultBackoffStrategy()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.error_handlers import HttpStatusErrorHandler, ResponseAction, ErrorResolution
from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, HttpStatusErrorHandler, ResponseAction

logger = MagicMock()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.call_rate import APIBudget, CachedLimiterSession, LimiterSession
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import DefaultBackoffStrategy, ResponseAction, ErrorResolution
from airbyte_cdk.sources.streams.http.error_handlers import DefaultBackoffStrategy, ErrorResolution, ResponseAction
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down

0 comments on commit a0cc4f5

Please sign in to comment.