From e54829ae5b7e21e70751b6467106b50dbbe9af16 Mon Sep 17 00:00:00 2001 From: Omer Zuarets Date: Thu, 30 May 2024 17:20:40 +0300 Subject: [PATCH 1/3] add httpx client for HttpFetchProvider and make it default --- .../fetcher/providers/http_fetch_provider.py | 37 +++++++++++++------ packages/opal-common/opal_common/http.py | 15 +++++++- packages/opal-common/requires.txt | 1 + 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py index 05189876f..c2ff60d7e 100644 --- a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py +++ b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py @@ -1,10 +1,12 @@ """Simple HTTP get data fetcher using requests supports.""" from enum import Enum -from typing import Any +from typing import Any, Union, cast +import httpx from aiohttp import ClientResponse, ClientSession from pydantic import validator +from typing_extensions import Literal from ...http import is_http_error_response from ...security.sslcontext import get_custom_ssl_context @@ -48,6 +50,7 @@ class Config: class HttpFetchEvent(FetchEvent): fetcher: str = "HttpFetchProvider" config: HttpFetcherConfig = None + client_type: Literal["httpx", "aiohttp"] = "httpx" class HttpFetchProvider(BaseFetchProvider): @@ -71,13 +74,15 @@ async def __aenter__(self): headers = {} if self._event.config.headers is not None: headers = self._event.config.headers - self._session = await ClientSession( - headers=headers, raise_for_status=True - ).__aenter__() + if self._event.client_type == "httpx": + self._session = httpx.AsyncClient(headers=headers) + else: + self._session = ClientSession(headers=headers, raise_for_status=True) + self._session = self._session.__aenter__() return self async def __aexit__(self, exc_type=None, exc_val=None, tb=None): - await self._session.__aexit__(exc_type=exc_type, exc_val=exc_val, exc_tb=tb) + await self._session.__aexit__(exc_type, exc_val, tb) async def _fetch_(self): logger.debug(f"{self.__class__.__name__} fetching from {self._url}") @@ -93,21 +98,29 @@ async def _fetch_(self): return result @staticmethod - def match_http_method_from_type(session: ClientSession, method_type: HttpMethods): + def match_http_method_from_type( + session: Union[ClientSession, httpx.AsyncClient], method_type: HttpMethods + ): return getattr(session, method_type.value) - async def _process_(self, res: ClientResponse): + @staticmethod + async def _response_to_data( + res: Union[ClientResponse, httpx.Response], *, is_json: bool + ) -> Any: + if isinstance(res, httpx.Response): + return res.json() if is_json else res.text + else: + res = cast(ClientResponse, res) + return await (res.json() if is_json else res.text()) + + async def _process_(self, res: Union[ClientResponse, httpx.Response]): # do not process data when the http response is an error if is_http_error_response(res): return res # if we are asked to process the data before we return it if self._event.config.process_data: - # if data is JSON - if self._event.config.is_json: - data = await res.json() - else: - data = await res.text() + data = await self._response_to_data(res, is_json=self._event.config.is_json) return data # return raw result else: diff --git a/packages/opal-common/opal_common/http.py b/packages/opal-common/opal_common/http.py index 8ff942320..9c2d35a76 100644 --- a/packages/opal-common/opal_common/http.py +++ b/packages/opal-common/opal_common/http.py @@ -1,6 +1,17 @@ +from typing import Union + import aiohttp +import httpx -def is_http_error_response(response: aiohttp.ClientResponse) -> bool: +def is_http_error_response( + response: Union[aiohttp.ClientResponse, httpx.Response] +) -> bool: """HTTP 400 and above are considered error responses.""" - return response.status >= 400 + status: int = ( + response.status + if isinstance(response, aiohttp.ClientResponse) + else response.status_code + ) + + return status >= 400 diff --git a/packages/opal-common/requires.txt b/packages/opal-common/requires.txt index 30494e4f0..90d21f1df 100644 --- a/packages/opal-common/requires.txt +++ b/packages/opal-common/requires.txt @@ -10,3 +10,4 @@ datadog>=0.44.0, <1 ddtrace>=2.8.1,<3 certifi>=2023.7.22 # not directly required, pinned by Snyk to avoid a vulnerability requests>=2.31.0 # not directly required, pinned by Snyk to avoid a vulnerability +httpx==0.27.0 From c898bda0503735b890e53ba8ecfe1d592d9adc89 Mon Sep 17 00:00:00 2001 From: Omer Zuarets Date: Thu, 30 May 2024 18:11:25 +0300 Subject: [PATCH 2/3] change the http fetcher client to env var with default of aiohttp --- packages/opal-common/opal_common/config.py | 6 ++++++ .../opal_common/fetcher/providers/http_fetch_provider.py | 7 +++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/packages/opal-common/opal_common/config.py b/packages/opal-common/opal_common/config.py index 0397196b7..6058b3079 100644 --- a/packages/opal-common/opal_common/config.py +++ b/packages/opal-common/opal_common/config.py @@ -169,6 +169,12 @@ class OpalCommonConfig(Confi): False, description="Set if OPAL server should enable tracing with datadog APM", ) + HTTP_FETCHER_PROVIDER_CLIENT = confi.str( + "HTTP_FETCHER_PROVIDER_CLIENT", + "aiohttp", + description="The client to use for fetching data, can be either aiohttp or httpx." + "if provided different value, aiohttp will be used.", + ) opal_common_config = OpalCommonConfig(prefix="OPAL_") diff --git a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py index c2ff60d7e..66cf8f6f1 100644 --- a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py +++ b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py @@ -6,7 +6,7 @@ import httpx from aiohttp import ClientResponse, ClientSession from pydantic import validator -from typing_extensions import Literal +from opal_common.config import opal_common_config from ...http import is_http_error_response from ...security.sslcontext import get_custom_ssl_context @@ -50,7 +50,6 @@ class Config: class HttpFetchEvent(FetchEvent): fetcher: str = "HttpFetchProvider" config: HttpFetcherConfig = None - client_type: Literal["httpx", "aiohttp"] = "httpx" class HttpFetchProvider(BaseFetchProvider): @@ -74,11 +73,11 @@ async def __aenter__(self): headers = {} if self._event.config.headers is not None: headers = self._event.config.headers - if self._event.client_type == "httpx": + if opal_common_config.HTTP_FETCHER_PROVIDER_CLIENT == "httpx": self._session = httpx.AsyncClient(headers=headers) else: self._session = ClientSession(headers=headers, raise_for_status=True) - self._session = self._session.__aenter__() + self._session = await self._session.__aenter__() return self async def __aexit__(self, exc_type=None, exc_val=None, tb=None): From a0b217fb3f1a3a669462164a9d92d327ad5ed08c Mon Sep 17 00:00:00 2001 From: Omer Zuarets Date: Thu, 30 May 2024 18:15:40 +0300 Subject: [PATCH 3/3] add raise_for_status as httpx doesn't have this configuration in client level --- packages/opal-common/opal_common/config.py | 2 +- .../opal_common/fetcher/providers/http_fetch_provider.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/opal-common/opal_common/config.py b/packages/opal-common/opal_common/config.py index 6058b3079..7666d47e4 100644 --- a/packages/opal-common/opal_common/config.py +++ b/packages/opal-common/opal_common/config.py @@ -173,7 +173,7 @@ class OpalCommonConfig(Confi): "HTTP_FETCHER_PROVIDER_CLIENT", "aiohttp", description="The client to use for fetching data, can be either aiohttp or httpx." - "if provided different value, aiohttp will be used.", + "if provided different value, aiohttp will be used.", ) diff --git a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py index 66cf8f6f1..7261b538b 100644 --- a/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py +++ b/packages/opal-common/opal_common/fetcher/providers/http_fetch_provider.py @@ -5,8 +5,8 @@ import httpx from aiohttp import ClientResponse, ClientSession -from pydantic import validator from opal_common.config import opal_common_config +from pydantic import validator from ...http import is_http_error_response from ...security.sslcontext import get_custom_ssl_context @@ -89,11 +89,12 @@ async def _fetch_(self): self._session, self._event.config.method ) if self._event.config.data is not None: - result = await http_method( + result: Union[ClientResponse, httpx.Response] = await http_method( self._url, data=self._event.config.data, **self._ssl_context_kwargs ) else: result = await http_method(self._url, **self._ssl_context_kwargs) + result.raise_for_status() return result @staticmethod