Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate loop parameter (adopt to Python 3.8) #198

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions aioelasticsearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ def __init__(
loop=None,
**kwargs
):
if loop is None:
loop = asyncio.get_event_loop()

self.loop = loop
self.loop = asyncio.get_event_loop()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't sure about this: left for backward compatibility


kwargs['loop'] = self.loop

Expand Down
5 changes: 2 additions & 3 deletions aioelasticsearch/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(
maxsize=10,
headers=None,
*,
loop,
loop=None,
**kwargs
):
super().__init__(host=host, port=port, use_ssl=use_ssl, **kwargs)
Expand All @@ -31,7 +31,7 @@ def __init__(
self.headers = headers
self.headers.setdefault('Content-Type', 'application/json')

self.loop = loop
self.loop = asyncio.get_event_loop()

if http_auth is not None:
if isinstance(http_auth, aiohttp.BasicAuth):
Expand Down Expand Up @@ -67,7 +67,6 @@ def __init__(
connector=aiohttp.TCPConnector(
limit=maxsize,
use_dns_cache=kwargs.get('use_dns_cache', False),
loop=self.loop,
**kwargs,
),
)
Expand Down
12 changes: 6 additions & 6 deletions aioelasticsearch/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ def __init__(
selector_class=RoundRobinSelector,
randomize_hosts=True,
*,
loop,
loop=None,
**kwargs
):
self._dead_timeout = dead_timeout
self.timeout_cutoff = timeout_cutoff
self.connection_opts = connections
self.connections = [c for (c, _) in connections]
self.orig_connections = set(self.connections)
self.dead = asyncio.PriorityQueue(len(self.connections), loop=loop)
self.dead = asyncio.PriorityQueue(len(self.connections))
self.dead_count = collections.Counter()

self.loop = loop
self.loop = asyncio.get_event_loop()

if randomize_hosts:
random.shuffle(self.connections)
Expand Down Expand Up @@ -111,18 +111,18 @@ async def close(self, *, skip=frozenset()):
self.orig_connections - skip
]

await asyncio.gather(*coros, loop=self.loop)
await asyncio.gather(*coros)


class DummyConnectionPool(AIOHttpConnectionPool):

def __init__(self, connections, *, loop, **kwargs):
def __init__(self, connections, *, loop=None, **kwargs):
if len(connections) != 1:
raise ImproperlyConfigured(
'DummyConnectionPool needs exactly one connection defined.',
)

self.loop = loop
self.loop = asyncio.get_event_loop()

self.connection_opts = connections
self.connection = connections[0][0]
Expand Down
12 changes: 5 additions & 7 deletions aioelasticsearch/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ def __init__(
retry_on_timeout=False,
send_get_body_as='GET',
*,
loop,
loop=None,
**kwargs
):
self.loop = loop
self.loop = asyncio.get_event_loop()
self._closed = False

