Skip to content

Commit

Permalink
Move import
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Sep 8, 2023
1 parent fc3a456 commit f94b093
Showing 1 changed file with 3 additions and 34 deletions.
37 changes: 3 additions & 34 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@
import dask
from dask.base import collections_to_dsk, normalize_token, tokenize
from dask.core import flatten, validate_key
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph
from dask.optimization import SubgraphCallable
from dask.typing import DaskGraph, Key, NewDaskCollection
from dask.typing import DaskGraph, Key, all_collections_newstyle

Check warning on line 38 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L38

Added line #L38 was not covered by tests
from dask.utils import (
apply,
ensure_dict,
Expand Down Expand Up @@ -3437,23 +3436,7 @@ def compute(
else a
for a in collections
)

new_style = False
filtered_collections = [
a
for a in collections
if dask.is_dask_collection(a)
# Delayed always returns True when checked if instance of a Protocol
and not isinstance(a, Delayed)
]
if filtered_collections and all(
isinstance(a, NewDaskCollection) for a in filtered_collections
):
new_style = True
elif any(isinstance(a, NewDaskCollection) for a in filtered_collections):
raise RuntimeError(
"Provided multiple collections to dask.compute but mixed old and new style collections."
)
new_style = all_collections_newstyle(collections)
if new_style:
futures = []
for collection in collections:
Expand Down Expand Up @@ -3606,21 +3589,7 @@ def persist(

assert all(map(dask.is_dask_collection, collections))

new_style = False
if all(
isinstance(a, NewDaskCollection)
for a in collections
if dask.is_dask_collection(a) and not isinstance(a, Delayed)
):
new_style = True
elif any(
isinstance(a, NewDaskCollection)
for a in collections
if dask.is_dask_collection(a)
):
raise RuntimeError(
"Provided multiple collections to dask.compute but mixed old and new style collections."
)
new_style = all_collections_newstyle(collections)

if new_style:
result = []
Expand Down

0 comments on commit f94b093

Please sign in to comment.