From b25cf437078fc53adaec81e141c1a5627a6d2a5d Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 4 Sep 2024 09:56:22 -0500 Subject: [PATCH 1/2] Use new tokenize module --- distributed/client.py | 3 ++- distributed/diagnostics/progress.py | 2 +- distributed/protocol/serialize.py | 4 ++-- distributed/scheduler.py | 2 +- distributed/shuffle/_merge.py | 3 ++- distributed/shuffle/_rechunk.py | 2 +- distributed/shuffle/_shuffle.py | 2 +- distributed/tests/test_client.py | 4 ++-- 8 files changed, 12 insertions(+), 10 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 5c3268f3cf8..b038c0e46fb 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -44,11 +44,12 @@ from tlz import first, groupby, merge, partition_all, valmap import dask -from dask.base import collections_to_dsk, tokenize +from dask.base import collections_to_dsk from dask.core import flatten, validate_key from dask.highlevelgraph import HighLevelGraph from dask.layers import Layer from dask.optimization import SubgraphCallable +from dask.tokenize import tokenize from dask.typing import Key, NoDefault, no_default from dask.utils import ( apply, diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index 3fd7bbbd07f..1712cc5df32 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -9,7 +9,7 @@ from tlz import groupby, valmap -from dask.base import tokenize +from dask.tokenize import tokenize from dask.utils import key_split from distributed.diagnostics.plugin import SchedulerPlugin diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index a2e593e06af..4ad5c3f98a2 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -14,8 +14,8 @@ import msgpack import dask -from dask.base import normalize_token from dask.sizeof import sizeof +from dask.tokenize import normalize_token from dask.utils import typename from distributed.metrics import context_meter @@ -776,7 +776,7 @@ def register_serialization_lazy(toplevel, func): @partial(normalize_token.register, Serialized) def normalize_Serialized(o): - return [o.header] + o.frames # for dask.base.tokenize + return [o.header] + o.frames # for dask.tokenize.tokenize # Teach serialize how to handle bytes diff --git a/distributed/scheduler.py b/distributed/scheduler.py index fa965c69103..647f808ed80 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -54,8 +54,8 @@ import dask import dask.utils -from dask.base import TokenizationError, normalize_token, tokenize from dask.core import get_deps, iskey, validate_key +from dask.tokenize import TokenizationError, normalize_token, tokenize from dask.typing import Key, no_default from dask.utils import ( _deprecated, diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py index eb248b4e229..0fb403a3f82 100644 --- a/distributed/shuffle/_merge.py +++ b/distributed/shuffle/_merge.py @@ -5,9 +5,10 @@ from typing import TYPE_CHECKING, Any import dask -from dask.base import is_dask_collection, tokenize +from dask.base import is_dask_collection from dask.highlevelgraph import HighLevelGraph from dask.layers import Layer +from dask.tokenize import tokenize from distributed.shuffle._arrow import check_minimal_arrow_version from distributed.shuffle._core import ShuffleId, barrier_key, get_worker_plugin diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py index 20b37150852..15e3ea1d78a 100644 --- a/distributed/shuffle/_rechunk.py +++ b/distributed/shuffle/_rechunk.py @@ -121,9 +121,9 @@ import dask import dask.config -from dask.base import tokenize from dask.highlevelgraph import HighLevelGraph from dask.layers import Layer +from dask.tokenize import tokenize from dask.typing import Key from dask.utils import parse_bytes diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index 4a21909fa05..154f25c4bab 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -21,9 +21,9 @@ from tornado.ioloop import IOLoop import dask -from dask.base import tokenize from dask.highlevelgraph import HighLevelGraph from dask.layers import Layer +from dask.tokenize import tokenize from dask.typing import Key from distributed.core import PooledRPCCall diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index ab676b51a7a..0016294e708 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -43,6 +43,7 @@ import dask.bag as db from dask import delayed from dask.optimization import SubgraphCallable +from dask.tokenize import tokenize from dask.utils import get_default_shuffle_method, parse_timedelta, tmpfile from distributed import ( @@ -73,7 +74,6 @@ futures_of, get_task_metadata, temp_default_client, - tokenize, wait, ) from distributed.cluster_dump import load_cluster_dump @@ -1127,7 +1127,7 @@ async def test_scatter_non_list(c, s, a, b): @gen_cluster(client=True) async def test_scatter_tokenize_local(c, s, a, b): - from dask.base import normalize_token + from dask.tokenize import normalize_token class MyObj: pass From bef93f11637871f49b91c13bcfa71e4b7c79291c Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 4 Sep 2024 10:10:50 -0500 Subject: [PATCH 2/2] [skip-caching]