_serializers = DEFAULT_SERIALIZERS.copy()
Expand Down Expand Up @@ -71,7 +71,7 @@ def __init__(
# store all strategies...
self.connection_pool_class = connection_pool_class
self.connection_class = connection_class
self._connection_pool_lock = asyncio.Lock(loop=self.loop)
self._connection_pool_lock = asyncio.Lock()

# ...save kwargs to be passed to the connections
self.kwargs = kwargs
Expand All @@ -92,8 +92,7 @@ def _initial_sniff_reset(fut):

task = self.sniff_hosts(initial=True)

self.initial_sniff_task = asyncio.ensure_future(task,
loop=self.loop)
self.initial_sniff_task = asyncio.ensure_future(task)
self.initial_sniff_task.add_done_callback(_initial_sniff_reset)

def set_connections(self, hosts):
Expand Down Expand Up @@ -125,7 +124,6 @@ def _create_connection(host):
if len(connections) == 1:
self.connection_pool = DummyConnectionPool(
connections,
loop=self.loop,
**self.kwargs
)
else:
Expand Down Expand Up @@ -219,7 +217,7 @@ async def _initial_sniff_wrapper():

coros.append(self.connection_pool.close())

await asyncio.gather(*coros, loop=self.loop)
await asyncio.gather(*coros)
self._closed = True

async def get_connection(self):
Expand Down
43 changes: 7 additions & 36 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,6 @@
import aioelasticsearch


@pytest.fixture
def loop(request):
asyncio.set_event_loop(None)

loop = asyncio.new_event_loop()

yield loop

if not loop._closed:
loop.call_soon(loop.stop)
loop.run_forever()
loop.close()

gc.collect()
asyncio.set_event_loop(None)


@pytest.fixture(scope='session')
def session_id():
'''Unique session identifier, random string.'''
Expand Down Expand Up @@ -157,21 +140,20 @@ def es_server(es_clean, es_container):


@pytest.fixture
def es(es_server, auto_close, loop):
def es(es_server, auto_close):
es = aioelasticsearch.Elasticsearch(
hosts=[{
'host': es_server['host'],
'port': es_server['port'],
}],
http_auth=es_server['auth'],
loop=loop,
)

return auto_close(es)


@pytest.fixture
def auto_close(loop):
def auto_close():
close_list = []

def f(arg):
Expand All @@ -180,40 +162,29 @@ def f(arg):

yield f

loop = asyncio.get_event_loop()
for arg in close_list:
loop.run_until_complete(arg.close())


@pytest.mark.tryfirst
def pytest_pycollect_makeitem(collector, name, obj):
if collector.funcnamefilter(name):
item = pytest.Function(name, parent=collector)

if 'run_loop' in item.keywords:
return list(collector._genfunctions(name, obj))


@pytest.mark.tryfirst
def pytest_pyfunc_call(pyfuncitem):
if 'run_loop' in pyfuncitem.keywords:
if asyncio.iscoroutinefunction(pyfuncitem.obj):
funcargs = pyfuncitem.funcargs

loop = funcargs['loop']

testargs = {
arg: funcargs[arg]
for arg in pyfuncitem._fixtureinfo.argnames
}

assert asyncio.iscoroutinefunction(pyfuncitem.obj)

loop = asyncio.get_event_loop()
loop.run_until_complete(pyfuncitem.obj(**testargs))

return True


@pytest.fixture
def populate(es, loop):
def populate(es):
async def do(index, n, body):
coros = []

Expand All @@ -228,6 +199,6 @@ async def do(index, n, body):
),
)

await asyncio.gather(*coros, loop=loop)
await asyncio.gather(*coros)
await es.indices.refresh()
return do
12 changes: 0 additions & 12 deletions tests/test_aioelasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,14 @@
from aioelasticsearch import Elasticsearch


@pytest.mark.run_loop
async def test_ping(es):
ping = await es.ping()

assert ping


@pytest.mark.run_loop
@asyncio.coroutine
def test_ping2(es):
ping = yield from es.ping()

assert ping


def test_elastic_default_loop(auto_close, loop):
asyncio.set_event_loop(loop)

es = Elasticsearch()

auto_close(es)

assert es.loop is loop
84 changes: 35 additions & 49 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,101 +10,87 @@
SSLError)


@pytest.mark.run_loop
async def test_default_headers(auto_close, loop):
conn = auto_close(AIOHttpConnection(loop=loop))
async def test_default_headers(auto_close):
conn = auto_close(AIOHttpConnection())
assert conn.headers == {'Content-Type': 'application/json'}


@pytest.mark.run_loop
async def test_custom_headers(auto_close, loop):
conn = auto_close(AIOHttpConnection(headers={'X-Custom': 'value'},
loop=loop))
async def test_custom_headers(auto_close):
conn = auto_close(AIOHttpConnection(headers={'X-Custom': 'value'}))
assert conn.headers == {'Content-Type': 'application/json',
'X-Custom': 'value'}


@pytest.mark.run_loop
async def test_auth_no_auth(auto_close, loop):
conn = auto_close(AIOHttpConnection(loop=loop))
async def test_auth_no_auth(auto_close):
conn = auto_close(AIOHttpConnection())
assert conn.http_auth is None


