class ContainerConfig:
    """Container configuration for pipeline execution.

    Holds the image, volume mappings, environment variables and runtime
    ("docker" or "singularity") used to wrap shell statements so they
    execute inside a container.
    """

    # Runtimes this configuration knows how to build commands for.
    SUPPORTED_RUNTIMES = ("docker", "singularity")

    def __init__(self, image=None, volumes=None, env_vars=None, runtime="docker"):
        """
        Args:
            image (str): Container image (e.g., "ubuntu:20.04").
            volumes (list): Volume mappings (e.g., ['/data:/data']).
            env_vars (dict): Environment variables for the container.
            runtime (str): Container runtime ("docker" or "singularity").

        Raises:
            ValueError: If *runtime* is not a supported container runtime.
        """
        self.image = image
        self.volumes = volumes or []
        self.env_vars = env_vars or {}
        self.runtime = runtime.lower()  # Normalise to lowercase

        if self.runtime not in self.SUPPORTED_RUNTIMES:
            raise ValueError("Unsupported container runtime: {}".format(self.runtime))

    def get_container_command(self, statement):
        """Convert a statement to run inside a container.

        Returns *statement* unchanged when no image is configured.
        """
        if not self.image:
            return statement

        if self.runtime == "docker":
            return self._get_docker_command(statement)
        # __init__ guarantees the only other possibility is singularity.
        return self._get_singularity_command(statement)

    def _get_docker_command(self, statement):
        """Generate a Docker command wrapping *statement*."""
        import shlex

        volume_args = [f"-v {volume}" for volume in self.volumes]
        env_args = [f"-e {key}={value}" for key, value in self.env_vars.items()]

        # shlex.quote protects statements that themselves contain quotes;
        # for quote-free statements it produces the same '...' wrapping as
        # the previous manual f"'{statement}'" approach.
        return " ".join([
            "docker", "run", "--rm",
            *volume_args, *env_args, self.image,
            "/bin/bash", "-c", shlex.quote(statement)
        ])

    def _get_singularity_command(self, statement):
        """Generate a Singularity command wrapping *statement*."""
        import shlex

        volume_args = [f"--bind {volume}" for volume in self.volumes]
        env_args = [f"--env {key}={value}" for key, value in self.env_vars.items()]

        return " ".join([
            "singularity", "exec",
            *volume_args, *env_args, self.image,
            "bash", "-c", shlex.quote(statement)
        ])
