Skip to content

Commit

Permalink
Coiled Functions executor (#260)
Browse files Browse the repository at this point in the history
* first attempt

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* notimplementederror for a non-mappable stage

* add example of running cubed via coiled on aws

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* get callbacks working

* add option to install coiled dependencies

* rename requirements.txt

* make mypy ignore missing stubs for coiled

* ignore for purposes of test coverage

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
TomNicholas and pre-commit-ci[bot] authored Jul 24, 2023
1 parent ba9e63a commit e2bc217
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 0 deletions.
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ omit =
cubed/array_api/*
cubed/extensions/*
cubed/runtime/executors/beam.py
cubed/runtime/executors/coiled.py
cubed/runtime/executors/dask.py
cubed/runtime/executors/lithops.py
cubed/runtime/executors/modal*.py
Expand Down
52 changes: 52 additions & 0 deletions cubed/runtime/executors/coiled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Any, Mapping, Optional, Sequence

import coiled
from dask.distributed import as_completed
from networkx import MultiDiGraph

from cubed.core.array import Callback
from cubed.core.plan import visit_nodes
from cubed.runtime.types import DagExecutor
from cubed.runtime.utils import execution_stats, handle_callbacks


def exec_stage_func(func, m, coiled_kwargs, **kwargs):
    """Submit one stage-function invocation as a Coiled Function.

    Returns a distributed future for the instrumented call of ``func(m, **kwargs)``.
    """
    # TODO would be good to give the dask tasks useful names

    # Wrap the stage function so timing/memory stats are collected remotely.
    instrumented = execution_stats(func)
    # coiled_kwargs are tokenized by coiled.run, so each stage will reconnect
    # to the same cluster rather than starting a fresh one per call.
    remote = coiled.run(**coiled_kwargs)(instrumented)
    return remote.submit(m, **kwargs)


class CoiledFunctionsDagExecutor(DagExecutor):
    """An execution engine that uses Coiled Functions."""

    def execute_dag(
        self,
        dag: MultiDiGraph,
        callbacks: Optional[Sequence[Callback]] = None,
        array_names: Optional[Sequence[str]] = None,
        resume: Optional[bool] = None,
        **coiled_kwargs: Mapping[str, Any],
    ) -> None:
        """Execute the DAG by submitting each stage's tasks as Coiled Functions.

        Parameters
        ----------
        dag : MultiDiGraph
            The cubed computation graph to execute.
        callbacks : sequence of Callback, optional
            Callbacks to invoke with per-task stats as results arrive.
        array_names : sequence of str, optional
            Unused here; accepted for interface compatibility.
        resume : bool, optional
            Passed to ``visit_nodes`` to skip already-computed nodes.
        **coiled_kwargs
            Keyword arguments forwarded to ``coiled.run`` (cluster options etc.).

        Raises
        ------
        NotImplementedError
            If a stage has no ``mappable`` (non-mappable stages are unsupported).
        """
        # Note this currently only builds the task graph for each stage once it gets to that stage in computation
        for name, node in visit_nodes(dag, resume=resume):
            pipeline = node["pipeline"]
            for stage in pipeline.stages:
                if stage.mappable is None:
                    # This executor only supports mappable stages.
                    raise NotImplementedError()

                futures = [
                    exec_stage_func(
                        stage.function, m, coiled_kwargs, config=pipeline.config
                    )
                    for m in stage.mappable
                ]

                # Always gather the results of the coiled functions so this
                # method blocks until the stage has finished. (Previously
                # results were only awaited when callbacks were supplied, so
                # execute_dag could return before the computation completed.)
                for future in as_completed(futures):
                    result, stats = future.result()
                    if callbacks is not None:
                        if name is not None:
                            stats["array_name"] = name
                        handle_callbacks(callbacks, stats)
29 changes: 29 additions & 0 deletions examples/coiled/aws/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Examples running Cubed on Coiled

## Prerequisites

1. A [Coiled account](https://coiled.io/)
2. An AWS account (for S3 storage)

## Set up

1. Save your AWS credentials in a ``~/.aws/credentials`` file locally, following [Coiled's instructions on accessing remote data](https://docs.coiled.io/user_guide/remote-data-access.html).
2. Create a new S3 bucket (called `cubed-<username>-temp`, for example) in the `us-east-1` region. This will be used for intermediate data.
3. Install a Python environment with the coiled package in it by running the following from this directory:

```shell
conda create --name cubed-coiled-examples -y python=3.8
conda activate cubed-coiled-examples
pip install 'cubed[coiled]'
export CUBED_COILED_REQUIREMENTS_FILE=$(pwd)/requirements.txt
```

## Examples

Start with the simplest example:

```shell
python coiled-add-asarray.py "s3://cubed-$USER-temp"
```

If successful it should print a 4x4 matrix.
28 changes: 28 additions & 0 deletions examples/coiled/aws/coiled-add-asarray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import sys

import cubed
import cubed.array_api as xp
from cubed.runtime.executors.coiled import CoiledFunctionsDagExecutor

if __name__ == "__main__":
    # First CLI argument is the intermediate-storage location,
    # e.g. "s3://cubed-<username>-temp".
    tmp_path = sys.argv[1]

    # allowed_mem must be smaller than the per-function memory requested below.
    spec = cubed.Spec(tmp_path, allowed_mem=100000)
    executor = CoiledFunctionsDagExecutor()

    # Two identical 4x4 arrays, chunked into 2x2 blocks.
    values = [
        [1, 2, 3, 4],
        [5, 6, 7, 8],
        [9, 10, 11, 12],
        [13, 14, 15, 16],
    ]
    a = xp.asarray(values, chunks=(2, 2), spec=spec)
    b = xp.asarray(values, chunks=(2, 2), spec=spec)
    c = xp.add(a, b)

    res = c.compute(
        executor=executor,
        memory="1 GiB",  # must be greater than allowed_mem
        compute_purchase_option="spot_with_fallback",  # recommended
        account="dask",  # dask maintainers account - change this to your account
    )
    print(res)
4 changes: 4 additions & 0 deletions examples/coiled/aws/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
coiled
cubed
s3fs
tqdm
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ modal = [
"modal-client",
"s3fs",
]
coiled = [
"cubed[diagnostics]",
"coiled",
"dask[complete]",
"s3fs",
]
test = [
"apache-beam", # beam but not gcsfs as tests use local beam runner
"cubed[diagnostics,lithops]", # modal tests separate due to conflicting package reqs
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ show_error_codes = True
ignore_missing_imports = True
[mypy-aiostream.*]
ignore_missing_imports = True
[mypy-coiled.*]
ignore_missing_imports = True
[mypy-dask.*]
ignore_missing_imports = True
[mypy-distributed.*]
Expand Down

0 comments on commit e2bc217

Please sign in to comment.