Coiled Functions executor #260

Merged (12 commits, Jul 24, 2023)
1 change: 1 addition & 0 deletions .coveragerc
@@ -4,6 +4,7 @@ omit =
    cubed/array_api/*
    cubed/extensions/*
    cubed/runtime/executors/beam.py
    cubed/runtime/executors/coiled.py
    cubed/runtime/executors/dask.py
    cubed/runtime/executors/lithops.py
    cubed/runtime/executors/modal*.py
52 changes: 52 additions & 0 deletions cubed/runtime/executors/coiled.py
@@ -0,0 +1,52 @@
from typing import Any, Mapping, Optional, Sequence

import coiled
from dask.distributed import as_completed
from networkx import MultiDiGraph

from cubed.core.array import Callback
from cubed.core.plan import visit_nodes
from cubed.runtime.types import DagExecutor
from cubed.runtime.utils import execution_stats, handle_callbacks


def exec_stage_func(func, m, coiled_kwargs, **kwargs):
    # TODO: it would be good to give the dask tasks useful names

    # coiled_kwargs are tokenized by coiled.run, so each stage will reconnect to the same cluster
    return coiled.run(**coiled_kwargs)(execution_stats(func)).submit(m, **kwargs)


class CoiledFunctionsDagExecutor(DagExecutor):
    """An execution engine that uses Coiled Functions."""

    def execute_dag(
        self,
        dag: MultiDiGraph,
        callbacks: Optional[Sequence[Callback]] = None,
        array_names: Optional[Sequence[str]] = None,
        resume: Optional[bool] = None,
        **coiled_kwargs: Mapping[str, Any],
    ) -> None:
        # Note: this currently only builds the task graph for each stage once it gets to that stage in the computation
        for name, node in visit_nodes(dag, resume=resume):
            pipeline = node["pipeline"]
            for stage in pipeline.stages:
                if stage.mappable is not None:
                    futures = []
                    for m in stage.mappable:
                        future_func = exec_stage_func(
                            stage.function, m, coiled_kwargs, config=pipeline.config
                        )
                        futures.append(future_func)
                else:
                    raise NotImplementedError()

                # gather the results of the coiled functions as they complete
                ac = as_completed(futures)
                if callbacks is not None:
                    for future in ac:
                        result, stats = future.result()
                        if name is not None:
                            stats["array_name"] = name
                        handle_callbacks(callbacks, stats)
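
For context, here is a minimal sketch of the Coiled Functions pattern the executor relies on (the `double` function and the `memory` value are illustrative, not part of this PR): `coiled.run(...)` returns a decorator, and a decorated function gains a `.submit()` method that runs it on a Coiled-managed worker and returns a Dask future, which is the shape `exec_stage_func` builds on above.

```python
import coiled

# illustrative resource settings; any coiled.run option could go here
@coiled.run(memory="2 GiB")
def double(x):
    return 2 * x

# .submit() runs the call remotely and returns a Dask future,
# mirroring what exec_stage_func does for each item in stage.mappable
future = double.submit(21)
print(future.result())  # prints 42
```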
29 changes: 29 additions & 0 deletions examples/coiled/aws/README.md
@@ -0,0 +1,29 @@
# Examples running Cubed on Coiled

## Prerequisites

1. A [Coiled account](https://coiled.io/)
2. An AWS account (for S3 storage)

## Set up

1. Save your AWS credentials in a `~/.aws/credentials` file locally, following [Coiled's instructions on accessing remote data](https://docs.coiled.io/user_guide/remote-data-access.html).
2. Create a new S3 bucket (called `cubed-<username>-temp`, for example) in the `us-east-1` region. This bucket will be used for intermediate data; one way to create it is shown after this list.
3. Install a Python environment with the `coiled` package in it by running the following from this directory:

```shell
conda create --name cubed-coiled-examples -y python=3.8
conda activate cubed-coiled-examples
pip install 'cubed[coiled]'
export CUBED_COILED_REQUIREMENTS_FILE=$(pwd)/requirements.txt
```
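
If you prefer the command line, one possible way to create the bucket from step 2 (assuming the AWS CLI is installed and credentialed):

```shell
aws s3 mb s3://cubed-$USER-temp --region us-east-1
```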

## Examples

Start with the simplest example:

```shell
python coiled-add-asarray.py "s3://cubed-$USER-temp"
```

If successful, it should print a 4x4 matrix.
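
Since the script adds two identical 4x4 arrays, the expected output is their element-wise sum, printed along these lines:

```
[[ 2  4  6  8]
 [10 12 14 16]
 [18 20 22 24]
 [26 28 30 32]]
```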
28 changes: 28 additions & 0 deletions examples/coiled/aws/coiled-add-asarray.py
@@ -0,0 +1,28 @@
import sys

import cubed
import cubed.array_api as xp
from cubed.runtime.executors.coiled import CoiledFunctionsDagExecutor

if __name__ == "__main__":
    tmp_path = sys.argv[1]
    spec = cubed.Spec(tmp_path, allowed_mem=100000)
    executor = CoiledFunctionsDagExecutor()
    a = xp.asarray(
        [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
        chunks=(2, 2),
        spec=spec,
    )
    b = xp.asarray(
        [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
        chunks=(2, 2),
        spec=spec,
    )
    c = xp.add(a, b)
    res = c.compute(
        executor=executor,
        memory="1 GiB",  # must be greater than allowed_mem
        compute_purchase_option="spot_with_fallback",  # recommended
        account="dask",  # dask maintainers' account - change this to your own
    )
    print(res)
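
Note that the keyword arguments passed to `compute` other than `executor` (here `memory`, `compute_purchase_option`, and `account`) end up as `**coiled_kwargs` in `execute_dag` and are passed on to `coiled.run`, so any Coiled cluster option can be supplied this way; because `coiled.run` tokenizes its arguments, every stage reconnects to the same cluster instead of starting a new one.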
4 changes: 4 additions & 0 deletions examples/coiled/aws/requirements.txt
@@ -0,0 +1,4 @@
coiled
cubed
s3fs
tqdm
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -49,6 +49,12 @@ modal = [
    "modal-client",
    "s3fs",
]
coiled = [
    "cubed[diagnostics]",
    "coiled",
    "dask[complete]",
    "s3fs",
]
test = [
    "apache-beam", # beam but not gcsfs as tests use local beam runner
    "cubed[diagnostics,lithops]", # modal tests separate due to conflicting package reqs
2 changes: 2 additions & 0 deletions setup.cfg
@@ -21,6 +21,8 @@ show_error_codes = True
ignore_missing_imports = True
[mypy-aiostream.*]
ignore_missing_imports = True
[mypy-coiled.*]
ignore_missing_imports = True
[mypy-dask.*]
ignore_missing_imports = True
[mypy-distributed.*]