Remove PipelineExecutor classes (#253)
PipelineExecutor comes from Rechunker, but it is not used in practice,
since Cubed has its own DagExecutor, which is more general (it supports
callbacks, for example).
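
For context, the interface that remains after this change is `DagExecutor.execute_dag(dag, **kwargs)` (see the `cubed/runtime/types.py` diff below). The sketch that follows shows roughly what an executor built on that interface looks like, using only the conventions visible in this diff (a `pipeline` node attribute, `stage.mappable`, `stage.function`, `pipeline.config`). `SerialDagExecutor` is a hypothetical example, not part of Cubed, and it ignores `callbacks`, `array_names`, and `resume` for brevity.

```python
import networkx as nx

from cubed.runtime.types import DagExecutor


class SerialDagExecutor(DagExecutor):
    """Hypothetical executor: run each node's pipeline serially in topological order."""

    def execute_dag(self, dag, callbacks=None, array_names=None, resume=None, **kwargs):
        # Visit nodes so that dependencies run before their dependents
        for name in nx.topological_sort(dag):
            pipeline = dag.nodes[name].get("pipeline")
            if pipeline is None:
                continue
            for stage in pipeline.stages:
                if stage.mappable is not None:
                    # A mappable stage is applied once per input (e.g. per chunk)
                    for m in stage.mappable:
                        stage.function(m, config=pipeline.config)
                else:
                    stage.function(config=pipeline.config)
```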
tomwhite authored Jul 10, 2023
1 parent 4c5b29c commit 2eed1bb
Showing 9 changed files with 26 additions and 214 deletions.
46 changes: 12 additions & 34 deletions cubed/core/plan.py
@@ -10,7 +10,7 @@
 from cubed.runtime.pipeline import already_computed
 from cubed.storage.zarr import LazyZarrArray
 from cubed.utils import chunk_memory, extract_stack_summaries, join_path, memory_repr
-from cubed.vendor.rechunker.types import PipelineExecutor, Stage
+from cubed.vendor.rechunker.types import Stage

 # A unique ID with sensible ordering, used for making directory names
 CONTEXT_ID = f"cubed-{datetime.now().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4()}"
@@ -180,39 +180,17 @@ def execute(
         dag = self.optimize().dag if optimize_graph else self.dag.copy()
         dag = self.create_lazy_zarr_arrays(dag)

-        if isinstance(executor, PipelineExecutor):
-            while len(dag) > 0:
-                # Find nodes (and their pipelines) that have no dependencies
-                no_dep_nodes = [x for x in dag.nodes() if dag.in_degree(x) == 0]
-                pipelines = [
-                    p
-                    for (n, p) in nx.get_node_attributes(dag, "pipeline").items()
-                    if n in no_dep_nodes
-                ]
-
-                # and execute them in parallel
-                if len(pipelines) > 0:
-                    plan = executor.pipelines_to_plan(pipelines)
-                    executor.execute_plan(plan, **kwargs)
-
-                # Remove them from the DAG, and repeat
-                dag.remove_nodes_from(no_dep_nodes)
-
-        else:
-            if callbacks is not None:
-                [
-                    callback.on_compute_start(dag, resume=resume)
-                    for callback in callbacks
-                ]
-            executor.execute_dag(
-                dag,
-                callbacks=callbacks,
-                array_names=array_names,
-                resume=resume,
-                **kwargs,
-            )
-            if callbacks is not None:
-                [callback.on_compute_end(dag) for callback in callbacks]
+        if callbacks is not None:
+            [callback.on_compute_start(dag, resume=resume) for callback in callbacks]
+        executor.execute_dag(
+            dag,
+            callbacks=callbacks,
+            array_names=array_names,
+            resume=resume,
+            **kwargs,
+        )
+        if callbacks is not None:
+            [callback.on_compute_end(dag) for callback in callbacks]

     def num_tasks(self, optimize_graph=True, resume=None):
         """Return the number of tasks needed to execute this plan."""
53 changes: 1 addition & 52 deletions cubed/runtime/executors/beam.py
@@ -11,13 +11,7 @@
 from cubed.core.array import TaskEndEvent
 from cubed.core.plan import visit_nodes
 from cubed.runtime.types import DagExecutor
-from cubed.vendor.rechunker.types import (
-    Config,
-    NoArgumentStageFunction,
-    ParallelPipelines,
-    PipelineExecutor,
-    Stage,
-)
+from cubed.vendor.rechunker.types import Config, NoArgumentStageFunction, Stage

 from ..utils import gensym

@@ -86,51 +80,6 @@ def expand(self, pcoll):
         )


-class BeamPipelineExecutor(PipelineExecutor[List[beam.PTransform]]):
-    def pipelines_to_plan(self, pipelines: ParallelPipelines) -> List[beam.PTransform]:
-        start = "Start" >> beam.Create([-1])
-
-        pcolls = []
-
-        for pipeline in pipelines:
-            pcoll = start
-            for step, stage in enumerate(pipeline.stages):
-                if stage.mappable is not None:
-                    pcoll |= stage.name >> _SingleArgumentStage(
-                        step, stage, pipeline.config
-                    )
-                else:
-                    pcoll |= stage.name >> beam.Map(
-                        _no_arg_stage,
-                        current=step,
-                        fun=stage.function,
-                        config=pipeline.config,
-                    )
-
-                # This prevents fusion:
-                # https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion
-                # Avoiding fusion on Dataflow is necessary to ensure that stages execute serially.
-                pcoll |= f"Reshuffle_{step:03d}" >> beam.Reshuffle()
-
-            pcolls.append(pcoll)
-
-        return pcolls
-
-    def execute_plan(self, plan: List[beam.PTransform], **kwargs):
-        with beam.Pipeline(**kwargs) as pipeline:
-            pcolls = []
-            for ptran in plan:
-                pcoll = pipeline | ptran
-                pcolls.append(pcoll)
-            pcolls | beam.Flatten()
-
-            # Print metrics at end
-            # result = pipeline.run()
-            # counters = result.metrics().query(beam.metrics.MetricsFilter())['counters']
-            # for metric in counters:
-            #     print(metric)
-
-
 class BeamDagExecutor(DagExecutor):
     """An execution engine that uses Apache Beam."""

74 changes: 0 additions & 74 deletions cubed/runtime/executors/lithops.py
@@ -1,8 +1,6 @@
 import copy
 import logging
 import time
-from functools import partial
-from typing import Callable, Iterable

 from lithops.executors import FunctionExecutor
 from lithops.wait import ALWAYS, ANY_COMPLETED
@@ -12,43 +10,9 @@
 from cubed.runtime.executors.lithops_retries import map_with_retries, wait_with_retries
 from cubed.runtime.types import DagExecutor
 from cubed.runtime.utils import handle_callbacks
-from cubed.vendor.rechunker.types import ParallelPipelines, PipelineExecutor

 logger = logging.getLogger(__name__)

-# Lithops represents delayed execution tasks as functions that require
-# a FunctionExecutor.
-Task = Callable[[FunctionExecutor], None]
-
-
-class LithopsPipelineExecutor(PipelineExecutor[Task]):
-    """An execution engine based on Lithops."""
-
-    def __init__(self, **kwargs):
-        self.kwargs = kwargs
-
-    def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Task:
-        tasks = []
-        for pipeline in pipelines:
-            stage_tasks = []
-            for stage in pipeline.stages:
-                if stage.mappable is not None:
-                    stage_func = build_stage_mappable_func(stage, pipeline.config)
-                    stage_tasks.append(stage_func)
-                else:
-                    stage_func = build_stage_func(stage, pipeline.config)
-                    stage_tasks.append(stage_func)
-
-            # Stages for a single pipeline must be executed in series
-            tasks.append(partial(_execute_in_series, stage_tasks))
-
-        return partial(_execute_in_series, tasks)
-
-    def execute_plan(self, plan: Task, **kwargs):
-        merged_kwargs = {**self.kwargs, **kwargs}
-        with FunctionExecutor(**merged_kwargs) as executor:
-            plan(executor)
-

 def run_func(input, func=None, config=None, name=None):
     result = func(input, config=config)
@@ -198,44 +162,6 @@ def standardise_lithops_stats(stats):
     )


-def build_stage_mappable_func(
-    stage, config, name=None, callbacks=None, use_backups=False
-):
-    def sf(mappable):
-        return stage.function(mappable, config=config)
-
-    def stage_func(lithops_function_executor):
-        for _, stats in map_unordered(
-            lithops_function_executor,
-            sf,
-            stage.mappable,
-            use_backups=use_backups,
-            return_stats=True,
-        ):
-            stats["array_name"] = name
-            handle_callbacks(callbacks, stats)
-
-    return stage_func
-
-
-def build_stage_func(stage, config):
-    def sf():
-        return stage.function(config=config)
-
-    def stage_func(lithops_function_executor):
-        futures = lithops_function_executor.call_async(sf, ())
-        lithops_function_executor.get_result(futures)
-
-    return stage_func
-
-
-def _execute_in_series(
-    tasks: Iterable[Task], lithops_function_executor: FunctionExecutor
-) -> None:
-    for task in tasks:
-        task(lithops_function_executor)
-
-
 class LithopsDagExecutor(DagExecutor):
     """An execution engine that uses Lithops."""

7 changes: 1 addition & 6 deletions cubed/runtime/types.py
@@ -1,11 +1,6 @@
-from typing import Union
-
-from cubed.vendor.rechunker.types import PipelineExecutor
-
-
 class DagExecutor:
     def execute_dag(self, dag, **kwargs):
         raise NotImplementedError  # pragma: no cover


-Executor = Union[PipelineExecutor, DagExecutor]
+Executor = DagExecutor
4 changes: 2 additions & 2 deletions cubed/tests/primitive/test_blockwise.py
@@ -4,12 +4,12 @@
 from numpy.testing import assert_array_equal

 from cubed.primitive.blockwise import blockwise, make_blockwise_function
+from cubed.runtime.executors.python import PythonDagExecutor
 from cubed.tests.utils import create_zarr, execute_pipeline
 from cubed.vendor.dask.blockwise import make_blockwise_graph
-from cubed.vendor.rechunker.executors.python import PythonPipelineExecutor


-@pytest.fixture(scope="module", params=[PythonPipelineExecutor()])
+@pytest.fixture(scope="module", params=[PythonDagExecutor()])
 def executor(request):
     return request.param

4 changes: 2 additions & 2 deletions cubed/tests/primitive/test_rechunk.py
@@ -4,11 +4,11 @@
 from numpy.testing import assert_array_equal

 from cubed.primitive.rechunk import rechunk
+from cubed.runtime.executors.python import PythonDagExecutor
 from cubed.tests.utils import execute_pipeline
-from cubed.vendor.rechunker.executors.python import PythonPipelineExecutor


-@pytest.fixture(scope="module", params=[PythonPipelineExecutor()])
+@pytest.fixture(scope="module", params=[PythonDagExecutor()])
 def executor(request):
     return request.param

23 changes: 8 additions & 15 deletions cubed/tests/utils.py
@@ -1,23 +1,20 @@
 import platform
 from typing import Iterable

+import networkx as nx
 import numpy as np
 import zarr

 from cubed.core.array import Callback
 from cubed.runtime.executors.python import PythonDagExecutor
 from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
-from cubed.vendor.rechunker.executors.python import PythonPipelineExecutor

 LITHOPS_LOCAL_CONFIG = {"lithops": {"backend": "localhost", "storage": "localhost"}}

-ALL_EXECUTORS = [
-    PythonPipelineExecutor(),
-    PythonDagExecutor(),
-]
+ALL_EXECUTORS = [PythonDagExecutor()]

 # don't run all tests on every executor as it's too slow, so just have a subset
-MAIN_EXECUTORS = [PythonPipelineExecutor(), PythonDagExecutor()]
+MAIN_EXECUTORS = [PythonDagExecutor()]


 if platform.system() != "Windows":
@@ -26,23 +23,18 @@


 try:
-    from cubed.runtime.executors.beam import BeamDagExecutor, BeamPipelineExecutor
+    from cubed.runtime.executors.beam import BeamDagExecutor

     ALL_EXECUTORS.append(BeamDagExecutor())
-    ALL_EXECUTORS.append(BeamPipelineExecutor())

     MAIN_EXECUTORS.append(BeamDagExecutor())
 except ImportError:
     pass

 try:
-    from cubed.runtime.executors.lithops import (
-        LithopsDagExecutor,
-        LithopsPipelineExecutor,
-    )
+    from cubed.runtime.executors.lithops import LithopsDagExecutor

     ALL_EXECUTORS.append(LithopsDagExecutor(config=LITHOPS_LOCAL_CONFIG))
-    ALL_EXECUTORS.append(LithopsPipelineExecutor(config=LITHOPS_LOCAL_CONFIG))

     MAIN_EXECUTORS.append(LithopsDagExecutor(config=LITHOPS_LOCAL_CONFIG))
 except ImportError:
@@ -95,5 +87,6 @@ def create_zarr(a, /, store, *, dtype=None, chunks=None):

 def execute_pipeline(pipeline, executor):
     """Executes a pipeline"""
-    plan = executor.pipelines_to_plan([pipeline])
-    executor.execute_plan(plan)
+    dag = nx.MultiDiGraph()
+    dag.add_node("node", pipeline=pipeline)
+    executor.execute_dag(dag)
Empty file.
29 changes: 0 additions & 29 deletions cubed/vendor/rechunker/executors/python.py

This file was deleted.
