From 3b457c1ba7a47e32c4f05f1dbd9f6b42c6d44fa1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 14 Jul 2023 16:10:21 +0000 Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- cubed/runtime/executors/modal_async.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cubed/runtime/executors/modal_async.py b/cubed/runtime/executors/modal_async.py index af995d55..c327b47f 100644 --- a/cubed/runtime/executors/modal_async.py +++ b/cubed/runtime/executors/modal_async.py @@ -2,8 +2,8 @@ import copy import time from asyncio.exceptions import TimeoutError -from typing import Any, AsyncIterator, Dict, Iterable, Optional, Sequence from functools import partial +from typing import Any, AsyncIterator, Dict, Iterable, Optional, Sequence from aiostream import stream from modal.exception import ConnectionError @@ -166,9 +166,7 @@ async def async_execute_dag( if not compute_arrays_in_parallel: # run one pipeline at a time for name, node in visit_nodes(dag, resume=resume): - st = pipeline_to_stream( - app_function, name, node["pipeline"], **kwargs - ) + st = pipeline_to_stream(app_function, name, node["pipeline"], **kwargs) async with st.stream() as streamer: async for _, stats in streamer: handle_callbacks(callbacks, stats) @@ -176,9 +174,7 @@ async def async_execute_dag( for gen in visit_node_generations(dag, resume=resume): # run pipelines in the same topological generation in parallel by merging their streams streams = [ - pipeline_to_stream( - app_function, name, node["pipeline"], **kwargs - ) + pipeline_to_stream(app_function, name, node["pipeline"], **kwargs) for name, node in gen ] merged_stream = stream.merge(*streams)