Commit a8ae24f ("Self-review")
Parent: 3644a10

17 files changed: 314 additions & 293 deletions

distributed/cli/tests/test_dask_scheduler.py
Lines changed: 2 additions & 6 deletions

@@ -38,9 +38,6 @@ async def f():
         response = requests.get("http://127.0.0.1:8787/status/")
         response.raise_for_status()

-        with pytest.raises(Exception):
-            response = requests.get("http://127.0.0.1:9786/info.json")
-

 def test_hostport(loop):
     with popen(["dask-scheduler", "--no-dashboard", "--host", "127.0.0.1:8978"]):
@@ -55,9 +52,8 @@ async def f():


 def test_no_dashboard(loop):
-    pytest.importorskip("bokeh")
-    with popen(["dask-scheduler", "--no-dashboard"]) as proc:
-        with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c:
+    with popen(["dask-scheduler", "--no-dashboard"]):
+        with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
             response = requests.get("http://127.0.0.1:8787/status/")
             assert response.status_code == 404

distributed/cli/tests/test_dask_worker.py
Lines changed: 2 additions & 4 deletions

@@ -256,15 +256,13 @@ def test_nprocs_auto(loop):


 def test_nprocs_expands_name(loop):
     with popen(["dask-scheduler", "--no-dashboard"]):
-        with popen(
-            ["dask-worker", "127.0.0.1:8786", "--nprocs", "2", "--name", "0"]
-        ) as worker:
+        with popen(["dask-worker", "127.0.0.1:8786", "--nprocs", "2", "--name", "0"]):
             with popen(["dask-worker", "127.0.0.1:8786", "--nprocs", "2"]):
                 with Client("tcp://127.0.0.1:8786", loop=loop) as c:
                     start = time()
                     while len(c.scheduler_info()["workers"]) < 4:
                         sleep(0.2)
-                        assert time() < start + 10
+                        assert time() < start + 30

                     info = c.scheduler_info()
                     names = [d["name"] for d in info["workers"].values()]
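
The loosened deadline above (10 s to 30 s) widens the poll-until-deadline loop that waits for all four workers to register, presumably to tolerate slow CI machines. A standalone sketch of that polling pattern, with a hypothetical helper name and illustrative defaults:

from time import sleep, time

def poll_until(predicate, timeout=30, interval=0.2):
    # Re-check `predicate` until it holds or `timeout` seconds elapse;
    # mirrors the scheduler_info() loop in the test above.
    start = time()
    while not predicate():
        sleep(interval)
        assert time() < start + timeout, "condition not met before deadline"

# Usage, given a connected Client `c`:
# poll_until(lambda: len(c.scheduler_info()["workers"]) >= 4)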

distributed/client.py
Lines changed: 3 additions & 3 deletions

@@ -1467,10 +1467,10 @@ def close(self, timeout=no_default):
             pc.stop()

         if self.asynchronous:
-            future = self._close()
+            coro = self._close()
             if timeout:
-                future = asyncio.wait_for(future, timeout)
-            return future
+                coro = asyncio.wait_for(coro, timeout)
+            return coro

         if self._start_arg is None:
             with suppress(AttributeError):
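
The rename from `future` to `coro` matches the types involved: `Client._close()` returns a coroutine, and wrapping it in `asyncio.wait_for` still yields an awaitable to hand back to the caller. A minimal sketch of the pattern, using a hypothetical `MiniClient` class rather than the real `Client`:

import asyncio

class MiniClient:
    def __init__(self, asynchronous=False):
        self.asynchronous = asynchronous

    async def _close(self):
        await asyncio.sleep(0)  # stand-in for real teardown work

    def close(self, timeout=None):
        # In asynchronous mode, return an awaitable for the caller to await.
        if self.asynchronous:
            coro = self._close()
            if timeout:
                # wait_for wraps the coroutine; the result is still an
                # awaitable rather than a resolved value, hence `coro`.
                coro = asyncio.wait_for(coro, timeout)
            return coro
        asyncio.run(self._close())

asyncio.run(MiniClient(asynchronous=True).close(timeout=5))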

distributed/comm/tests/test_ws.py
Lines changed: 19 additions & 24 deletions

@@ -95,33 +95,26 @@ async def test_expect_scheduler_ssl_when_sharing_server():
         pass


-@gen_cluster(nthreads=[], scheduler_kwargs={"protocol": "ws://"})
-async def test_roundtrip(s, a, b):
-    async with Worker(s.address) as w:
-        async with Client(s.address, asynchronous=True) as c:
-            assert c.scheduler.address.startswith("ws://")
-            assert w.address.startswith("ws://")
-            future = c.submit(inc, 1)
-            result = await future
-            assert result == 2
-
-
-@gen_cluster(nthreads=[], scheduler_kwargs={"protocol": "ws://"})
-async def test_collections(s):
+@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"})
+async def test_roundtrip(c, s, a, b):
+    assert a.address.startswith("ws://")
+    assert b.address.startswith("ws://")
+    assert c.scheduler.address.startswith("ws://")
+    assert await c.submit(inc, 1) == 2
+
+
+@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"})
+async def test_collections(c, s, a, b):
     da = pytest.importorskip("dask.array")
-    async with Worker(s.address), Worker(s.address):
-        async with Client(s.address, asynchronous=True):
-            x = da.random.random((1000, 1000), chunks=(100, 100))
-            x = x + x.T
-            await x.persist()
+    x = da.random.random((1000, 1000), chunks=(100, 100))
+    x = x + x.T
+    await x.persist()


-@gen_cluster(nthreads=[], scheduler_kwargs={"protocol": "ws://"})
-async def test_large_transfer(s):
+@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"})
+async def test_large_transfer(c, s, a, b):
     np = pytest.importorskip("numpy")
-    async with Worker(s.address, protocol="ws://"):
-        async with Client(s.address, asynchronous=True) as c:
-            await c.scatter(np.random.random(1_000_000))
+    await c.scatter(np.random.random(1_000_000))


 @pytest.mark.asyncio
@@ -181,7 +174,9 @@ async def test_connection_made_with_extra_conn_args(cleanup, protocol, security)
 @gen_test()
 async def test_quiet_close():
     with warnings.catch_warnings(record=True) as record:
-        async with Client(protocol="ws", processes=False, asynchronous=True) as c:
+        async with Client(
+            protocol="ws", processes=False, asynchronous=True, dashboard_address=":0"
+        ):
             pass

     # For some reason unrelated @coroutine warnings are showing up
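
The rewrites above lean on `gen_cluster` from `distributed.utils_test`: with `client=True` the decorator stands up a scheduler, two workers, and an asynchronous client, passes them to the test as `(c, s, a, b)`, and tears them down afterwards, which removes the hand-rolled `async with Worker`/`Client` blocks. A minimal sketch of the adopted pattern (the test name is illustrative):

from distributed.utils_test import gen_cluster, inc

@gen_cluster(client=True)  # scheduler + two workers + async client
async def test_submit_roundtrip(c, s, a, b):
    # c: Client, s: Scheduler, a and b: Worker instances
    assert await c.submit(inc, 1) == 2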

distributed/deploy/tests/test_adaptive.py
Lines changed: 37 additions & 18 deletions

@@ -7,7 +7,15 @@

 import dask

-from distributed import Adaptive, Client, LocalCluster, SpecCluster, Worker, wait
+from distributed import (
+    Adaptive,
+    Client,
+    LocalCluster,
+    Scheduler,
+    SpecCluster,
+    Worker,
+    wait,
+)
 from distributed.compatibility import WINDOWS
 from distributed.metrics import time
 from distributed.utils_test import async_wait_for, clean, gen_test, slowinc
@@ -18,7 +26,7 @@ def test_adaptive_local_cluster(loop):
         n_workers=0,
         scheduler_port=0,
         silence_logs=False,
-        dashboard_address=None,
+        dashboard_address=":0",
         loop=loop,
     ) as cluster:
         alc = cluster.adapt(interval="100 ms")
@@ -48,7 +56,7 @@ async def test_adaptive_local_cluster_multi_workers():
         scheduler_port=0,
         silence_logs=False,
         processes=False,
-        dashboard_address=None,
+        dashboard_address=":0",
         asynchronous=True,
     ) as cluster:

@@ -76,8 +84,8 @@ async def test_adaptive_local_cluster_multi_workers():


 @pytest.mark.xfail(reason="changed API")
-@pytest.mark.asyncio
-async def test_adaptive_scale_down_override(cleanup):
+@gen_test()
+async def test_adaptive_scale_down_override():
     class TestAdaptive(Adaptive):
         def __init__(self, *args, **kwargs):
             self.min_size = kwargs.pop("min_size", 0)
@@ -95,7 +103,9 @@ class TestCluster(LocalCluster):
         def scale_up(self, n, **kwargs):
             assert False

-    async with TestCluster(n_workers=10, processes=False, asynchronous=True) as cluster:
+    async with TestCluster(
+        n_workers=10, processes=False, asynchronous=True, dashboard_address=":0"
+    ) as cluster:
         ta = cluster.adapt(
             min_size=2, interval=0.1, scale_factor=2, Adaptive=TestAdaptive
         )
@@ -113,7 +123,7 @@ async def test_min_max():
         scheduler_port=0,
         silence_logs=False,
         processes=False,
-        dashboard_address=None,
+        dashboard_address=":0",
         asynchronous=True,
         threads_per_worker=1,
     )
@@ -169,7 +179,7 @@ async def test_avoid_churn(cleanup):
         processes=False,
         scheduler_port=0,
         silence_logs=False,
-        dashboard_address=None,
+        dashboard_address=":0",
     ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
             adapt = cluster.adapt(interval="20 ms", wait_count=5)
@@ -194,7 +204,7 @@ async def test_adapt_quickly():
         processes=False,
         scheduler_port=0,
         silence_logs=False,
-        dashboard_address=None,
+        dashboard_address=":0",
     )
     client = await Client(cluster, asynchronous=True)
     adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10)
@@ -247,7 +257,7 @@ async def test_adapt_down():
         processes=False,
         scheduler_port=0,
         silence_logs=False,
-        dashboard_address=None,
+        dashboard_address=":0",
     ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
             cluster.adapt(interval="20ms", maximum=5)
@@ -274,7 +284,7 @@ async def test_no_more_workers_than_tasks():
         scheduler_port=0,
         silence_logs=False,
         processes=False,
-        dashboard_address=None,
+        dashboard_address=":0",
         asynchronous=True,
     ) as cluster:
         adapt = cluster.adapt(minimum=0, maximum=4, interval="10 ms")
@@ -287,7 +297,7 @@ def test_basic_no_loop(loop):
     with clean(threads=False):
         try:
             with LocalCluster(
-                0, scheduler_port=0, silence_logs=False, dashboard_address=None
+                0, scheduler_port=0, silence_logs=False, dashboard_address=":0"
             ) as cluster:
                 with Client(cluster) as client:
                     cluster.adapt()
@@ -311,7 +321,7 @@ async def test_target_duration():
         processes=False,
         scheduler_port=0,
         silence_logs=False,
-        dashboard_address=None,
+        dashboard_address=":0",
     ) as cluster:
         adapt = cluster.adapt(interval="20ms", minimum=2, target_duration="5s")
         async with Client(cluster, asynchronous=True) as client:
@@ -327,6 +337,7 @@ async def test_target_duration():
 async def test_worker_keys(cleanup):
     """Ensure that redefining adapt with a lower maximum removes workers"""
     async with SpecCluster(
+        scheduler={"cls": Scheduler, "options": {"port": 0, "dashboard_address": ":0"}},
         workers={
             "a-1": {"cls": Worker},
             "a-2": {"cls": Worker},
@@ -360,7 +371,7 @@ async def test_adapt_cores_memory(cleanup):
         scheduler_port=0,
         silence_logs=False,
         processes=False,
-        dashboard_address=None,
+        dashboard_address=":0",
         asynchronous=True,
     ) as cluster:
         adapt = cluster.adapt(minimum_cores=3, maximum_cores=9)
@@ -401,7 +412,7 @@ async def test_update_adaptive(cleanup):
         scheduler_port=0,
         silence_logs=False,
         processes=False,
-        dashboard_address=None,
+        dashboard_address=":0",
         asynchronous=True,
     ) as cluster:
         first = cluster.adapt(maxmimum=1)
@@ -415,7 +426,11 @@ async def test_update_adaptive(cleanup):
 async def test_adaptive_no_memory_limit(cleanup):
     """Make sure that adapt() does not keep creating workers when no memory limit is set."""
     async with LocalCluster(
-        n_workers=0, threads_per_worker=1, memory_limit=0, asynchronous=True
+        n_workers=0,
+        threads_per_worker=1,
+        memory_limit=0,
+        asynchronous=True,
+        dashboard_address=":0",
     ) as cluster:
         cluster.adapt(minimum=1, maximum=10, interval="1 ms")
         async with Client(cluster, asynchronous=True) as client:
@@ -447,7 +462,9 @@ async def _():

         return self.sync(_)

-    async with RequiresAwaitCluster(n_workers=0, asynchronous=True) as cluster:
+    async with RequiresAwaitCluster(
+        n_workers=0, asynchronous=True, dashboard_address=":0"
+    ) as cluster:
         async with Client(cluster, asynchronous=True) as client:
             futures = client.map(slowinc, range(5), delay=0.05)
             assert len(cluster.workers) == 0
@@ -465,7 +482,9 @@ async def test_adaptive_stopped():
     We should ensure that the adapt PC is actually stopped once the cluster
     stops.
     """
-    async with LocalCluster(n_workers=0, asynchronous=True) as cluster:
+    async with LocalCluster(
+        n_workers=0, asynchronous=True, dashboard_address=":0"
+    ) as cluster:
         instance = cluster.adapt(interval="10ms")
         assert instance.periodic_callback is not None
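
The change repeated throughout this file, `dashboard_address=None` to `dashboard_address=":0"`, points the test dashboards at an OS-assigned ephemeral port instead of the fixed default 8787, so concurrently running test suites cannot collide on the port (the motivation is inferred from the recurring pattern). Port-0 binding is the standard mechanism; a plain-socket sketch:

import socket

# Binding port 0 lets the kernel pick any free port; the chosen port can
# then be read back. dashboard_address=":0" relies on the same convention.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 0))
print("assigned port:", s.getsockname()[1])
s.close()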
