From 944fd21e6cde47ef71d3bfd3af8bbf3f48ed4ae0 Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Fri, 28 Mar 2025 22:24:36 +0100 Subject: [PATCH] AioHTTP and Async HTTPX teardown improvments pt.1 --- pubnub/pubnub_core.py | 5 +- pubnub/request_handlers/async_aiohttp.py | 42 +++++++++++-- pubnub/request_handlers/async_httpx.py | 79 ++++++++++++++++++++---- pubnub/request_handlers/httpx.py | 2 +- 4 files changed, 107 insertions(+), 21 deletions(-) diff --git a/pubnub/pubnub_core.py b/pubnub/pubnub_core.py index 412cdda3..24fff96e 100644 --- a/pubnub/pubnub_core.py +++ b/pubnub/pubnub_core.py @@ -250,8 +250,9 @@ def list_push_channels(self, device_id: str = None, push_type: PNPushType = None environment: PNPushEnvironment = None) -> ListPushProvisions: return ListPushProvisions(self, device_id=device_id, push_type=push_type, topic=topic, environment=environment) - def add_channels_to_push(self, channels: Union[str, List[str]], device_id: str = None, push_type: PNPushType = None, - topic: str = None, environment: PNPushEnvironment = None) -> AddChannelsToPush: + def add_channels_to_push(self, channels: Union[str, List[str]] = None, device_id: str = None, + push_type: PNPushType = None, topic: str = None, environment: PNPushEnvironment = None + ) -> AddChannelsToPush: return AddChannelsToPush(self, channels=channels, device_id=device_id, push_type=push_type, topic=topic, environment=environment) diff --git a/pubnub/request_handlers/async_aiohttp.py b/pubnub/request_handlers/async_aiohttp.py index 8c7ee4fc..f27757fe 100644 --- a/pubnub/request_handlers/async_aiohttp.py +++ b/pubnub/request_handlers/async_aiohttp.py @@ -28,25 +28,55 @@ class AsyncAiohttpRequestHandler(BaseRequestHandler): ENDPOINT_THREAD_COUNTER: int = 0 _connector: aiohttp.TCPConnector = None _session: aiohttp.ClientSession = None + _is_closing: bool = False + _max_connections: int = 100 + _connection_timeout: float = 30.0 def __init__(self, pubnub): self.pubnub = pubnub - self._connector = aiohttp.TCPConnector(verify_ssl=True, loop=self.pubnub.event_loop) + self._connector = aiohttp.TCPConnector( + verify_ssl=True, + loop=self.pubnub.event_loop, + limit=self._max_connections, + ttl_dns_cache=300 + ) + self._is_closing = False async def create_session(self): - if not self._session: + if not self._session and not self._is_closing: self._session = aiohttp.ClientSession( loop=self.pubnub.event_loop, - timeout=aiohttp.ClientTimeout(connect=self.pubnub.config.connect_timeout), + timeout=aiohttp.ClientTimeout( + total=self._connection_timeout, + connect=self.pubnub.config.connect_timeout, + sock_read=self.pubnub.config.connect_timeout, + sock_connect=self.pubnub.config.connect_timeout + ), connector=self._connector ) async def close_session(self): - if self._session is not None: - await self._session.close() + if self._session is not None and not self._is_closing: + self._is_closing = True + try: + # Cancel any pending requests + if hasattr(self._session, '_connector'): + for task in asyncio.all_tasks(): + if not task.done() and task is not asyncio.current_task(): + task.cancel() + + # Close session and connector + await self._session.close() + await self._connector.close() + except Exception as e: + logger.error(f"Error during session cleanup: {str(e)}") + finally: + self._session = None + self._is_closing = False async def set_connector(self, connector): - await self._session.aclose() + if self._session is not None: + await self.close_session() self._connector = connector await self.create_session() diff --git a/pubnub/request_handlers/async_httpx.py b/pubnub/request_handlers/async_httpx.py index ea1d5149..cf406b51 100644 --- a/pubnub/request_handlers/async_httpx.py +++ b/pubnub/request_handlers/async_httpx.py @@ -24,33 +24,85 @@ def close(self): self.is_closed = True asyncio.create_task(super().aclose()) + async def aclose(self): + self.is_closed = True + await super().aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.aclose() + class AsyncHttpxRequestHandler(BaseRequestHandler): """ PubNub Python SDK asychronous requests handler based on the `httpx` HTTP library. """ ENDPOINT_THREAD_COUNTER: int = 0 _connector: httpx.AsyncHTTPTransport = None _session: httpx.AsyncClient = None + _is_closing: bool = False + _max_connections: int = 100 + _connection_timeout: float = 30.0 def __init__(self, pubnub): self.pubnub = pubnub - self._connector = PubNubAsyncHTTPTransport(verify=True, http2=True) + self._connector = PubNubAsyncHTTPTransport( + verify=True, + http2=True, + limits=httpx.Limits(max_connections=self._max_connections) + ) + self._is_closing = False async def create_session(self): - self._session = httpx.AsyncClient( - timeout=httpx.Timeout(self.pubnub.config.connect_timeout), - transport=self._connector - ) + if self._session is None and not self._is_closing: + self._session = httpx.AsyncClient( + timeout=httpx.Timeout( + connect=self._connection_timeout, + read=self.pubnub.config.connect_timeout, + write=self.pubnub.config.connect_timeout, + pool=self._connection_timeout + ), + transport=self._connector, + http2=True + ) async def close_session(self): - if self._session is not None: - self._connector.close() - await self._session.aclose() + if self._session is not None and not self._is_closing: + self._is_closing = True + try: + # Cancel any pending requests + if hasattr(self._session, '_transport'): + for task in asyncio.all_tasks(): + if not task.done() and task is not asyncio.current_task(): + task.cancel() + + # Close transport and session + await self._connector.aclose() + await self._session.aclose() + except Exception as e: + logger.error(f"Error during session cleanup: {str(e)}") + finally: + self._session = None + self._is_closing = False async def set_connector(self, connector): - await self._session.aclose() + if self._session is not None: + await self.close_session() self._connector = connector await self.create_session() + async def __aenter__(self): + await self.create_session() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close_session() + + def __del__(self): + ... + # if self._session is not None and not self._is_closing: + # asyncio.create_task(self.close_session()) + def sync_request(self, **_): raise NotImplementedError("sync_request is not implemented for asyncio handler") @@ -123,13 +175,17 @@ async def async_request(self, options_func, cancellation_event): except Exception as e: logger.error("session.request exception: %s" % str(e)) raise + try: + response_body = await response.aread() + except Exception as e: + logger.error(f"Error reading response body: {str(e)}") + response_body = None - response_body = response.read() if not options.non_json_response: body = response_body else: if isinstance(response.content, bytes): - body = response.content # TODO: simplify this logic within the v5 release + body = response.content else: body = response_body @@ -192,7 +248,6 @@ async def async_request(self, options_func, cancellation_event): logger.debug(data) if response.status_code not in (200, 307, 204): - if response.status_code >= 500: err = PNERR_SERVER_ERROR else: diff --git a/pubnub/request_handlers/httpx.py b/pubnub/request_handlers/httpx.py index 8da2da74..31cfcab0 100644 --- a/pubnub/request_handlers/httpx.py +++ b/pubnub/request_handlers/httpx.py @@ -28,7 +28,7 @@ class HttpxRequestHandler(BaseRequestHandler): ENDPOINT_THREAD_COUNTER: int = 0 def __init__(self, pubnub): - self.session = httpx.Client() + self.session = httpx.Client(http2=True) self.pubnub = pubnub