Skip to content

Commit

Permalink
Ocean async client (#285)
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 authored Dec 21, 2023
1 parent f0aa9df commit b3c5b29
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 18 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.4.10 (2023-12-21)


### Improvements

- Wrapped the httpx async client with implementation that overrides the default transport class with custom transport to apply all default httpx features that are ignored when passing a custom transport instance. This allows the missing behevior of the http (proxy environment variable)[https://www.python-httpx.org/environment_variables/#proxies] (PORT-5676)
- Changed deprecated `poetry lock --check` in the make files to `poetry check` (PORT-5711)

### Bug Fixes

- Changed the way we upsert and delete bulk of entities from the catalog to be batched rather than spawning all requests at once


## 0.4.9 (2023-12-19)


Expand Down
1 change: 0 additions & 1 deletion changelog/1.bugfix.md

This file was deleted.

1 change: 0 additions & 1 deletion changelog/PORT-5711.improvement.md

This file was deleted.

4 changes: 2 additions & 2 deletions port_ocean/clients/port/retry_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@


class TokenRetryTransport(RetryTransport):
def __init__(self, port_client: "PortClient", *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
def __init__(self, port_client: "PortClient", **kwargs: Any) -> None:
super().__init__(**kwargs)
self.port_client = port_client

def _is_retryable_method(self, request: httpx.Request) -> bool:
Expand Down
10 changes: 4 additions & 6 deletions port_ocean/clients/port/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from werkzeug.local import LocalStack, LocalProxy

from port_ocean.clients.port.retry_transport import TokenRetryTransport
from port_ocean.helpers.async_client import OceanAsyncClient

if TYPE_CHECKING:
from port_ocean.clients.port.client import PortClient
Expand All @@ -27,12 +28,9 @@
def _get_http_client_context(port_client: "PortClient") -> httpx.AsyncClient:
client = _http_client.top
if client is None:
client = httpx.AsyncClient(
transport=TokenRetryTransport(
port_client,
httpx.AsyncHTTPTransport(),
logger=logger,
),
client = OceanAsyncClient(
TokenRetryTransport,
transport_kwargs={"port_client": port_client},
timeout=httpx.Timeout(PORT_HTTP_TIMEOUT),
limits=httpx.Limits(
max_connections=PORT_HTTP_MAX_CONNECTIONS_LIMIT,
Expand Down
53 changes: 53 additions & 0 deletions port_ocean/helpers/async_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Any, Callable, Type

import httpx
from loguru import logger

from port_ocean.helpers.retry import RetryTransport


class OceanAsyncClient(httpx.AsyncClient):
"""
This class is a wrapper around httpx.AsyncClient that uses a custom transport class.
This is done to allow passing our custom transport class to the AsyncClient constructor while still allowing
all the default AsyncClient behavior that is changed when passing a custom transport instance.
"""

def __init__(
self,
transport_class: Type[RetryTransport] = RetryTransport,
transport_kwargs: dict[str, Any] | None = None,
**kwargs: Any,
):
self._transport_kwargs = transport_kwargs
self._transport_class = transport_class
super().__init__(**kwargs)

def _init_transport( # type: ignore[override]
self,
transport: httpx.AsyncBaseTransport | None = None,
app: Callable[..., Any] | None = None,
**kwargs: Any,
) -> httpx.AsyncBaseTransport:
if transport is not None or app is not None:
return super()._init_transport(transport=transport, app=app, **kwargs)

return self._transport_class(
wrapped_transport=httpx.AsyncHTTPTransport(
**kwargs,
),
logger=logger,
**(self._transport_kwargs or {}),
)

def _init_proxy_transport( # type: ignore[override]
self, proxy: httpx.Proxy, **kwargs: Any
) -> httpx.AsyncBaseTransport:
return self._transport_class(
wrapped_transport=httpx.AsyncHTTPTransport(
proxy=proxy,
**kwargs,
),
logger=logger,
**(self._transport_kwargs or {}),
)
10 changes: 3 additions & 7 deletions port_ocean/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from asyncio import ensure_future
from functools import wraps
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from time import time
from traceback import format_exception
from types import ModuleType
Expand All @@ -13,10 +14,10 @@
import tomli
import yaml
from loguru import logger
from pathlib import Path
from starlette.concurrency import run_in_threadpool
from werkzeug.local import LocalStack, LocalProxy

from port_ocean.helpers.async_client import OceanAsyncClient
from port_ocean.helpers.retry import RetryTransport

_http_client: LocalStack[httpx.AsyncClient] = LocalStack()
Expand All @@ -25,12 +26,7 @@
def _get_http_client_context() -> httpx.AsyncClient:
client = _http_client.top
if client is None:
client = httpx.AsyncClient(
transport=RetryTransport(
httpx.AsyncHTTPTransport(),
logger=logger,
)
)
client = OceanAsyncClient(RetryTransport)
_http_client.push(client)

return client
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.4.9"
version = "0.4.10"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit b3c5b29

Please sign in to comment.