diff --git a/aioelasticsearch/__init__.py b/aioelasticsearch/__init__.py index 9f60f0df..a8d31349 100644 --- a/aioelasticsearch/__init__.py +++ b/aioelasticsearch/__init__.py @@ -23,10 +23,7 @@ def __init__( loop=None, **kwargs ): - if loop is None: - loop = asyncio.get_event_loop() - - self.loop = loop + self.loop = asyncio.get_event_loop() kwargs['loop'] = self.loop diff --git a/aioelasticsearch/connection.py b/aioelasticsearch/connection.py index 4903d517..764bfb32 100644 --- a/aioelasticsearch/connection.py +++ b/aioelasticsearch/connection.py @@ -21,7 +21,7 @@ def __init__( maxsize=10, headers=None, *, - loop, + loop=None, **kwargs ): super().__init__(host=host, port=port, use_ssl=use_ssl, **kwargs) @@ -31,7 +31,7 @@ def __init__( self.headers = headers self.headers.setdefault('Content-Type', 'application/json') - self.loop = loop + self.loop = asyncio.get_event_loop() if http_auth is not None: if isinstance(http_auth, aiohttp.BasicAuth): @@ -67,7 +67,6 @@ def __init__( connector=aiohttp.TCPConnector( limit=maxsize, use_dns_cache=kwargs.get('use_dns_cache', False), - loop=self.loop, **kwargs, ), ) diff --git a/aioelasticsearch/pool.py b/aioelasticsearch/pool.py index 06ec1254..df126633 100644 --- a/aioelasticsearch/pool.py +++ b/aioelasticsearch/pool.py @@ -20,7 +20,7 @@ def __init__( selector_class=RoundRobinSelector, randomize_hosts=True, *, - loop, + loop=None, **kwargs ): self._dead_timeout = dead_timeout @@ -28,10 +28,10 @@ def __init__( self.connection_opts = connections self.connections = [c for (c, _) in connections] self.orig_connections = set(self.connections) - self.dead = asyncio.PriorityQueue(len(self.connections), loop=loop) + self.dead = asyncio.PriorityQueue(len(self.connections)) self.dead_count = collections.Counter() - self.loop = loop + self.loop = asyncio.get_event_loop() if randomize_hosts: random.shuffle(self.connections) @@ -111,18 +111,18 @@ async def close(self, *, skip=frozenset()): self.orig_connections - skip ] - await asyncio.gather(*coros, loop=self.loop) + await asyncio.gather(*coros) class DummyConnectionPool(AIOHttpConnectionPool): - def __init__(self, connections, *, loop, **kwargs): + def __init__(self, connections, *, loop=None, **kwargs): if len(connections) != 1: raise ImproperlyConfigured( 'DummyConnectionPool needs exactly one connection defined.', ) - self.loop = loop + self.loop = asyncio.get_event_loop() self.connection_opts = connections self.connection = connections[0][0] diff --git a/aioelasticsearch/transport.py b/aioelasticsearch/transport.py index 8fe11127..634fb59c 100644 --- a/aioelasticsearch/transport.py +++ b/aioelasticsearch/transport.py @@ -34,10 +34,10 @@ def __init__( retry_on_timeout=False, send_get_body_as='GET', *, - loop, + loop=None, **kwargs ): - self.loop = loop + self.loop = asyncio.get_event_loop() self._closed = False _serializers = DEFAULT_SERIALIZERS.copy() @@ -71,7 +71,7 @@ def __init__( # store all strategies... self.connection_pool_class = connection_pool_class self.connection_class = connection_class - self._connection_pool_lock = asyncio.Lock(loop=self.loop) + self._connection_pool_lock = asyncio.Lock() # ...save kwargs to be passed to the connections self.kwargs = kwargs @@ -92,8 +92,7 @@ def _initial_sniff_reset(fut): task = self.sniff_hosts(initial=True) - self.initial_sniff_task = asyncio.ensure_future(task, - loop=self.loop) + self.initial_sniff_task = asyncio.ensure_future(task) self.initial_sniff_task.add_done_callback(_initial_sniff_reset) def set_connections(self, hosts): @@ -125,7 +124,6 @@ def _create_connection(host): if len(connections) == 1: self.connection_pool = DummyConnectionPool( connections, - loop=self.loop, **self.kwargs ) else: @@ -219,7 +217,7 @@ async def _initial_sniff_wrapper(): coros.append(self.connection_pool.close()) - await asyncio.gather(*coros, loop=self.loop) + await asyncio.gather(*coros) self._closed = True async def get_connection(self): diff --git a/tests/conftest.py b/tests/conftest.py index 321c7575..bb55f7cf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,23 +11,6 @@ import aioelasticsearch -@pytest.fixture -def loop(request): - asyncio.set_event_loop(None) - - loop = asyncio.new_event_loop() - - yield loop - - if not loop._closed: - loop.call_soon(loop.stop) - loop.run_forever() - loop.close() - - gc.collect() - asyncio.set_event_loop(None) - - @pytest.fixture(scope='session') def session_id(): '''Unique session identifier, random string.''' @@ -157,21 +140,20 @@ def es_server(es_clean, es_container): @pytest.fixture -def es(es_server, auto_close, loop): +def es(es_server, auto_close): es = aioelasticsearch.Elasticsearch( hosts=[{ 'host': es_server['host'], 'port': es_server['port'], }], http_auth=es_server['auth'], - loop=loop, ) return auto_close(es) @pytest.fixture -def auto_close(loop): +def auto_close(): close_list = [] def f(arg): @@ -180,40 +162,29 @@ def f(arg): yield f + loop = asyncio.get_event_loop() for arg in close_list: loop.run_until_complete(arg.close()) -@pytest.mark.tryfirst -def pytest_pycollect_makeitem(collector, name, obj): - if collector.funcnamefilter(name): - item = pytest.Function(name, parent=collector) - - if 'run_loop' in item.keywords: - return list(collector._genfunctions(name, obj)) - - @pytest.mark.tryfirst def pytest_pyfunc_call(pyfuncitem): - if 'run_loop' in pyfuncitem.keywords: + if asyncio.iscoroutinefunction(pyfuncitem.obj): funcargs = pyfuncitem.funcargs - loop = funcargs['loop'] - testargs = { arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames } - assert asyncio.iscoroutinefunction(pyfuncitem.obj) - + loop = asyncio.get_event_loop() loop.run_until_complete(pyfuncitem.obj(**testargs)) return True @pytest.fixture -def populate(es, loop): +def populate(es): async def do(index, n, body): coros = [] @@ -228,6 +199,6 @@ async def do(index, n, body): ), ) - await asyncio.gather(*coros, loop=loop) + await asyncio.gather(*coros) await es.indices.refresh() return do diff --git a/tests/test_aioelasticsearch.py b/tests/test_aioelasticsearch.py index 9733f96e..f6d16d39 100644 --- a/tests/test_aioelasticsearch.py +++ b/tests/test_aioelasticsearch.py @@ -5,26 +5,14 @@ from aioelasticsearch import Elasticsearch -@pytest.mark.run_loop async def test_ping(es): ping = await es.ping() assert ping -@pytest.mark.run_loop @asyncio.coroutine def test_ping2(es): ping = yield from es.ping() assert ping - - -def test_elastic_default_loop(auto_close, loop): - asyncio.set_event_loop(loop) - - es = Elasticsearch() - - auto_close(es) - - assert es.loop is loop diff --git a/tests/test_connection.py b/tests/test_connection.py index 006e7120..7a948b6c 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -10,86 +10,73 @@ SSLError) -@pytest.mark.run_loop -async def test_default_headers(auto_close, loop): - conn = auto_close(AIOHttpConnection(loop=loop)) +async def test_default_headers(auto_close): + conn = auto_close(AIOHttpConnection()) assert conn.headers == {'Content-Type': 'application/json'} -@pytest.mark.run_loop -async def test_custom_headers(auto_close, loop): - conn = auto_close(AIOHttpConnection(headers={'X-Custom': 'value'}, - loop=loop)) +async def test_custom_headers(auto_close): + conn = auto_close(AIOHttpConnection(headers={'X-Custom': 'value'})) assert conn.headers == {'Content-Type': 'application/json', 'X-Custom': 'value'} -@pytest.mark.run_loop -async def test_auth_no_auth(auto_close, loop): - conn = auto_close(AIOHttpConnection(loop=loop)) +async def test_auth_no_auth(auto_close): + conn = auto_close(AIOHttpConnection()) assert conn.http_auth is None -@pytest.mark.run_loop -async def test_ssl_context(auto_close, loop): +async def test_ssl_context(auto_close): context = ssl.create_default_context() conn = auto_close( - AIOHttpConnection(loop=loop, verify_certs=True, ssl_context=context) + AIOHttpConnection(verify_certs=True, ssl_context=context) ) assert conn.session.connector._ssl is context -@pytest.mark.run_loop -async def test_auth_str(auto_close, loop): +async def test_auth_str(auto_close): auth = aiohttp.BasicAuth('user', 'pass') - conn = auto_close(AIOHttpConnection(http_auth='user:pass', loop=loop)) + conn = auto_close(AIOHttpConnection(http_auth='user:pass')) assert conn.http_auth == auth -@pytest.mark.run_loop -async def test_auth_tuple(auto_close, loop): +async def test_auth_tuple(auto_close): auth = aiohttp.BasicAuth('user', 'pass') - conn = auto_close(AIOHttpConnection(http_auth=('user', 'pass'), loop=loop)) + conn = auto_close(AIOHttpConnection(http_auth=('user', 'pass'))) assert conn.http_auth == auth -@pytest.mark.run_loop -async def test_auth_basicauth(auto_close, loop): +async def test_auth_basicauth(auto_close): auth = aiohttp.BasicAuth('user', 'pass') - conn = auto_close(AIOHttpConnection(http_auth=auth, loop=loop)) + conn = auto_close(AIOHttpConnection(http_auth=auth)) assert conn.http_auth == auth -@pytest.mark.run_loop -async def test_auth_invalid(loop): +async def test_auth_invalid(): with pytest.raises(TypeError): - AIOHttpConnection(http_auth=object(), loop=loop) + AIOHttpConnection(http_auth=object()) -@pytest.mark.run_loop -async def test_explicit_session(auto_close, loop): - session = aiohttp.ClientSession(loop=loop) - conn = auto_close(AIOHttpConnection(session=session, loop=loop)) - assert conn.session is session +async def test_explicit_session(auto_close): + async with aiohttp.ClientSession() as session: + conn = auto_close(AIOHttpConnection(session=session)) + assert conn.session is session -@pytest.mark.run_loop -async def test_explicit_session_not_closed(loop): - session = aiohttp.ClientSession(loop=loop) - conn = AIOHttpConnection(session=session, loop=loop) - await conn.close() - assert not conn.session.closed and not session.closed +async def test_explicit_session_not_closed(): + async with aiohttp.ClientSession() as session: + conn = AIOHttpConnection(session=session) + await conn.close() + assert not conn.session.closed and not session.closed -@pytest.mark.run_loop -async def test_session_closed(loop): - conn = AIOHttpConnection(loop=loop) +async def test_session_closed(): + conn = AIOHttpConnection() await conn.close() assert conn.session.closed -@pytest.mark.run_loop -async def test_perform_request_ssl_error(auto_close, loop): +async def test_perform_request_ssl_error(auto_close): for exc, expected in [ (aiohttp.ClientConnectorCertificateError(mock.Mock(), mock.Mock()), SSLError), # noqa (aiohttp.ClientConnectorSSLError(mock.Mock(), mock.Mock()), SSLError), @@ -97,14 +84,13 @@ async def test_perform_request_ssl_error(auto_close, loop): (aiohttp.ClientError('Other'), ConnectionError), (asyncio.TimeoutError, ConnectionTimeout), ]: - session = aiohttp.ClientSession(loop=loop) + async with aiohttp.ClientSession() as session: - async def coro(*args, **Kwargs): - raise exc + async def coro(*args, **Kwargs): + raise exc - session._request = coro + session._request = coro - conn = auto_close(AIOHttpConnection(session=session, loop=loop, - use_ssl=True)) - with pytest.raises(expected): - await conn.perform_request('HEAD', '/') + conn = auto_close(AIOHttpConnection(session=session, use_ssl=True)) + with pytest.raises(expected): + await conn.perform_request('HEAD', '/') diff --git a/tests/test_pool.py b/tests/test_pool.py index 0284d00c..55db99ef 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -5,14 +5,12 @@ from aioelasticsearch.pool import DummyConnectionPool -@pytest.mark.run_loop -async def test_mark_dead_removed_connection(auto_close, es_server, loop): +async def test_mark_dead_removed_connection(auto_close, es_server): es = auto_close(Elasticsearch(hosts=[{'host': es_server['host'], 'port': es_server['port']}, {'host': 'unknown_host', 'port': 9200}], - http_auth=es_server['auth'], - loop=loop)) + http_auth=es_server['auth'])) conn = await es.transport.get_connection() pool = es.transport.connection_pool pool.mark_dead(conn) @@ -22,14 +20,12 @@ async def test_mark_dead_removed_connection(auto_close, es_server, loop): assert conn in pool.dead_count -@pytest.mark.run_loop -async def test_mark_live(auto_close, es_server, loop): +async def test_mark_live(auto_close, es_server): es = auto_close(Elasticsearch(hosts=[{'host': es_server['host'], 'port': es_server['port']}, {'host': 'unknown_host', 'port': 9200}], - http_auth=es_server['auth'], - loop=loop)) + http_auth=es_server['auth'])) conn = await es.transport.get_connection() pool = es.transport.connection_pool pool.mark_dead(conn) @@ -39,106 +35,90 @@ async def test_mark_live(auto_close, es_server, loop): assert conn not in pool.dead_count -@pytest.mark.run_loop -async def test_mark_live_not_dead(auto_close, es_server, loop): +async def test_mark_live_not_dead(auto_close, es_server): es = auto_close(Elasticsearch(hosts=[{'host': es_server['host'], 'port': es_server['port']}, {'host': 'unknown_host', 'port': 9200}], - http_auth=es_server['auth'], - loop=loop)) + http_auth=es_server['auth'])) conn = await es.transport.get_connection() pool = es.transport.connection_pool pool.mark_live(conn) assert conn not in pool.dead_count -@pytest.mark.run_loop -async def test_resurrect_empty(loop): +async def test_resurrect_empty(): conn1 = object() conn2 = object() conns = [(conn1, object()), (conn2, object())] - pool = AIOHttpConnectionPool(connections=conns, - randomize_hosts=False, loop=loop) + pool = AIOHttpConnectionPool(connections=conns, randomize_hosts=False) pool.resurrect() assert pool.connections == [conn1, conn2] -@pytest.mark.run_loop -async def test_resurrect_empty_force(loop): +async def test_resurrect_empty_force(): conn1 = object() conn2 = object() conns = [(conn1, object()), (conn2, object())] - pool = AIOHttpConnectionPool(connections=conns, - randomize_hosts=False, loop=loop) + pool = AIOHttpConnectionPool(connections=conns, randomize_hosts=False) assert pool.resurrect(force=True) in (conn1, conn2) -@pytest.mark.run_loop -async def test_resurrect_from_dead_not_ready_connection(loop): +async def test_resurrect_from_dead_not_ready_connection(): conn1 = object() conn2 = object() conns = [(conn1, object()), (conn2, object())] - pool = AIOHttpConnectionPool(connections=conns, - randomize_hosts=False, loop=loop) + pool = AIOHttpConnectionPool(connections=conns, randomize_hosts=False) pool.mark_dead(conn1) pool.resurrect() assert pool.connections == [conn2] -@pytest.mark.run_loop -async def test_resurrect_from_dead_ready_connection(loop): +async def test_resurrect_from_dead_ready_connection(): conn1 = object() conn2 = object() conns = [(conn1, object()), (conn2, object())] - pool = AIOHttpConnectionPool(connections=conns, - randomize_hosts=False, loop=loop) + pool = AIOHttpConnectionPool(connections=conns, randomize_hosts=False) pool.dead_timeout = lambda t: 0 pool.mark_dead(conn1) pool.resurrect() assert pool.connections == [conn2, conn1] -@pytest.mark.run_loop -async def test_get_connections_only_one_conn(loop): +async def test_get_connections_only_one_conn(): conn1 = object() conn2 = object() conns = [(conn1, object()), (conn2, object())] - pool = AIOHttpConnectionPool(connections=conns, - randomize_hosts=False, loop=loop) + pool = AIOHttpConnectionPool(connections=conns, randomize_hosts=False) pool.mark_dead(conn1) conn = pool.get_connection() assert conn is conn2 -@pytest.mark.run_loop -async def test_get_connections_no_conns(loop): +async def test_get_connections_no_conns(): conn1 = object() conn2 = object() conns = [(conn1, object()), (conn2, object())] - pool = AIOHttpConnectionPool(connections=conns, - randomize_hosts=False, loop=loop) + pool = AIOHttpConnectionPool(connections=conns, randomize_hosts=False) pool.mark_dead(conn1) pool.mark_dead(conn2) conn = pool.get_connection() assert conn in (conn1, conn2) -@pytest.mark.run_loop -async def test_dummy_improperly_configured(loop): +async def test_dummy_improperly_configured(): conn1 = object() conn2 = object() conns = [(conn1, object()), (conn2, object())] with pytest.raises(ImproperlyConfigured): - DummyConnectionPool(connections=conns, loop=loop) + DummyConnectionPool(connections=conns) -@pytest.mark.run_loop -async def test_dummy_mark_dead_and_live(loop): +async def test_dummy_mark_dead_and_live(): conn1 = object() conns = [(conn1, object())] - pool = DummyConnectionPool(connections=conns, loop=loop) + pool = DummyConnectionPool(connections=conns) pool.mark_dead(conn1) assert pool.connections == [conn1] diff --git a/tests/test_scan.py b/tests/test_scan.py index d6099c82..8b105a34 100644 --- a/tests/test_scan.py +++ b/tests/test_scan.py @@ -16,7 +16,6 @@ def test_scan_total_without_context_manager(es): scan.total -@pytest.mark.run_loop async def test_scan_async_for_without_context_manager(es): scan = Scan(es) @@ -32,7 +31,6 @@ def test_scan_scroll_id_without_context_manager(es): scan.scroll_id -@pytest.mark.run_loop async def test_scan_simple(es, populate): index = 'test_aioes' scroll_size = 3 @@ -61,7 +59,6 @@ async def test_scan_simple(es, populate): assert ids == {str(i) for i in range(10)} -@pytest.mark.run_loop async def test_scan_equal_chunks_for_loop(es, es_clean, populate): for n, scroll_size in [ (0, 1), # no results @@ -94,7 +91,6 @@ async def test_scan_equal_chunks_for_loop(es, es_clean, populate): assert len(ids) == n == scan.total['value'] -@pytest.mark.run_loop async def test_scan_no_mask_index(es): index = 'undefined-*' scroll_size = 3 @@ -112,8 +108,7 @@ async def test_scan_no_mask_index(es): assert cnt == 0 -@pytest.mark.run_loop -async def test_scan_no_scroll(es, loop, populate): +async def test_scan_no_scroll(es, populate): index = 'test_aioes' n = 10 scroll_size = 1 @@ -133,7 +128,6 @@ async def test_scan_no_scroll(es, loop, populate): doc -@pytest.mark.run_loop async def test_scan_no_index(es): index = 'undefined' scroll_size = 3 @@ -151,7 +145,6 @@ async def test_scan_no_index(es): assert cnt == 0 -@pytest.mark.run_loop async def test_scan_warning_on_failed_shards(es, populate, mocker): index = 'test_aioes' scroll_size = 3 @@ -180,7 +173,6 @@ async def test_scan_warning_on_failed_shards(es, populate, mocker): 'Scroll request has only succeeded on %d shards out of %d.', 4, 5) -@pytest.mark.run_loop async def test_scan_exception_on_failed_shards(es, populate, mocker): index = 'test_aioes' scroll_size = 3 diff --git a/tests/test_transport.py b/tests/test_transport.py index 38b3a78c..5e7ea235 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -21,68 +21,60 @@ async def perform_request(self, *args, **kwargs): return self.status, self.headers, self.data -@pytest.mark.run_loop -async def test_custom_serializers(auto_close, loop): +async def test_custom_serializers(auto_close): serializer = object() t = auto_close(AIOHttpTransport([{}], - serializers={'test': serializer}, - loop=loop)) + serializers={'test': serializer})) assert 'test' in t.deserializer.serializers assert t.deserializer.serializers['test'] is serializer -@pytest.mark.run_loop -async def test_no_sniff_on_start(auto_close, loop): - t = auto_close(AIOHttpTransport([{}], sniff_on_start=False, loop=loop)) +async def test_no_sniff_on_start(auto_close): + t = auto_close(AIOHttpTransport([{}], sniff_on_start=False)) assert t.initial_sniff_task is None -@pytest.mark.run_loop -async def test_sniff_on_start(auto_close, loop, es_server): +async def test_sniff_on_start(auto_close, es_server): t = auto_close(AIOHttpTransport([{'host': 'unknown_host', 'port': 9200}, {'host': es_server['host'], 'port': es_server['port']}], http_auth=es_server['auth'], - sniff_on_start=True, loop=loop)) + sniff_on_start=True)) assert t.initial_sniff_task is not None await t.initial_sniff_task assert t.initial_sniff_task is None assert len(t.connection_pool.connections) == 1 -@pytest.mark.run_loop -async def test_close_with_sniff_on_start(loop, es_server): +async def test_close_with_sniff_on_start(es_server): t = AIOHttpTransport([{'host': es_server['host'], 'port': es_server['port']}], http_auth=es_server['auth'], - sniff_on_start=True, loop=loop) + sniff_on_start=True) assert t.initial_sniff_task is not None await t.close() assert t.initial_sniff_task is None assert t._closed -@pytest.mark.run_loop -async def test_get_connection_with_sniff_on_start(auto_close, loop, es_server): +async def test_get_connection_with_sniff_on_start(auto_close, es_server): t = auto_close(AIOHttpTransport([{'host': es_server['host'], 'port': es_server['port']}], http_auth=es_server['auth'], - sniff_on_start=True, loop=loop)) + sniff_on_start=True)) conn = await t.get_connection() assert conn is not None assert t.initial_sniff_task is None -@pytest.mark.run_loop -async def test_get_connection_with_sniffer_timeout(auto_close, - loop, es_server): +async def test_get_connection_with_sniffer_timeout(auto_close, es_server): t = auto_close(AIOHttpTransport([{'host': 'unknown_host', 'port': 9200}, {'host': es_server['host'], 'port': es_server['port']}], http_auth=es_server['auth'], - sniffer_timeout=10, loop=loop)) + sniffer_timeout=10)) assert t.initial_sniff_task is None t.last_sniff -= 15 conn = await t.get_connection() @@ -91,50 +83,42 @@ async def test_get_connection_with_sniffer_timeout(auto_close, assert len(t.connection_pool.connections) == 1 -@pytest.mark.run_loop -async def test_get_connection_without_sniffer_timeout(auto_close, - loop, es_server): +async def test_get_connection_without_sniffer_timeout(auto_close, es_server): t = auto_close(AIOHttpTransport([{'host': 'unknown_host', 'port': 9200}, {'host': es_server['host'], 'port': es_server['port']}], http_auth=es_server['auth'], - sniffer_timeout=1e12, loop=loop)) + sniffer_timeout=1e12)) conn = await t.get_connection() assert conn is not None assert t.initial_sniff_task is None assert len(t.connection_pool.connections) == 2 -@pytest.mark.run_loop -async def test_sniff_hosts_error(auto_close, loop, es_server): +async def test_sniff_hosts_error(auto_close, es_server): t = auto_close(AIOHttpTransport([{'host': 'unknown_host', - 'port': 9200}], - loop=loop)) + 'port': 9200}])) with pytest.raises(TransportError): await t.sniff_hosts() -@pytest.mark.run_loop -async def test_sniff_hosts_no_hosts(auto_close, loop, es_server): +async def test_sniff_hosts_no_hosts(auto_close, es_server): t = auto_close(AIOHttpTransport([{'host': es_server['host'], 'port': es_server['port']}], - http_auth=es_server['auth'], - loop=loop)) + http_auth=es_server['auth'])) t.host_info_callback = lambda host_info, host: None with pytest.raises(TransportError): await t.sniff_hosts() -@pytest.mark.run_loop -async def test_mark_dead(auto_close, loop, es_server): +async def test_mark_dead(auto_close, es_server): t = auto_close(AIOHttpTransport([{'host': 'unknown_host', 'port': 9200}, {'host': es_server['host'], 'port': es_server['port']}], http_auth=es_server['auth'], - randomize_hosts=False, - loop=loop)) + randomize_hosts=False)) conn = t.connection_pool.connections[0] assert conn is not None assert conn.host == 'http://unknown_host:9200' @@ -142,8 +126,7 @@ async def test_mark_dead(auto_close, loop, es_server): assert len(t.connection_pool.connections) == 1 -@pytest.mark.run_loop -async def test_mark_dead_with_sniff(auto_close, loop, es_server): +async def test_mark_dead_with_sniff(auto_close, es_server): t = auto_close(AIOHttpTransport([{'host': 'unknown_host', 'port': 9200}, {'host': 'unknown_host2', @@ -152,8 +135,7 @@ async def test_mark_dead_with_sniff(auto_close, loop, es_server): 'port': es_server['port']}], http_auth=es_server['auth'], sniff_on_connection_fail=True, - randomize_hosts=False, - loop=loop)) + randomize_hosts=False)) conn = t.connection_pool.connections[0] assert conn is not None assert conn.host == 'http://unknown_host:9200' @@ -161,13 +143,11 @@ async def test_mark_dead_with_sniff(auto_close, loop, es_server): assert len(t.connection_pool.connections) == 1 -@pytest.mark.run_loop -async def test_send_get_body_as_post(es_server, auto_close, loop): +async def test_send_get_body_as_post(es_server, auto_close): cl = auto_close(Elasticsearch([{'host': es_server['host'], 'port': es_server['port']}], send_get_body_as='POST', - http_auth=es_server['auth'], - loop=loop)) + http_auth=es_server['auth'])) await cl.create('test', '1', {'val': '1'}) await cl.create('test', '2', {'val': '2'}) ret = await cl.mget( @@ -195,13 +175,11 @@ async def test_send_get_body_as_post(es_server, auto_close, loop): 'found': True}]} -@pytest.mark.run_loop -async def test_send_get_body_as_source(es_server, auto_close, loop): +async def test_send_get_body_as_source(es_server, auto_close): cl = auto_close(Elasticsearch([{'host': es_server['host'], 'port': es_server['port']}], send_get_body_as='source', - http_auth=es_server['auth'], - loop=loop)) + http_auth=es_server['auth'])) await cl.create('test', '1', {'val': '1'}) await cl.create('test', '2', {'val': '2'}) ret = await cl.mget( @@ -229,12 +207,10 @@ async def test_send_get_body_as_source(es_server, auto_close, loop): 'found': True}]} -@pytest.mark.run_loop -async def test_send_get_body_as_get(es_server, auto_close, loop): +async def test_send_get_body_as_get(es_server, auto_close): cl = auto_close(Elasticsearch([{'host': es_server['host'], 'port': es_server['port']}], - http_auth=es_server['auth'], - loop=loop)) + http_auth=es_server['auth'])) await cl.create('test', '1', {'val': '1'}) await cl.create('test', '2', {'val': '2'}) ret = await cl.mget( @@ -262,14 +238,11 @@ async def test_send_get_body_as_get(es_server, auto_close, loop): 'found': True}]} -@pytest.mark.run_loop -async def test_send_get_body_as_source_none_params(es_server, - auto_close, loop): +async def test_send_get_body_as_source_none_params(es_server, auto_close): cl = auto_close(Elasticsearch([{'host': es_server['host'], 'port': es_server['port']}], send_get_body_as='source', - http_auth=es_server['auth'], - loop=loop)) + http_auth=es_server['auth'])) await cl.create('test', '1', {'val': '1'}) await cl.create('test', '2', {'val': '2'}) ret = await cl.transport.perform_request( @@ -296,34 +269,29 @@ async def test_send_get_body_as_source_none_params(es_server, 'found': True}]} -@pytest.mark.run_loop async def test_set_connections_closed(es): await es.close() with pytest.raises(RuntimeError): es.transport.set_connections(['host1', 'host2']) -@pytest.mark.run_loop async def test_sniff_hosts_closed(es): await es.close() with pytest.raises(RuntimeError): await es.transport.sniff_hosts() -@pytest.mark.run_loop async def test_close_closed(es): await es.close() await es.close() -@pytest.mark.run_loop async def test_get_connection_closed(es): await es.close() with pytest.raises(RuntimeError): await es.transport.get_connection() -@pytest.mark.run_loop async def test_mark_dead_closed(es): await es.close() conn = object() @@ -331,29 +299,24 @@ async def test_mark_dead_closed(es): await es.transport.mark_dead(conn) -@pytest.mark.run_loop async def test_perform_request_closed(es): await es.close() with pytest.raises(RuntimeError): await es.transport.perform_request('GET', '/') -@pytest.mark.run_loop -async def test_request_error_404_on_head(loop, auto_close): +async def test_request_error_404_on_head(auto_close): exc = TransportError(404) - t = AIOHttpTransport([{}], connection_class=DummyConnection, loop=loop, - exception=exc) + t = AIOHttpTransport([{}], connection_class=DummyConnection, exception=exc) auto_close(t) ret = await t.perform_request('HEAD', '/') assert not ret -@pytest.mark.run_loop -async def test_request_connection_error(loop, auto_close): +async def test_request_connection_error(auto_close): exc = ConnectionError() - t = AIOHttpTransport([{}], connection_class=DummyConnection, loop=loop, - exception=exc) + t = AIOHttpTransport([{}], connection_class=DummyConnection, exception=exc) auto_close(t) with pytest.raises(ConnectionError): @@ -363,11 +326,9 @@ async def test_request_connection_error(loop, auto_close): assert len(conn.calls) == 3 -@pytest.mark.run_loop -async def test_request_connection_timeout(loop, auto_close): +async def test_request_connection_timeout(auto_close): exc = ConnectionTimeout() - t = AIOHttpTransport([{}], connection_class=DummyConnection, loop=loop, - exception=exc) + t = AIOHttpTransport([{}], connection_class=DummyConnection, exception=exc) auto_close(t) with pytest.raises(ConnectionTimeout): @@ -377,11 +338,10 @@ async def test_request_connection_timeout(loop, auto_close): assert len(conn.calls) == 1 -@pytest.mark.run_loop -async def test_request_connection_timeout_with_retry(loop, auto_close): +async def test_request_connection_timeout_with_retry(auto_close): exc = ConnectionTimeout() - t = AIOHttpTransport([{}], connection_class=DummyConnection, loop=loop, - exception=exc, retry_on_timeout=True) + t = AIOHttpTransport([{}], connection_class=DummyConnection, exception=exc, + retry_on_timeout=True) auto_close(t) with pytest.raises(ConnectionTimeout): @@ -391,11 +351,10 @@ async def test_request_connection_timeout_with_retry(loop, auto_close): assert len(conn.calls) == 3 -@pytest.mark.run_loop -async def test_request_retry_on_status(loop, auto_close): +async def test_request_retry_on_status(auto_close): exc = TransportError(500) - t = AIOHttpTransport([{}], connection_class=DummyConnection, loop=loop, - exception=exc, retry_on_status=(500,)) + t = AIOHttpTransport([{}], connection_class=DummyConnection, exception=exc, + retry_on_status=(500,)) auto_close(t) with pytest.raises(TransportError): @@ -405,23 +364,19 @@ async def test_request_retry_on_status(loop, auto_close): assert len(conn.calls) == 3 -@pytest.mark.run_loop -async def test_request_without_data(loop, auto_close): - t = AIOHttpTransport([{}], connection_class=DummyConnection, loop=loop, - data='') +async def test_request_without_data(auto_close): + t = AIOHttpTransport([{}], connection_class=DummyConnection, data='') auto_close(t) ret = await t.perform_request('GET', '/') assert ret == '' -@pytest.mark.run_loop -async def test_request_headers(loop, auto_close, es_server, mocker): +async def test_request_headers(auto_close, es_server, mocker): t = auto_close(AIOHttpTransport( [{'host': es_server['host'], 'port': es_server['port']}], http_auth=es_server['auth'], - loop=loop, headers={'H1': 'V1', 'H2': 'V2'}, ))