-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Coiled Functions executor #260
Coiled Functions executor #260
Conversation
for more information, see https://pre-commit.ci
…holas/cubed into coiled_functions_executor
It works! See notebook here @jrbourbeau I have no idea what changed with the bucket but when I tried to re-run the same thing today it worked first time 🤔 The Coiled cluster ID I used was Bits that currently don't work very well / I couldn't work out how to do:
|
You could call
This relies on callbacks, so it won't work without those being implemented. |
Sometimes when I try to run with Coiled I get this error from Cubed and I'm unsure why. This is what James and I saw last week. This literally seems to happen intermittently without me changing any credentials. --------------------------------------------------------------------------
ArrayNotFoundError Traceback (most recent call last)
Cell In[9], line 1
----> 1 c.compute(
2 executor=coiledexecutor,
3 memory='1 GB',
4 account='dask',
5 )
File ~/Documents/Work/Code/cubed/cubed/core/array.py:137, in CoreArray.compute(self, executor, callbacks, optimize_graph, resume, **kwargs)
127 def compute(
128 self,
129 *,
(...)
134 **kwargs,
135 ):
136 """Compute this array, and any arrays that it depends on."""
--> 137 result = compute(
138 self,
139 executor=executor,
140 callbacks=callbacks,
141 optimize_graph=optimize_graph,
142 resume=resume,
143 **kwargs,
144 )
145 if result:
146 return result[0]
File ~/Documents/Work/Code/cubed/cubed/core/array.py:420, in compute(executor, callbacks, optimize_graph, resume, *arrays, **kwargs)
410 plan.execute(
411 executor=executor,
412 callbacks=callbacks,
(...)
416 **kwargs,
417 )
419 if _return_in_memory_array:
--> 420 return tuple(a._read_stored() for a in arrays)
File ~/Documents/Work/Code/cubed/cubed/core/array.py:420, in <genexpr>(.0)
410 plan.execute(
411 executor=executor,
412 callbacks=callbacks,
(...)
416 **kwargs,
417 )
419 if _return_in_memory_array:
--> 420 return tuple(a._read_stored() for a in arrays)
File ~/Documents/Work/Code/cubed/cubed/core/array.py:122, in CoreArray._read_stored(self)
118 def _read_stored(self):
119 # Only works if the array has been computed
120 if self.size > 0:
121 # read back from zarr
--> 122 return self.zarray[...]
123 else:
124 # this case fails for zarr, so just return an empty array of the correct shape
125 return np.empty(self.shape, dtype=self.dtype)
File ~/Documents/Work/Code/cubed/cubed/core/array.py:61, in CoreArray.zarray(self)
58 @property
59 def zarray(self):
60 """The underlying Zarr array. May only be used during the computation once the array has been created."""
---> 61 return open_if_lazy_zarr_array(self._zarray)
File ~/Documents/Work/Code/cubed/cubed/storage/zarr.py:115, in open_if_lazy_zarr_array(array)
113 def open_if_lazy_zarr_array(array: T_ZarrArray) -> zarr.Array:
114 """If array is a LazyZarrArray then open it, leaving other arrays unchanged."""
--> 115 return array.open() if isinstance(array, LazyZarrArray) else array
File ~/Documents/Work/Code/cubed/cubed/storage/zarr.py:72, in LazyZarrArray.open(self)
67 """Open the Zarr array for reading or writing and return it.
68
69 Note that the Zarr array must have been created or this method will raise an exception.
70 """
71 # r+ means read/write, fail if it doesn't exist
---> 72 return zarr.open_array(
73 self.store,
74 mode="r+",
75 shape=self.shape,
76 dtype=self.dtype,
77 chunks=self.chunks,
78 )
File ~/miniconda3/envs/cubed_coiled/lib/python3.11/site-packages/zarr/creation.py:579, in open_array(store, mode, shape, chunks, dtype, compressor, fill_value, order, synchronizer, filters, cache_metadata, cache_attrs, path, object_codec, chunk_store, storage_options, partial_decompress, write_empty_chunks, zarr_version, dimension_separator, meta_array, **kwargs)
577 if contains_group(store, path=path):
578 raise ContainsGroupError(path)
--> 579 raise ArrayNotFoundError(path)
581 elif mode == 'w':
582 init_array(store, shape=shape, chunks=chunks, dtype=dtype,
583 compressor=compressor, fill_value=fill_value,
584 order=order, filters=filters, overwrite=True, path=path,
585 object_codec=object_codec, chunk_store=chunk_store,
586 dimension_separator=dimension_separator)
ArrayNotFoundError: array not found at path %r' '' |
I think I managed to get all the callbacks working! They show that |
This suggests that the Zarr array was not created (by the worker), or that it was created but wasn't visible (to the client). You could check if you can see the Zarr arrays in S3:
|
I think this is ready to be merged.
The reason for the test failure is that this PR doesn't have tests, which takes cubed below 90% test coverage. We could add tests in this PR, but tests will require hooking up to a Coiled account because there is no local version of Coiled.
Discussion of the adaptive scaling properties of Coiled Functions deserves its own follow-up issue. |
Yes, happy to merge this. BTW could you add a line to https://github.com/tomwhite/cubed/blob/main/.coveragerc then it will avoid the test coverage failure. |
WIP attempt at an executor for running on Coiled functions. Don't actually know if it works yet because we had Cloud authentication issues.
Closes #254
One tricky bit is that IIUC currently you have to pass cluster size to
Coiled.run
, whereas we want to pass worker size (and adaptively scale up the number of workers).cc @jrbourbeau