starts.""" self.active_jobs.append(job_info) @@ -788,15 +849,63 @@ def cleanup_failed_job(self, job_info): else: self.logger.info(f"Output file not found (already removed or not created): {outfile}") - def run(self, statement_list): - """Run a list of statements and track each job's lifecycle.""" + def run( + self, + statement_list, + job_memory=None, + job_threads=None, + container_runtime=None, + image=None, + volumes=None, + env_vars=None, + **kwargs,): + + """ + Execute a list of statements with optional container support. + + Args: + statement_list (list): List of commands to execute. + job_memory (str): Memory requirements (e.g., "4G"). + job_threads (int): Number of threads to use. + container_runtime (str): Container runtime ("docker" or "singularity"). + image (str): Container image to use. + volumes (list): Volume mappings (e.g., ['/data:/data']). + env_vars (dict): Environment variables for the container. + **kwargs: Additional arguments. + """ + # Validation checks + if container_runtime and container_runtime not in ["docker", "singularity"]: + self.logger.error(f"Invalid container_runtime: {container_runtime}") + raise ValueError("Container runtime must be 'docker' or 'singularity'") + + if container_runtime and not image: + self.logger.error(f"Container runtime specified without an image: {container_runtime}") + raise ValueError("An image must be specified when using a container runtime") + benchmark_data = [] + for statement in statement_list: job_info = {"statement": statement} - self.start_job(job_info) # Add job to active_jobs + self.start_job(job_info) try: - # Execute job + # Prepare containerized execution + if container_runtime: + self.set_container_config(image=image, volumes=volumes, env_vars=env_vars, runtime=container_runtime) + statement = self.container_config.get_container_command(statement) + + # Add memory and thread environment variables + if job_memory: + env_vars = env_vars or {} + env_vars["JOB_MEMORY"] = job_memory + if 
job_threads: + env_vars = env_vars or {} + env_vars["JOB_THREADS"] = job_threads + + # Debugging: Log the constructed command + self.logger.info(f"Executing command: {statement}") + + # Build and execute the statement full_statement, job_path = self.build_job_script(statement) process = subprocess.Popen( full_statement, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE @@ -806,19 +915,22 @@ def run(self, statement_list): if process.returncode != 0: raise OSError( f"Job failed with return code {process.returncode}.\n" - f"stderr: {stderr.decode('utf-8')}\nstatement: {statement}" + f"stderr: {stderr.decode('utf-8')}\ncommand: {statement}" ) - # Collect benchmark data if job was successful + # Collect benchmark data for successful jobs benchmark_data.append( - self.collect_benchmark_data([statement], resource_usage=[{"job_id": process.pid}]) + self.collect_benchmark_data( + statement, resource_usage={"job_id": process.pid} + ) ) - self.finish_job(job_info) # Remove job from active_jobs + self.finish_job(job_info) except Exception as e: self.logger.error(f"Job failed: {e}") self.cleanup_failed_job(job_info) - continue + if not self.ignore_errors: + raise return benchmark_data diff --git a/docs/container/tasks.md b/docs/container/tasks.md new file mode 100644 index 00000000..5d19bc8a --- /dev/null +++ b/docs/container/tasks.md @@ -0,0 +1,164 @@ +# Containerised Execution in `P.run()` + +The `P.run()` method supports executing jobs within container environments using **Docker** or **Singularity**. This functionality enables seamless integration of containerisation for computational workflows. + +## Features + +- **Container Runtime Support**: Execute jobs using either Docker or Singularity. +- **Environment Variables**: Pass custom environment variables to the container. +- **Volume Mapping**: Bind directories between the host system and the container. 
+- **Container-Specific Command Construction**: Automatically builds the appropriate command for Docker or Singularity. + +--- + +## API Documentation + +### `P.run()` + +The `P.run()` method executes a list of commands with optional support for containerisation via Docker or Singularity. + +### Parameters + +| Parameter | Type | Description | Default | +|---------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------| +| `statement_list` | `list` | List of commands (statements) to execute. | Required | +| `job_memory` | `str` | Memory requirements for the job (e.g., `"4G"`). | `None` | +| `job_threads` | `int` | Number of threads to use. | `None` | +| `container_runtime` | `str` | Container runtime to use. Must be `"docker"` or `"singularity"`. | `None` | +| `image` | `str` | The container image to use (e.g., `"ubuntu:20.04"` for Docker or `/path/to/image.sif` for Singularity). | `None` | +| `volumes` | `list` | List of volume mappings (e.g., `"/host/path:/container/path"`). | `None` | +| `env_vars` | `dict` | Dictionary of environment variables to pass to the container (e.g., `{"VAR": "value"}`). | `None` | +| `**kwargs` | `dict` | Additional arguments passed to the executor. | `None` | + +### Returns + +- **`list`**: A list of benchmark data collected from executed jobs. + +### Raises + +- **`ValueError`**: If invalid arguments are provided (e.g., container runtime is missing or invalid, or required arguments for container execution are not supplied). +- **`OSError`**: If the job fails during execution. + +--- + +## Examples + +### Running a Job with Docker + +To execute a job using Docker, specify the `container_runtime` as `"docker"` and provide an image. Optionally, bind host directories to container directories using `volumes`, and pass environment variables with `env_vars`. 
+ +```python +P.run( + statement_list=["echo 'Hello from Docker'"], + container_runtime="docker", + image="ubuntu:20.04", + volumes=["/data:/data"], + env_vars={"MY_VAR": "value"} +) +``` + +This will construct and execute the following Docker command: + +```bash +docker run --rm -v /data:/data -e MY_VAR=value ubuntu:20.04 /bin/bash -c 'echo Hello from Docker' +``` + +### Running a Job with Singularity + +To execute a job using Singularity, specify the `container_runtime` as `"singularity"` and provide a Singularity Image File (SIF). Similarly, you can bind host directories and set environment variables. + +```python +P.run( + statement_list=["echo 'Hello from Singularity'"], + container_runtime="singularity", + image="/path/to/image.sif", + volumes=["/data:/data"], + env_vars={"MY_VAR": "value"} +) +``` + +This will construct and execute the following Singularity command: + +```bash +singularity exec --bind /data:/data --env MY_VAR=value /path/to/image.sif /bin/bash -c 'echo Hello from Singularity' +``` + +--- + +## Usage Notes + +1. **Container Runtime Selection**: + - Use `"docker"` for Docker-based container execution. + - Use `"singularity"` for Singularity-based container execution. + - Ensure the appropriate runtime is installed and available on the system. + +2. **Environment Variables**: + - Use the `env_vars` argument to pass environment variables to the container. + +3. **Volume Mapping**: + - Use the `volumes` argument to bind directories between the host system and the container. + - Docker: Use `["/host/path:/container/path"]`. + - Singularity: Use `["/host/path:/container/path"]`. + +4. **Validation**: + - If `container_runtime` is not specified, container-specific arguments such as `volumes`, `env_vars`, and `image` cannot be used. + - A valid container image must be provided if `container_runtime` is specified. 
+ +--- + +## Error Handling + +- **Invalid Configurations**: + - Raises `ValueError` for invalid configurations, such as: + - Missing container runtime. + - Missing or invalid container image. + - Incompatible arguments (e.g., volumes provided without a container runtime). + +- **Job Failures**: + - Automatically cleans up failed jobs, including temporary files and job outputs. + +--- + +## Implementation Details + +Internally, `P.run()` constructs the appropriate command based on the specified runtime and arguments: + +### Docker + +For Docker, the command is constructed as follows: +```bash +docker run --rm -v /host/path:/container/path -e VAR=value image /bin/bash -c 'statement' +``` + +### Singularity + +For Singularity, the command is constructed as follows: +```bash +singularity exec --bind /host/path:/container/path --env VAR=value image /bin/bash -c 'statement' +``` + +Both commands ensure proper execution and clean-up after the job completes. + +--- + +## Contributing + +To add or enhance containerisation functionality, ensure: +1. New features or fixes support both Docker and Singularity. +2. Unit tests cover all edge cases for container runtime usage. +3. Updates to this documentation reflect changes in functionality. + +--- + +## Adding to MkDocs + +Save this content in a markdown file (e.g., `docs/container_execution.md`) and add it to the `mkdocs.yml` navigation: + +```yaml +nav: + - Home: index.md + - P.run: + - Docker and Singularity: container_execution.md +``` + +This provides a clear, accessible reference for users leveraging containerisation with `P.run()`. diff --git a/docs/container/whole_pipeline.md b/docs/container/whole_pipeline.md new file mode 100644 index 00000000..06842929 --- /dev/null +++ b/docs/container/whole_pipeline.md @@ -0,0 +1,91 @@ +# Container Configuration for Entire Pipeline + +This document describes how to use the `Pipeline` class from `cgatcore.pipeline` to configure container settings **for the entire pipeline**. 
Unlike configuring individual jobs with container support, this method allows you to set up a consistent execution environment for all tasks across the entire workflow. This is useful for ensuring reproducibility and simplifying pipeline management. + +## Overview + +The `Pipeline` class from `cgatcore.pipeline` allows you to: +- Configure container support for tasks. +- Set up Docker or Singularity containers with environment variables and volume mappings. +- Seamlessly execute multiple tasks inside containers. +- Configure container settings for the entire pipeline, ensuring consistent execution environments across all tasks. + +By configuring the container support at the pipeline level, all commands that are run through `P.run()` will automatically use the specified container settings. + +--- + +## Usage Examples + +### Setting Docker as the Default Runtime for the Entire Pipeline + +Below is an example of how to use the `Pipeline` class to configure and execute all tasks in the pipeline within a Docker container: + +```python +from cgatcore.pipeline import Pipeline + +# Create a pipeline instance +P = Pipeline() + +# Configure container support for Docker for the entire pipeline +P.set_container_config( + image="ubuntu:20.04", + volumes=["/data:/data", "/reference:/reference"], + env_vars={"THREADS": "4", "PATH": "/usr/local/bin:$PATH"}, + runtime="docker" +) + +# Define and run tasks - these will all run in the specified Docker container +P.run([ + "bwa mem /reference/genome.fa /data/sample1.fastq > /data/sample1.bam", + "bwa mem /reference/genome.fa /data/sample2.fastq > /data/sample2.bam" +]) +``` + +### Setting Singularity as the Default Runtime for the Entire Pipeline + +Similarly, the following example shows how to use Singularity for all tasks in the pipeline: + +```python +from cgatcore.pipeline import Pipeline + +# Create a pipeline instance +P = Pipeline() + +# Configure container support for Singularity for the entire pipeline 
+P.set_container_config( + image="/path/to/ubuntu.sif", + volumes=["/data:/data", "/reference:/reference"], + env_vars={"THREADS": "4", "PATH": "/usr/local/bin:$PATH"}, + runtime="singularity" +) + +# Define and run tasks - these will all run in the specified Singularity container +P.run([ + "bwa mem /reference/genome.fa /data/sample1.fastq > /data/sample1.bam", + "bwa mem /reference/genome.fa /data/sample2.fastq > /data/sample2.bam" +]) +``` + +## When to Use This Approach + +This configuration approach is ideal when: +- You want **all tasks in the pipeline** to run in the same controlled container environment without having to configure container support repeatedly for each individual command. +- Consistency and reproducibility are essential, as this ensures that all tasks use the same software versions, dependencies, and environment. +- You are managing complex workflows where each step depends on a well-defined environment, avoiding any variations that may arise if each step had to be configured separately. + +## Differences from Per-Command Containerisation + +- **Pipeline-Level Configuration**: Use `P.set_container_config()` to set the container settings for the entire pipeline. Every task executed through `P.run()` will use this configuration by default. +- **Per-Command Containerisation**: Use container-specific arguments in `P.run()` for each task individually, which allows different tasks to use different container settings if needed. This is covered in the separate documentation titled **Containerised Execution in `P.run()`**. + +--- + +## Conclusion + +The `Pipeline` class provides an efficient way to standardise the execution environment across all pipeline tasks. By setting container configurations at the pipeline level: +- **All tasks** will use the same Docker or Singularity environment. +- **Configuration is centralised**, reducing redundancy and the risk of errors. 
+- **Portability** and **reproducibility** are enhanced, making this approach particularly useful for workflows requiring a consistent environment across multiple stages. + +With these examples, users can set up a fully containerised workflow environment for all stages of their pipeline, ensuring robust and repeatable results. + diff --git a/mkdocs.yml b/mkdocs.yml index 3c4b089a..8abf2a6a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -30,6 +30,9 @@ nav: - Execution: pipeline_modules/execution.md - Utils: pipeline_modules/utils.md - Parameters: pipeline_modules/parameters.md + - Container support: + - Individual tasks: container/tasks.md + - Whole pipeline: container/whole_pipeline.md - S3 Cloud: - S3 Pipeline: s3_integration/s3_pipeline.md - S3 Decorators: s3_integration/s3_decorators.md diff --git a/tests/test_container.py b/tests/test_container.py new file mode 100644 index 00000000..1be9c4aa --- /dev/null +++ b/tests/test_container.py @@ -0,0 +1,140 @@ +import pytest +from unittest.mock import MagicMock, patch +from cgatcore.pipeline.execution import run, Executor + +mocked_params = { + "cluster": { + "options": "", + "queue": None, + "memory_default": "4G", + "tmpdir": "/tmp", + "monitor_interval_queued_default": 30, + "monitor_interval_running_default": 30, + }, + "cluster_tmpdir": "/tmp", + "tmpdir": "/tmp", + "work_dir": "/tmp", + "os": "Linux", +} + + +@patch("cgatcore.pipeline.execution.get_params", return_value=mocked_params) +def test_run_with_container_support(mock_get_params): + """Test running a command with container support.""" + with patch("cgatcore.pipeline.execution.subprocess.Popen") as mock_popen: + mock_process = MagicMock() + mock_process.communicate.return_value = (b"Hello from Docker", b"") + mock_process.returncode = 0 + mock_process.pid = 12345 + mock_popen.return_value = mock_process + + # Use Executor instance + executor = Executor() + + # Mock the method that collects benchmark data + with patch.object(executor, 
"collect_benchmark_data", return_value=None) as mock_collect_benchmark: + executor.run( + statement_list=["echo Hello from Docker"], + container_runtime="docker", + image="ubuntu:20.04", + ) + + mock_popen.assert_called_once() + actual_call = mock_popen.call_args[0][0] + print(f"Actual call to subprocess.Popen: {actual_call}") + assert "docker run --rm" in actual_call + assert "ubuntu:20.04" in actual_call + assert "echo Hello from Docker" in actual_call + + # Validate that collect_benchmark_data was called + mock_collect_benchmark.assert_called_once() + + +@patch("cgatcore.pipeline.execution.get_params", return_value=mocked_params) +def test_run_without_container_support(mock_get_params): + """Test running a command without container support.""" + with patch("cgatcore.pipeline.execution.subprocess.Popen") as mock_popen: + mock_process = MagicMock() + mock_process.communicate.return_value = (b"Hello from local execution", b"") + mock_process.returncode = 0 + mock_process.pid = 12345 + mock_popen.return_value = mock_process + + # Use Executor instance + executor = Executor() + + # Mock the method that collects benchmark data + with patch.object(executor, "collect_benchmark_data", return_value=None) as mock_collect_benchmark: + executor.run(statement_list=["echo Hello from local execution"]) + + mock_popen.assert_called_once() + actual_call = mock_popen.call_args[0][0] + print(f"Actual call to subprocess.Popen: {actual_call}") + assert "echo Hello from local execution" in actual_call + + # Validate that collect_benchmark_data was called + mock_collect_benchmark.assert_called_once() + + +@patch("cgatcore.pipeline.execution.get_params", return_value=mocked_params) +def test_invalid_container_runtime(mock_get_params): + """Test handling of invalid container runtime.""" + with pytest.raises(ValueError, match="Container runtime must be 'docker' or 'singularity'"): + executor = Executor() + executor.run(statement_list=["echo Test"], container_runtime="invalid_runtime") + + 
+@patch("cgatcore.pipeline.execution.get_params", return_value=mocked_params) +def test_missing_required_params(mock_get_params): + """Test handling of missing required parameters.""" + with pytest.raises(ValueError, match="An image must be specified when using a container runtime"): + executor = Executor() + executor.run(statement_list=["echo Test"], container_runtime="docker") + + +@patch("cgatcore.pipeline.execution.get_params", return_value=mocked_params) +@patch("cgatcore.pipeline.execution.Executor.cleanup_failed_job") +def test_cleanup_on_failure(mock_cleanup, mock_get_params): + """Test cleanup logic when a job fails.""" + from cgatcore.pipeline.execution import Executor # Ensure proper import + + # Create an instance of Executor + executor = Executor() + + with patch("cgatcore.pipeline.execution.subprocess.Popen") as mock_popen: + # Mock a process failure + mock_process = MagicMock() + mock_process.communicate.return_value = (b"", b"Some error occurred") + mock_process.returncode = 1 # Simulate failure + mock_popen.return_value = mock_process + + # Attempt to run a failing command + with pytest.raises(OSError, match="Job failed with return code"): + executor.run( + statement_list=["echo This will fail"], + container_runtime="docker", # Pass a valid container_runtime + image="ubuntu:20.04" # Add a valid image since container_runtime is provided + ) + + # Ensure cleanup_failed_job was called + mock_cleanup.assert_called_once() + print(f"Arguments to cleanup_failed_job: {mock_cleanup.call_args}") + + # Check subprocess was invoked + mock_popen.assert_called_once() + print(f"Subprocess call: {mock_popen.call_args_list}") + + +@patch("cgatcore.pipeline.execution.get_params", return_value=mocked_params) +def test_job_tracking(mock_get_params): + """Test job tracking lifecycle.""" + with patch("cgatcore.pipeline.execution.subprocess.Popen") as mock_popen: + mock_process = MagicMock() + mock_process.communicate.return_value = (b"output", b"") + 
mock_process.returncode = 0 + mock_process.pid = 12345 + mock_popen.return_value = mock_process + + run(statement=["echo Job tracking test"]) + + mock_popen.assert_called_once() diff --git a/tests/test_container_config.py b/tests/test_container_config.py new file mode 100644 index 00000000..6258dd63 --- /dev/null +++ b/tests/test_container_config.py @@ -0,0 +1,81 @@ +import unittest +from unittest.mock import patch, MagicMock +from cgatcore.pipeline.execution import Executor, ContainerConfig + + +class TestContainerConfig(unittest.TestCase): + def setUp(self): + """Set up a mock for get_params and an Executor instance.""" + patcher = patch("cgatcore.pipeline.execution.get_params") + self.mock_get_params = patcher.start() + self.addCleanup(patcher.stop) + + # Mock the return value of get_params + self.mock_get_params.return_value = { + "cluster": { + "options": "", + "queue": None, + "memory_default": "4G", + "tmpdir": "/tmp", + "monitor_interval_queued_default": 30, + "monitor_interval_running_default": 30, + }, + "cluster_tmpdir": "/tmp", + "tmpdir": "/tmp", + "work_dir": "/tmp", + "os": "Linux", + } + + self.executor = Executor() + + def test_set_container_config_docker(self): + """Test setting container configuration for Docker.""" + self.executor.set_container_config( + image="ubuntu:20.04", + volumes=["/data:/data", "/reference:/reference"], + env_vars={"TEST_VAR": "value"}, + runtime="docker" + ) + config = self.executor.container_config + self.assertIsInstance(config, ContainerConfig) + self.assertEqual(config.image, "ubuntu:20.04") + self.assertEqual(config.runtime, "docker") + self.assertIn("/data:/data", config.volumes) + self.assertIn("/reference:/reference", config.volumes) + self.assertEqual(config.env_vars["TEST_VAR"], "value") + + def test_set_container_config_singularity(self): + """Test setting container configuration for Singularity.""" + self.executor.set_container_config( + image="/path/to/container.sif", + volumes=["/data:/data", 
"/reference:/reference"], + env_vars={"TEST_VAR": "value"}, + runtime="singularity" + ) + config = self.executor.container_config + self.assertIsInstance(config, ContainerConfig) + self.assertEqual(config.image, "/path/to/container.sif") + self.assertEqual(config.runtime, "singularity") + self.assertIn("/data:/data", config.volumes) + self.assertIn("/reference:/reference", config.volumes) + self.assertEqual(config.env_vars["TEST_VAR"], "value") + + def test_invalid_runtime(self): + """Test setting an invalid container runtime.""" + with self.assertRaises(ValueError) as context: + self.executor.set_container_config( + image="ubuntu:20.04", runtime="invalid_runtime" + ) + self.assertIn("Unsupported container runtime", str(context.exception)) + + def test_missing_image(self): + """Test setting container configuration without an image.""" + with self.assertRaises(ValueError) as context: + self.executor.set_container_config( + image=None, runtime="docker" + ) + self.assertIn("An image must be specified", str(context.exception)) + + +if __name__ == "__main__": + unittest.main()