
Trial running the rechunking tool on Beam's Dask Runner #31

Open
mattjbr123 opened this issue Nov 7, 2024 · 5 comments


mattjbr123 commented Nov 7, 2024

https://beam.apache.org/releases/pydoc/current/apache_beam.runners.dask.dask_runner.html

It's relatively new and buggy, so it might prove more hassle than it's worth, but we can give it a try and move to other runners (probably Spark) if needed.


metazool commented Dec 5, 2024

Can I ask a bit about where you're running Beam?

Is this on JASMIN, or is it AWS-based? Is there somewhere we can read through the configuration?

cc @albags

(The context for asking was talking through the Argo Workflows setup that's being used with https://github.com/NERC-CEH/dri-ingestion - possible that Beam would be a good fit for image processing pipelines, and what's needed to ensure portability between one and the other?)


mattjbr123 commented Dec 5, 2024

Originally the idea was to trial it on JASMIN, specifically LOTUS, their HPC-like cluster, which has dask-gateway for ease of spinning up Dask clusters.

Is there somewhere we can read through the configuration?

There will be once I've actually had a go at it!

@mattjbr123

I realise a 'getting started' blurb and some more info might be helpful.
There are minimal instructions in the README for creating the necessary environment and running the script that uses Beam. They don't currently cover getting access to the data itself, which is on a 'group workspace' (read: access-controlled disk space) on JASMIN (I can happily grant you access if you go down that route).
So you might not be able to actually run the script yet, but maybe looking at it is still somewhat helpful.

There is limited configuration needed for Beam itself at this stage of our development - it largely figures things out for itself from the pipeline you've given it to run. The only things we've tweaked so far are 'num_workers' when running in parallel, 'direct_running_mode' to tell it to run in serial or parallel, and 'auto_unique_labels', which allows multiple steps in the pipeline to share the same name (e.g. Print, useful for debugging).
So far we have run it 'locally', which in this case means on a JASMIN VM or PM (physical machine), and on LOTUS, JASMIN's HPC-like cluster, both using the 'Direct Runner', the built-in no-frills generic 'Runner' that lets Beam run anywhere.

The next step, tracked in this issue, is to move away from the limitations of the Direct Runner and try another one, starting with the Dask Runner. This translates a Beam pipeline into Dask terms (objects, task graphs, etc.) and runs it wherever you have a Dask cluster, such as the Dask Gateway that JASMIN maintains. It will probably require some more configuration, and is what I'm currently working on.


mattjbr123 commented Dec 13, 2024

Have successfully configured the Dask Gateway on JASMIN and run a trial job, using the code snippet below:

```python
import dask_gateway

gw = dask_gateway.Gateway("https://dask-gateway.jasmin.ac.uk")

options = gw.cluster_options()

# number of cores for each worker
options.worker_cores = 4

# to ensure the client, scheduler and workers all use the same python environment
options.worker_setup = "source /home/users/mattjbr/miniconda3/bin/activate /home/users/mattjbr/miniconda3/envs/daskbeam"

cluster = gw.new_cluster(options, shutdown_on_close=False)

cluster.adapt(minimum=1, maximum=4)

client = cluster.get_client()

client.status
# 'running'

client
# <Client: 'tls://172.17.11.136:39389' processes=1 threads=2, memory=8.00 GiB>
```

and with a config file containing an API token to allow access to the gateway in my home directory at ~/.config/dask/gateway.yaml
(how we manage this 'secret' for the conversion tool we are developing needs some thought)
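For reference, one plausible shape for that config file, assuming dask-gateway's standard YAML layout; the auth type and exact field names should be checked against the dask-gateway and JASMIN docs, and the token placeholder is exactly that:

```yaml
# ~/.config/dask/gateway.yaml (sketch; verify field names against the
# dask-gateway configuration docs and JASMIN's instructions)
gateway:
  address: https://dask-gateway.jasmin.ac.uk
  auth:
    type: jupyterhub
    kwargs:
      api_token: "<your-api-token>"
```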

@mattjbr123

Xarray would then pick up this client by default, without any further config, whereas Beam will require the URL to be provided.
The next commit adds a copy of the script adapted to use Dask and Beam, or at least how I think it should work!
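That "picked up by default" behaviour is just Dask's: once a distributed `Client` exists, Dask collections (and hence xarray's Dask-backed arrays) use it as the default scheduler. A small illustration, using a local in-process `Client` as a stand-in for the gateway one:

```python
import dask.array as da
from distributed import Client

# Local stand-in for `client = cluster.get_client()` on the gateway;
# creating a Client makes it the default scheduler for Dask work.
client = Client(processes=False)

x = da.ones((1000, 1000), chunks=(250, 250))
print(x.mean().compute())  # computed via the default client, no config needed

client.close()
```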
