Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Jul 14, 2023
1 parent 44153af commit 3b457c1
Showing 1 changed file with 3 additions and 7 deletions.
10 changes: 3 additions & 7 deletions cubed/runtime/executors/modal_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import copy
import time
from asyncio.exceptions import TimeoutError
from typing import Any, AsyncIterator, Dict, Iterable, Optional, Sequence
from functools import partial
from typing import Any, AsyncIterator, Dict, Iterable, Optional, Sequence

from aiostream import stream
from modal.exception import ConnectionError
Expand Down Expand Up @@ -166,19 +166,15 @@ async def async_execute_dag(
if not compute_arrays_in_parallel:
# run one pipeline at a time
for name, node in visit_nodes(dag, resume=resume):
st = pipeline_to_stream(
app_function, name, node["pipeline"], **kwargs
)
st = pipeline_to_stream(app_function, name, node["pipeline"], **kwargs)
async with st.stream() as streamer:
async for _, stats in streamer:
handle_callbacks(callbacks, stats)
else:
for gen in visit_node_generations(dag, resume=resume):
# run pipelines in the same topological generation in parallel by merging their streams
streams = [
pipeline_to_stream(
app_function, name, node["pipeline"], **kwargs
)
pipeline_to_stream(app_function, name, node["pipeline"], **kwargs)
for name, node in gen
]
merged_stream = stream.merge(*streams)
Expand Down

0 comments on commit 3b457c1

Please sign in to comment.