Turn off dask profiler by default #19

Open
nsmith- opened this issue Dec 2, 2022 · 5 comments
Labels
enhancement New feature or request

Comments

@nsmith-
Member

nsmith- commented Dec 2, 2022

In https://git.rwth-aachen.de/3pia/cms_analyses/common/-/blob/master/dask.yml#L34-37 the lines

distributed:
  worker:
    memory:
      target: 0.7
      spill: 0.9
      pause: 0.92
      terminate: 0
    profile:
      interval: 1d
      cycle: 2d
      low-level: False

adjust default dask worker memory limits and profiler settings. The worker memory pause fraction is a source of frequent headaches as jobs on that worker stall for a long time, often slowing down the overall processing. @pfackeldey would you recommend these defaults also here?
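To make the fractions above concrete, here is a minimal sketch (not taken from the linked config) that converts them into absolute byte thresholds for one worker. The 8 GB memory limit and the helper name `memory_thresholds` are assumptions made for illustration. A threshold of 0 or False is treated as disabled, which is how `terminate: 0` is described later in this thread.

```python
from __future__ import annotations


def memory_thresholds(limit_bytes: int, fractions: dict[str, float]) -> dict[str, int | None]:
    """Convert fractional worker-memory thresholds into absolute bytes.

    A falsy fraction (0 or False) is reported as None, meaning "disabled".
    Illustrative helper only; this is not part of dask or distributed.
    """
    return {
        name: round(limit_bytes * frac) if frac else None
        for name, frac in fractions.items()
    }


# Fractions quoted from the config lines above; the 8 GB limit is an assumed example value.
fractions = {"target": 0.7, "spill": 0.9, "pause": 0.92, "terminate": 0}
thresholds = memory_thresholds(8_000_000_000, fractions)

for name, value in thresholds.items():
    text = "disabled" if value is None else f"{value / 1e9:.2f} GB"
    print(f"{name:>9}: {text}")
```

With these numbers the gap between spilling and pausing is only 2% of the limit, which is one way to see why a worker can end up paused for long stretches.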

@oshadura
Member

oshadura commented Dec 5, 2022

Following as it could be interesting for coffea-casa AF as well :)

@pfackeldey

Dear @nsmith-,
these settings have been good and solid for us so far. Keep in mind, though, that they are somewhat specific to our HTCondor use case (e.g. we let HTCondor terminate the job together with the worker, and we prevent the nanny from terminating it via terminate: 0).
Concerning the profiling: we disabled it because it is only relevant for the performance/profile tab in the GUI of the dask scheduler. For us this GUI is barely usable, if at all, when we do a full processing (>50k dask-jobs).
Best, Peter

@nsmith-
Member Author

nsmith- commented Feb 16, 2023

I've realized that in this setup, the dask config arguments are difficult to propagate to the workers. Just adding things to the yaml or calling dask.config.set on the client is not sufficient. With the patch in #20 one can set environment variables in the constructor,

cluster = LPCCondorCluster(
    job_script_prologue=[
        "export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=1d",
        "export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=2d",
    ],
)

and can confirm they get respected:

In [1]: client.run(lambda: dask.config.get("distributed.worker.profile"))
Out[1]:
{'tcp://131.225.191.76:10001': {'interval': '1d',
  'cycle': '2d',
  'enabled': True,
  'low-level': False}}
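With many workers, reading that dictionary by eye gets tedious. A small sketch of how the check could be automated is shown here; it works on the plain dictionary that `client.run` returns, so it needs no cluster. The helper name `find_mismatched_workers` and the second worker address are made up for illustration.

```python
from __future__ import annotations

from typing import Any


def find_mismatched_workers(
    per_worker: dict[str, dict[str, Any]],
    expected: dict[str, Any],
) -> dict[str, dict[str, Any]]:
    """Return, for each worker, the expected keys whose reported values differ."""
    mismatches: dict[str, dict[str, Any]] = {}
    for address, config in per_worker.items():
        wrong = {k: config.get(k) for k, v in expected.items() if config.get(k) != v}
        if wrong:
            mismatches[address] = wrong
    return mismatches


# First entry copied from the output above; the second worker is hypothetical
# and stands in for one that did not pick up the environment variables.
reported = {
    "tcp://131.225.191.76:10001": {"interval": "1d", "cycle": "2d", "enabled": True, "low-level": False},
    "tcp://192.0.2.1:10002": {"interval": "10ms", "cycle": "1000ms", "enabled": True, "low-level": False},
}
expected = {"interval": "1d", "cycle": "2d"}

print(find_mismatched_workers(reported, expected))
```

An empty result means every worker reports the expected values.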

@yimuchen perhaps with this patch you can also play more easily with MALLOC_TRIM_THRESHOLD_ and other memory settings as well?

@yimuchen
Contributor

I just confirmed that this patch indeed allows us to set worker environment variables.
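For anyone who wants to generate the export lines rather than type them, here is a rough sketch. It relies on dask's documented convention for configuration environment variables (a `DASK_` prefix, upper-case names, a double underscore per nesting level, with hyphens and underscores treated as interchangeable). The helper name `dask_env_exports` is hypothetical.

```python
from __future__ import annotations

import shlex


def dask_env_exports(settings: dict[str, str]) -> list[str]:
    """Turn dotted dask config keys into shell `export` lines.

    Hyphens become single underscores and dots become double underscores.
    Values are shell-quoted only when they contain unsafe characters.
    """
    lines = []
    for key, value in settings.items():
        name = "DASK_" + key.upper().replace("-", "_").replace(".", "__")
        lines.append(f"export {name}={shlex.quote(str(value))}")
    return lines


prologue = dask_env_exports({
    "distributed.worker.profile.interval": "1d",
    "distributed.worker.profile.cycle": "2d",
})
print(prologue)
```

The resulting list could then be passed as `job_script_prologue=prologue`, matching the constructor call shown earlier in this thread.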

@nsmith- nsmith- added the enhancement New feature or request label Feb 6, 2024
@NJManganelli

I wanted to consolidate this into my own config and ended up going a bit overboard with boilerplate, so this is non-minimal. The function below creates a context manager with all the jobqueue and dask worker settings and simultaneously writes them as a config file to a location the workers can see, which seems to ensure all the settings are respected. Some settings, like the nanny ones, are picked up from the context config and some only from the file. It's possible that in writing out this config I'm overriding or missing some elements that would otherwise be picked up from the default lpcjobqueue config, but anyway...

Note: I couldn't get the dask environment variables to work for the MALLOC and MKL/OMP thread settings, because the parser seems to lower-case the variable names. I was getting both a "MALLOC_TRIM_THRESHOLD_": 65536 entry and a "malloc_trim_threshold_": entry, and it seems like only the properly upper-cased one was respected. Therefore I stuck with passing them in as env variables via "distributed.nanny.pre-spawn-environ".
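As an aside before the full function: the case problem in the note above can be reproduced without a cluster. This is a simplified re-implementation, written for illustration, of the documented rule that dask lower-cases `DASK_`-prefixed variable names and turns double underscores into nesting. It is not dask's actual code, and the function name `env_to_config_key` is made up here.

```python
def env_to_config_key(env_name: str) -> str:
    """Map a DASK_* environment variable name to a dotted config key.

    Simplified illustration of dask's documented rule: strip the DASK_
    prefix, lower-case the rest, and treat "__" as a nesting separator.
    """
    if not env_name.startswith("DASK_"):
        raise ValueError("not a dask configuration variable")
    return env_name[len("DASK_"):].lower().replace("__", ".")


key = env_to_config_key(
    "DASK_DISTRIBUTED__NANNY__PRE_SPAWN_ENVIRON__MALLOC_TRIM_THRESHOLD_"
)
print(key)

# The last component comes out lower-cased, so it no longer matches the
# upper-case variable name that was intended.
print(key.rsplit(".", 1)[-1] == "MALLOC_TRIM_THRESHOLD_")
```

Since environment variable names are case-sensitive, a lower-cased entry ends up as a different variable from the one intended, which would be consistent with only the upper-cased one being respected.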

from __future__ import annotations

from typing import Literal, Any

import os
import sys
import pprint
import pathlib
from ruamel import yaml

import dask
from distributed import Client
if "cmslpc" in os.getenv("HOSTNAME", ""):  # default to "" so a missing HOSTNAME does not raise TypeError
    from lpcjobqueue import LPCCondorCluster
    _LPC = True
else:
    _LPC = False

def retrieve_cluster_config(client, key="jobqueue", subkeys: list[str] | None = None):
    # ask every worker for its own view of the given top-level config key
    config = client.run(lambda: dask.config.get(key))
    #config = client.run(lambda: dask.config.get("distributed"))
    if subkeys is not None:
        # descend one level per subkey, separately for each worker
        for subkey in subkeys:
            config = {worker: config[worker].get(subkey) for worker in config}
    return config

def jobqueue_cluster(n_workers: int | list[int | str] = 1, # pass list of length 2 for min and max adaptive scaling
                     # specific to this function, will do a wait for this minimum number of workers, blocking call
                     wait_for_workers: int | None | bool = 1,
                     # direct dask jobqueue configuration arguments for convenience
                     cores: int = 4, # will set in the "jobqueue.{config_name}"
                     memory: str | int = "8GB", # will set in the "jobqueue.{config_name}"
                     disk: str | int = "30GB", # will set in the "jobqueue.{config_name}"
                     processes: int = 1, # will set in the "jobqueue.{config_name}"
                     log_directory: str | bool = False, # will set in the "jobqueue.{config_name}"
                     config_name: str | None = None, # for picking up settings via "jobqueue.{config_name}" in dask.config
                     job_extra_directives: dict[str, Any] | None = None,
                     profile_level: Literal["low", "medium", "high"] = "high",
                     # dask jobqueue configuration arguments from dict, overriding default set in function and above
                     config_set_args: dict[str, Any] | None = None,
                     # LPCCondorCluster direct arguments
                     ship_env: bool = True,
                     image: str | None = None,
                     transfer_input_files: str | list[str] | None = None,
                     # death_timeout=180,
                     # job_script_prologue: list[str] | None = None,
                     verbose=False,
                     **base_class_kwargs):
    # The Inception of Configurations... this should all really follow the dask mechanisms with yaml configurations, but now we're here
    # https://github.com/dask/dask-jobqueue/blob/main/dask_jobqueue/core.py
    # https://github.com/dask/dask-jobqueue/blob/main/dask_jobqueue/htcondor.py
    # https://github.com/CoffeaTeam/lpcjobqueue/blob/main/src/lpcjobqueue/cluster.py
    # https://github.com/CoffeaTeam/lpcjobqueue/blob/main/src/lpcjobqueue/config.yaml
    # https://github.com/dask/distributed/blob/main/distributed/nanny.py

    # This might be useful: https://github.com/search?q=repo%3Adask%2Fdistributed%20split-shuffle&type=code
    #     unknown-task-duration: 500ms  # Default duration for all tasks with unknown durations ("15m", "2h")
    #     default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks)
    #               rechunk-split: 1us

    # Args that must be passed to LPCCondorCluster directly:
    # ship_env, image (auto-prefix: "/cvmfs/unpacked.cern.ch/registry.hub.docker.com/"), transfer_input_files, base_class_kwargs
    if config_name is None:
        if _LPC:
            config_name = "lpccondor"
            # auto log directory case, otherwise pass through via config
            if log_directory is True:
                log_directory = f"/uscmst1b_scratch/lpc1/3DayLifetime/{os.getlogin()}/"
        else:
            raise NotImplementedError("Only LPCCondorCluster is supported at the moment and the automatic HOSTNAME check did not match; please set the `config_name` argument explicitly")
    if job_extra_directives is None:
        job_extra_directives = {}
    # make sure x509 proxy is added to the extra_directives...
    # NOTE: set_default_proxy is a helper that is not defined in this snippet
    job_extra_directives.update(set_default_proxy(base_class_kwargs.get("job_extra_directives", {})))
    if image is None:
        image = os.getenv("COFFEA_IMAGE")
    if transfer_input_files is None:
        # kwargs.setdefault('transfer_input_files', [f'{os.getenv("BASE")}/utils', f'{os.getenv("BASE")}/models'])
        transfer_input_files = []

    default_set_args = {
        # These are set in the config, implicitly picked up by dask_jobqueue.core.Job, propagated up to HTCondorCluster etc.
        f"jobqueue.{config_name}.cores": cores,
        f"jobqueue.{config_name}.memory": memory,
        f"jobqueue.{config_name}.disk": disk,
        f"jobqueue.{config_name}.processes": processes, # default 1, threads will be calculated as int(cores/processes)
        f"jobqueue.{config_name}.log_directory": log_directory,
        f"jobqueue.{config_name}.death_timeout": 180,
        f"jobqueue.{config_name}.job_extra_directives": job_extra_directives,

        "distributed.worker.memory.target": 0.80, # default 0.6
        "distributed.worker.memory.rebalance.recipient_max": 0.80, # default 0.6
        "distributed.worker.memory.rebalance.sender_min": 0.50, # default 0.3
        "distributed.worker.memory.rebalance.sender_recipient_gap": 0.1, # default 0.1
        "distributed.worker.memory.spill": 0.91, # default 0.7
        "distributed.worker.memory.pause": 0.93, # default 0.8
        "distributed.worker.memory.terminate": 0.96, # default 0.95
        "distributed.worker.memory.max_spill": "15GB", # default false which permits unlimited spill
        "distributed.scheduler.dashboard.status.task_stream_length": 5000, # default 1000
        "distributed.scheduler.dashboard.tasks.task_stream_length": 500000, # default 100000
        "distributed.nanny.pre-spawn-environ": ["export MALLOC_TRIM_THRESHOLD_=65536",
                                                f"export MKL_NUM_THREADS={2}",
                                                f"export OMP_NUM_THREADS={2}",
                                                f"export OPENBLAS_NUM_THREADS={2}",
        #                                        f"export XRDPARALLELEVTLOOP={cores}",
                                                ],
        "distributed.comm.zstd.level": 3, # default 3
        #"distributed.comm.zstd.threads": cores, # default 0
    }

    if profile_level == "low":
        default_set_args.update({
        "distributed.worker.profile.interval": "1d",
        "distributed.worker.profile.cycle": "2d"})
    elif profile_level == "medium":
        default_set_args.update({
        "distributed.worker.profile.interval": "1s",
        "distributed.worker.profile.cycle": "1m"})
    elif profile_level == "high":
        default_set_args.update({
        "distributed.worker.profile.interval": "10ms",
        "distributed.worker.profile.cycle": "1000ms"})
    else:
        raise ValueError("profile_level must be one of 'low', 'medium', 'high'")

    if config_set_args is None:
        config_set_args = {}
    if not isinstance(config_set_args, dict):
        raise ValueError("config_set_args must be a dictionary of key-value pairs in the style of dask.config.set inputs")
    default_set_args.update(config_set_args)
    #with dask.config.set(config=dask.config, **{"distributed.worker.profile.interval": "1d"}) as config:
    # with dask.config.set(default_set_args, dask.config) as config:
    with dask.config.set(default_set_args) as config:
        # jobqueue and distributed.nanny settings instantiated here will get propagated to the workers for setup
        # but will not be reflected in the retrieved settings on the workers (this shouldn't matter) - they will be seen in the local/submitting environment
        if verbose:
            print("config:")
            pprint.pprint(config)
        # in order to propagate the non-nanny distributed settings to the workers, we have to make it into a yaml file accessible in their resolution order:
        # https://docs.dask.org/en/latest/configuration.html
        # ~/.config/dask #inaccessible
        # {sys.prefix}/etc/dask # which is /srv/.env/etc/dask
        # ... others
        # these settings will be reflected in the retrieved configuration settings (near as I've checked), but will NOT be seen in the local/submitting environment
        folder = pathlib.Path(f"{sys.prefix}/etc/dask/")
        folder.mkdir(parents=True, exist_ok=True)
        yaml_file = folder / f"{config_name}.yaml"
        with open(yaml_file, 'w') as out:
            yaml.dump(config, out, default_flow_style=False)
            if verbose:
                print("yaml file written to", yaml_file.absolute())

        print("Generating the Cluster...")
        if config_name == "lpccondor":
            cluster = LPCCondorCluster(#scheduler=None,
                                       name=None,
                                       ship_env=ship_env,
                                       image=image,
                                       transfer_input_files=transfer_input_files,
                                       **base_class_kwargs
                                       )
        else:
            raise NotImplementedError("Only LPCCondorCluster is supported at the moment")
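        if isinstance(n_workers, int):
            cluster.scale(n_workers)
        elif isinstance(n_workers, list) and len(n_workers) == 2:
            cluster.adapt(minimum=n_workers[0], maximum=n_workers[1])
        else:
            # fail loudly instead of silently starting a cluster with no workers
            raise ValueError("n_workers must be an int or a list of length 2 ([minimum, maximum])")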
        client = Client(cluster)
        if wait_for_workers is True:
            wait_for_workers = 1  # True means "wait for at least one worker"
        if isinstance(wait_for_workers, int) and wait_for_workers > 0:
            print(f"Waiting for {wait_for_workers} workers...")
            client.wait_for_workers(wait_for_workers)
            print("Workers online!")
        else:
            print("Not awaiting workers, please ensure to call `client.wait_for_workers(n: int > 0)` before submitting workloads.")
        print("Dashboard available at", client.dashboard_link)
        return cluster, client
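The dotted keys in `default_set_args` correspond to nested mappings in the YAML file that the function writes. As a rough sketch of that correspondence, a stand-alone helper can expand dotted keys into the nested form. The name `nest_dotted_keys` is hypothetical and this is not a dask function. The example values are taken from the settings above.

```python
from __future__ import annotations

from typing import Any


def nest_dotted_keys(flat: dict[str, Any]) -> dict[str, Any]:
    """Expand {"a.b.c": 1} into {"a": {"b": {"c": 1}}}."""
    nested: dict[str, Any] = {}
    for dotted, value in flat.items():
        node = nested
        *parents, leaf = dotted.split(".")
        for part in parents:
            # create intermediate levels on demand and walk into them
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested


flat = {
    "distributed.worker.memory.target": 0.80,
    "distributed.worker.memory.spill": 0.91,
    "distributed.worker.profile.interval": "1d",
}
nested = nest_dotted_keys(flat)
print(nested)
```

Comparing the nested result against what `retrieve_cluster_config` returns from the workers is one way to check which of the settings actually arrived.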
