diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py
index fb7b0be7..18fc74eb 100644
--- a/dask_jobqueue/htcondor.py
+++ b/dask_jobqueue/htcondor.py
@@ -16,7 +16,6 @@ class HTCondorJob(Job):
 
 %(job_header)s
 
-Environment = "%(quoted_environment)s"
 Arguments = "%(quoted_arguments)s"
 Executable = %(executable)s
 
@@ -67,7 +66,14 @@ def __init__(
         env_extra = dask.config.get(
             "jobqueue.%s.env-extra" % self.config_name, default=[]
         )
-        self.env_dict = self.env_lines_to_dict(env_extra)
+
+        if env_extra is not None:
+            # Overwrite command template: prepend commands from env_extra separated by semicolon.
+            # This is special for HTCondor, because lines to execute on the worker node cannot be
+            # simply added to the submit script like for other batch systems.
+            self._command_template = (
+                "; ".join(env_extra) + "; " + self._command_template
+            )
 
         self.job_header_dict = {
             "MY.DaskWorkerName": '"htcondor--$F(MY.JobId)--"',
@@ -118,31 +124,15 @@ def __init__(
             + " ".join(shlex.quote(arg) for arg in cancel_command_extra)
         )
 
-    def env_lines_to_dict(self, env_lines):
-        """Convert an array of export statements (what we get from env-extra
-        in the config) into a dict"""
-        env_dict = {}
-        for env_line in env_lines:
-            split_env_line = shlex.split(env_line)
-            if split_env_line[0] == "export":
-                split_env_line = split_env_line[1:]
-            for item in split_env_line:
-                if "=" in item:
-                    k, v = item.split("=", 1)
-                    env_dict[k] = v
-        return env_dict
-
     def job_script(self):
         """Construct a job submission script"""
         quoted_arguments = quote_arguments(["-c", self._command_template])
-        quoted_environment = quote_environment(self.env_dict)
         job_header_lines = "\n".join(
             "%s = %s" % (k, v) for k, v in self.job_header_dict.items()
         )
         return self._script_template % {
             "shebang": self.shebang,
             "job_header": job_header_lines,
-            "quoted_environment": quoted_environment,
             "quoted_arguments": quoted_arguments,
             "executable": self.executable,
         }
@@ -260,6 +250,17 @@ class HTCondorCluster(JobQueueCluster):
     This also works with adaptive clusters. This automatically launches and kill workers based on load.
 
     >>> cluster.adapt(maximum_jobs=20)
+
+    If setup commands need to be run before starting the worker on the worker node, ``env_extra`` can be used,
+    e.g., to activate a virtual environment:
+
+    >>> from dask_jobqueue.htcondor import HTCondorCluster
+    >>> cluster = HTCondorCluster(cores=1, memory="2GB", disk="4GB",
+                                  env_extra=['cd /some/path/', 'source venv/bin/activate'])
+
+    Note that environment variables are no longer passed via the ``Environment`` parameter in the submit
+    description file. If you explicitly want to set that, you need to use ``job_extra``.
+
     """.format(
         job=job_parameters, cluster=cluster_parameters
     )
diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py
index 99a55733..1664da85 100644
--- a/dask_jobqueue/tests/test_htcondor.py
+++ b/dask_jobqueue/tests/test_htcondor.py
@@ -27,7 +27,12 @@ def test_job_script():
         processes=2,
         memory="100MB",
         disk="100MB",
-        env_extra=['export LANG="en_US.utf8"', 'export LC_ALL="en_US.utf8"'],
+        env_extra=[
+            'export LANG="en_US.utf8"',
+            'export LC_ALL="en_US.utf8"',
+            "cd /some/path/",
+            "source venv/bin/activate",
+        ],
         job_extra={"+Extra": "True"},
         submit_command_extra=["-verbose"],
         cancel_command_extra=["-forcex"],
@@ -40,9 +45,10 @@
     assert "MY.DaskWorkerDisk = 100000000" in job_script
     assert "MY.DaskWorkerMemory = 100000000" in job_script
     assert 'MY.JobId = "$(ClusterId).$(ProcId)"' in job_script
-    assert "LANG=en_US.utf8" in job_script
-    assert "LC_ALL=en_US.utf8" in job_script
-    assert "export" not in job_script
+    assert 'export LANG=""en_US.utf8""' in job_script
+    assert 'export LC_ALL=""en_US.utf8""' in job_script
+    assert "cd /some/path/" in job_script
+    assert "source venv/bin/activate" in job_script
     assert "+Extra = True" in job_script
     assert re.search(
         r"condor_submit\s.*-verbose", cluster._dummy_job.submit_command
diff --git a/docs/source/advanced-tips-and-tricks.rst b/docs/source/advanced-tips-and-tricks.rst
index bd798106..237adc6f 100644
--- a/docs/source/advanced-tips-and-tricks.rst
+++ b/docs/source/advanced-tips-and-tricks.rst
@@ -68,6 +68,36 @@ accepted option on some SLURM clusters. The error was something like this:
     sbatch: error: Memory specification can not be satisfied
     sbatch: error: Batch job submission failed: Requested node configuration is not available
 
+Run setup commands before starting the worker with ``env_extra``
+----------------------------------------------------------------
+
+Sometimes you need to run some setup commands before the actual worker can be started. This includes
+setting environment variables, loading environment modules, sourcing/activating a virtual environment,
+or activating conda/mamba environments.
+
+This can be achieved using the ``env_extra`` parameter. For example, to set up a virtual environment:
+
+.. code-block:: python
+
+    from dask_jobqueue.htcondor import HTCondorCluster
+    env_extra = ['cd /some/path', 'source venv/bin/activate']
+    cluster = HTCondorCluster(cores=1, memory="2GB", disk="4GB", log_directory='logs', python='python3',
+                              env_extra=env_extra)
+    print(cluster.job_script())
+
+For ``HTCondorCluster``, the commands will be prepended to the actual python call in the ``Arguments``
+parameter in the submit description file. The relevant lines will look like this:
+
+.. code-block:: text
+
+    ...
+    Arguments = "-c 'cd /some/path; source venv/bin/activate; python3 -m distributed.cli.dask_worker tcp://: --nthreads 1 --memory-limit 2.00GB --name dummy-name --nanny --death-timeout 60'"
+    Executable = /bin/sh
+    ...
+
+For other batch systems (``*Cluster`` classes), the additional commands will be inserted as separate lines
+in the submission script.
+
 How to handle job queueing system walltime killing workers
 -----------------------------------------------------------
 
diff --git a/docs/source/examples.rst b/docs/source/examples.rst
index ad029716..4f9a3823 100644
--- a/docs/source/examples.rst
+++ b/docs/source/examples.rst
@@ -32,7 +32,7 @@ PBS Deployments
                         interface='ib0')
 
 Moab Deployments
-~~~~~~~~~~~~~~~~
+----------------
 
 On systems which use the Moab Workload Manager, a subclass of ``PBSCluster``
 can be used, called ``MoabCluster``:
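
To make the new behaviour concrete, here is a minimal, self-contained sketch (not part of the patch itself) of what ``env_extra`` now does for HTCondor. It mirrors the ``"; ".join(env_extra)`` logic added in ``HTCondorJob.__init__``; the variable names and the scheduler address below are illustrative placeholders, not dask-jobqueue attributes.

.. code-block:: python

    # Illustrative sketch only: mimics the logic this patch adds for HTCondor.
    # Names and the scheduler address are placeholders, not dask-jobqueue internals.
    env_extra = ["cd /some/path/", "source venv/bin/activate"]

    # Stand-in for the worker start-up command that dask-jobqueue builds.
    command_template = (
        "python3 -m distributed.cli.dask_worker tcp://scheduler:8786 "
        "--nthreads 1 --memory-limit 2.00GB --name dummy-name --nanny --death-timeout 60"
    )

    if env_extra:
        # Setup commands run first, joined with "; ", because HTCondor has no
        # place in the submit description file for extra per-line shell commands.
        command_template = "; ".join(env_extra) + "; " + command_template

    # HTCondor then runs the whole string through the shell, roughly:
    #   Executable = /bin/sh
    #   Arguments  = "-c '<command_template>'"
    print(command_template)

This is also why the test assertions change: the ``export LANG="en_US.utf8"`` lines now end up inside the quoted ``Arguments`` string built by ``quote_arguments`` (where HTCondor's argument quoting doubles the inner double quotes, hence ``""en_US.utf8""``) instead of in a separate ``Environment`` line.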