Skip to content

Commit

Permalink
Reusable async client (opensearch-project#639)
Browse files Browse the repository at this point in the history
* set aiohttp.ClientSession to None after close()

Signed-off-by: odlmarce <[email protected]>

* add test

Signed-off-by: odlmarce <[email protected]>

* update changelog

Signed-off-by: odlmarce <[email protected]>

* update changelog + format
Signed-off-by: odlmarce <[email protected]>

* update changelog
Signed-off-by: odlmarce <[email protected]>

* add tests using `with` and synchronous client
Signed-off-by: odlmarce <[email protected]>

* fix `urllib3.exceptions.ClosedPoolError` breaking synchronous client after `close`
Signed-off-by: odlmarce <[email protected]>

* update changelog
Signed-off-by: odlmarce <[email protected]>

* separate tests
Signed-off-by: odlmarce <[email protected]>

* refactor pool factory as lambda
Signed-off-by: odlmarce <[email protected]>

---------

Signed-off-by: odlmarce <[email protected]>
  • Loading branch information
odelmarcelle authored Jan 2, 2024
1 parent 7b0b58d commit 3eba72c
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Removed unnecessary `# -*- coding: utf-8 -*-` headers from .py files ([#615](https://github.com/opensearch-project/opensearch-py/pull/615), [#617](https://github.com/opensearch-project/opensearch-py/pull/617))
### Fixed
- Fix KeyError when scroll return no hits ([#616](https://github.com/opensearch-project/opensearch-py/pull/616))
- Fix reuse of `OpenSearch` using `Urllib3HttpConnection` and `AsyncOpenSearch` after calling `close` ([#639](https://github.com/opensearch-project/opensearch-py/pull/639))
### Security
### Dependencies
- Bumps `pytest-asyncio` from <=0.21.1 to <=0.23.2
Expand Down
1 change: 1 addition & 0 deletions opensearchpy/_async/http_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ async def close(self) -> Any:
"""
if self.session:
await self.session.close()
self.session = None

async def _create_aiohttp_session(self) -> Any:
"""Creates an aiohttp.ClientSession(). This is delayed until
Expand Down
1 change: 1 addition & 0 deletions opensearchpy/connection/http_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ async def close(self) -> Any:
"""
if self.session:
await self.session.close()
self.session = None

async def _create_aiohttp_session(self) -> Any:
"""Creates an aiohttp.ClientSession(). This is delayed until
Expand Down
14 changes: 12 additions & 2 deletions opensearchpy/connection/http_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,13 @@ def __init__(
if pool_maxsize and isinstance(pool_maxsize, int):
kw["maxsize"] = pool_maxsize

self.pool = pool_class(
self._urllib3_pool_factory = lambda: pool_class(
self.hostname, port=self.port, timeout=self.timeout, **kw
)
self._create_urllib3_pool()

def _create_urllib3_pool(self) -> None:
self.pool = self._urllib3_pool_factory() # type: ignore

def perform_request(
self,
Expand All @@ -228,6 +232,10 @@ def perform_request(
ignore: Collection[int] = (),
headers: Optional[Mapping[str, str]] = None,
) -> Any:
if self.pool is None:
self._create_urllib3_pool()
assert self.pool is not None

url = self.url_prefix + url
if params:
url = "%s?%s" % (url, urlencode(params))
Expand Down Expand Up @@ -305,4 +313,6 @@ def close(self) -> None:
"""
Explicitly closes connection
"""
self.pool.close()
if self.pool:
self.pool.close()
self.pool = None
12 changes: 12 additions & 0 deletions test_opensearchpy/test_async/test_server/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,15 @@ async def test_aiohttp_connection_works_without_yarl(

resp = await async_client.info(pretty=True)
assert isinstance(resp, dict)


class TestClose:
async def test_close_doesnt_break_client(self, async_client: Any) -> None:
await async_client.cluster.health()
await async_client.close()
await async_client.cluster.health()

async def test_with_doesnt_break_client(self, async_client: Any) -> None:
for _ in range(2):
async with async_client as client:
await client.cluster.health()
12 changes: 12 additions & 0 deletions test_opensearchpy/test_server/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,15 @@ def test_bulk_works_with_bytestring_body(self) -> None:

self.assertFalse(response["errors"])
self.assertEqual(1, len(response["items"]))


class TestClose(OpenSearchTestCase):
def test_close_doesnt_break_client(self) -> None:
self.client.cluster.health()
self.client.close()
self.client.cluster.health()

def test_with_doesnt_break_client(self) -> None:
for _ in range(2):
with self.client as client:
client.cluster.health()

0 comments on commit 3eba72c

Please sign in to comment.