@pytest.mark.run_loop
async def test_ssl_context(auto_close, loop):
async def test_ssl_context(auto_close):
context = ssl.create_default_context()
conn = auto_close(
AIOHttpConnection(loop=loop, verify_certs=True, ssl_context=context)
AIOHttpConnection(verify_certs=True, ssl_context=context)
)
assert conn.session.connector._ssl is context


@pytest.mark.run_loop
async def test_auth_str(auto_close, loop):
async def test_auth_str(auto_close):
auth = aiohttp.BasicAuth('user', 'pass')
conn = auto_close(AIOHttpConnection(http_auth='user:pass', loop=loop))
conn = auto_close(AIOHttpConnection(http_auth='user:pass'))
assert conn.http_auth == auth


@pytest.mark.run_loop
async def test_auth_tuple(auto_close, loop):
async def test_auth_tuple(auto_close):
auth = aiohttp.BasicAuth('user', 'pass')
conn = auto_close(AIOHttpConnection(http_auth=('user', 'pass'), loop=loop))
conn = auto_close(AIOHttpConnection(http_auth=('user', 'pass')))
assert conn.http_auth == auth


@pytest.mark.run_loop
async def test_auth_basicauth(auto_close, loop):
async def test_auth_basicauth(auto_close):
auth = aiohttp.BasicAuth('user', 'pass')
conn = auto_close(AIOHttpConnection(http_auth=auth, loop=loop))
conn = auto_close(AIOHttpConnection(http_auth=auth))
assert conn.http_auth == auth


@pytest.mark.run_loop
async def test_auth_invalid(loop):
async def test_auth_invalid():
with pytest.raises(TypeError):
AIOHttpConnection(http_auth=object(), loop=loop)
AIOHttpConnection(http_auth=object())


@pytest.mark.run_loop
async def test_explicit_session(auto_close, loop):
session = aiohttp.ClientSession(loop=loop)
conn = auto_close(AIOHttpConnection(session=session, loop=loop))
assert conn.session is session
async def test_explicit_session(auto_close):
async with aiohttp.ClientSession() as session:
conn = auto_close(AIOHttpConnection(session=session))
assert conn.session is session


@pytest.mark.run_loop
async def test_explicit_session_not_closed(loop):
session = aiohttp.ClientSession(loop=loop)
conn = AIOHttpConnection(session=session, loop=loop)
await conn.close()
assert not conn.session.closed and not session.closed
async def test_explicit_session_not_closed():
async with aiohttp.ClientSession() as session:
conn = AIOHttpConnection(session=session)
await conn.close()
assert not conn.session.closed and not session.closed


@pytest.mark.run_loop
async def test_session_closed(loop):
conn = AIOHttpConnection(loop=loop)
async def test_session_closed():
conn = AIOHttpConnection()
await conn.close()
assert conn.session.closed


@pytest.mark.run_loop
async def test_perform_request_ssl_error(auto_close, loop):
async def test_perform_request_ssl_error(auto_close):
for exc, expected in [
(aiohttp.ClientConnectorCertificateError(mock.Mock(), mock.Mock()), SSLError), # noqa
(aiohttp.ClientConnectorSSLError(mock.Mock(), mock.Mock()), SSLError),
(aiohttp.ClientSSLError(mock.Mock(), mock.Mock()), SSLError),
(aiohttp.ClientError('Other'), ConnectionError),
(asyncio.TimeoutError, ConnectionTimeout),
]:
session = aiohttp.ClientSession(loop=loop)
async with aiohttp.ClientSession() as session:

async def coro(*args, **Kwargs):
raise exc
async def coro(*args, **Kwargs):
raise exc

session._request = coro
session._request = coro

conn = auto_close(AIOHttpConnection(session=session, loop=loop,
use_ssl=True))
with pytest.raises(expected):
await conn.perform_request('HEAD', '/')
conn = auto_close(AIOHttpConnection(session=session, use_ssl=True))
with pytest.raises(expected):
await conn.perform_request('HEAD', '/')
Loading