Skip to content

Commit

Permalink
Don't materialize input iterables to map_unordered (#242)
Browse files Browse the repository at this point in the history
(although Lithops will still materialize internally, see #239)
  • Loading branch information
tomwhite authored Jul 1, 2023
1 parent a058f2e commit 738c6ee
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 5 deletions.
4 changes: 2 additions & 2 deletions cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def execute_dag(dag, callbacks=None, array_names=None, resume=None, **kwargs):
for _, stats in map_unordered(
executor,
run_func,
list(stage.mappable),
stage.mappable,
func=stage.function,
config=pipeline.config,
name=name,
Expand Down Expand Up @@ -229,7 +229,7 @@ def stage_func(lithops_function_executor):
for _, stats in map_unordered(
lithops_function_executor,
sf,
list(stage.mappable),
stage.mappable,
use_backups=use_backups,
return_stats=True,
):
Expand Down
2 changes: 1 addition & 1 deletion cubed/runtime/executors/modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def execute_dag(
if stage.mappable is not None:
task_create_tstamp = time.time()
for _, stats in app_function.map(
list(stage.mappable),
stage.mappable,
order_outputs=False,
kwargs=dict(func=stage.function, config=pipeline.config),
):
Expand Down
2 changes: 1 addition & 1 deletion cubed/runtime/executors/modal_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def async_execute_dag(
if stage.mappable is not None:
async for _, stats in map_unordered(
app_function,
list(stage.mappable),
stage.mappable,
return_stats=True,
name=name,
func=stage.function,
Expand Down
2 changes: 1 addition & 1 deletion cubed/runtime/executors/python_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def pipeline_to_stream(concurrent_executor, name, pipeline, **kwargs):
map_unordered,
concurrent_executor,
run_func,
list(stage.mappable),
stage.mappable,
return_stats=True,
name=name,
func=stage.function,
Expand Down

0 comments on commit 738c6ee

Please sign in to comment.