Skip to content

Commit

Permalink
Merge pull request #191 from cgat-developers/AC-container
Browse files Browse the repository at this point in the history
added container (docker) functionality to the run statement
  • Loading branch information
Acribbs authored Dec 3, 2024
2 parents 3dfbd94 + 6fc6b0e commit 243f74e
Show file tree
Hide file tree
Showing 7 changed files with 601 additions and 9 deletions.
1 change: 1 addition & 0 deletions all-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ pytest -v tests/test_pipeline_cli.py
pytest -v tests/test_pipeline_actions.py
pytest -v tests/test_execution_cleanup.py
pytest -v tests/test_s3_decorators.py
pytest -v tests/test_container.py
130 changes: 121 additions & 9 deletions cgatcore/pipeline/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,60 @@ def _pickle_args(args, kwargs):
return (submit_args, args_file)


class ContainerConfig:
"""Container configuration for pipeline execution."""

def __init__(self, image=None, volumes=None, env_vars=None, runtime="docker"):
"""
Args:
image (str): Container image (e.g., "ubuntu:20.04").
volumes (list): Volume mappings (e.g., ['/data:/data']).
env_vars (dict): Environment variables for the container.
runtime (str): Container runtime ("docker" or "singularity").
"""
self.image = image
self.volumes = volumes or []
self.env_vars = env_vars or {}
self.runtime = runtime.lower() # Normalise to lowercase

if self.runtime not in ["docker", "singularity"]:
raise ValueError("Unsupported container runtime: {}".format(self.runtime))

def get_container_command(self, statement):
"""Convert a statement to run inside a container."""
if not self.image:
return statement

if self.runtime == "docker":
return self._get_docker_command(statement)
elif self.runtime == "singularity":
return self._get_singularity_command(statement)
else:
raise ValueError("Unsupported container runtime: {}".format(self.runtime))

def _get_docker_command(self, statement):
"""Generate a Docker command."""
volume_args = [f"-v {volume}" for volume in self.volumes]
env_args = [f"-e {key}={value}" for key, value in self.env_vars.items()]

return " ".join([
"docker", "run", "--rm",
*volume_args, *env_args, self.image,
"/bin/bash", "-c", f"'{statement}'"
])

def _get_singularity_command(self, statement):
"""Generate a Singularity command."""
volume_args = [f"--bind {volume}" for volume in self.volumes]
env_args = [f"--env {key}={value}" for key, value in self.env_vars.items()]

return " ".join([
"singularity", "exec",
*volume_args, *env_args, self.image,
"bash", "-c", f"'{statement}'"
])


def start_session():
"""start and initialize the global DRMAA session."""
global GLOBAL_SESSION
Expand Down Expand Up @@ -739,6 +793,13 @@ def get_val(d, v, alt):

return benchmark_data

def set_container_config(self, image, volumes=None, env_vars=None, runtime="docker"):
    """Attach a container configuration to this executor.

    Args:
        image (str): Container image; must be non-empty.
        volumes (list): Optional volume mappings.
        env_vars (dict): Optional environment variables.
        runtime (str): Container runtime, "docker" by default.

    Raises:
        ValueError: If ``image`` is empty or ``None``.
    """
    if not image:
        raise ValueError("An image must be specified for the container configuration.")

    settings = dict(image=image, volumes=volumes, env_vars=env_vars, runtime=runtime)
    self.container_config = ContainerConfig(**settings)

def start_job(self, job_info):
"""Add a job to active_jobs list when it starts."""
self.active_jobs.append(job_info)
Expand Down Expand Up @@ -788,15 +849,63 @@ def cleanup_failed_job(self, job_info):
else:
self.logger.info(f"Output file not found (already removed or not created): {outfile}")

def run(
        self,
        statement_list,
        job_memory=None,
        job_threads=None,
        container_runtime=None,
        image=None,
        volumes=None,
        env_vars=None,
        **kwargs):
    """Execute a list of statements with optional container support.

    Each statement is registered via ``start_job``, optionally wrapped in a
    docker/singularity command, turned into a job script with
    ``build_job_script`` and executed through the shell. Successful jobs
    are passed to ``finish_job``; failed jobs to ``cleanup_failed_job``.

    Args:
        statement_list (list): List of commands to execute.
        job_memory (str): Memory requirements (e.g., "4G"). Exported to the
            container as ``JOB_MEMORY``.
        job_threads (int): Number of threads to use. Exported to the
            container as ``JOB_THREADS``.
        container_runtime (str): Container runtime ("docker" or "singularity").
        image (str): Container image to use.
        volumes (list): Volume mappings (e.g., ['/data:/data']).
        env_vars (dict): Environment variables for the container. The
            caller's dictionary is not modified.
        **kwargs: Additional arguments; accepted but not used by this method.

    Returns:
        list: Benchmark data collected for each successful statement.

    Raises:
        ValueError: If ``container_runtime`` is not "docker" or
            "singularity", or if it is given without an ``image``.
        Exception: Any job failure is re-raised after clean-up unless
            ``self.ignore_errors`` is set.
    """
    # Validation checks
    if container_runtime and container_runtime not in ["docker", "singularity"]:
        self.logger.error(f"Invalid container_runtime: {container_runtime}")
        raise ValueError("Container runtime must be 'docker' or 'singularity'")

    if container_runtime and not image:
        self.logger.error(f"Container runtime specified without an image: {container_runtime}")
        raise ValueError("An image must be specified when using a container runtime")

    if container_runtime:
        # Merge the resource hints into a *copy* of env_vars BEFORE the
        # container command is built, so that every statement (including
        # the first one) sees them and the caller's dict is left untouched.
        container_env = dict(env_vars or {})
        if job_memory:
            container_env["JOB_MEMORY"] = job_memory
        if job_threads:
            container_env["JOB_THREADS"] = job_threads

        # The configuration is identical for every statement: build it once.
        self.set_container_config(
            image=image,
            volumes=volumes,
            env_vars=container_env,
            runtime=container_runtime,
        )

    benchmark_data = []

    for statement in statement_list:
        job_info = {"statement": statement}
        self.start_job(job_info)

        try:
            # Prepare containerized execution
            if container_runtime:
                statement = self.container_config.get_container_command(statement)

            # Debugging: Log the constructed command
            self.logger.info(f"Executing command: {statement}")

            # Build and execute the statement
            full_statement, job_path = self.build_job_script(statement)
            # NOTE(review): the closing parenthesis and the communicate()
            # call were collapsed in the diff view and are reconstructed
            # here - confirm against the full file.
            process = subprocess.Popen(
                full_statement, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            stdout, stderr = process.communicate()

            if process.returncode != 0:
                raise OSError(
                    f"Job failed with return code {process.returncode}.\n"
                    f"stderr: {stderr.decode('utf-8')}\ncommand: {statement}"
                )

            # Collect benchmark data for successful jobs
            benchmark_data.append(
                self.collect_benchmark_data(
                    statement, resource_usage={"job_id": process.pid}
                )
            )
            self.finish_job(job_info)

        except Exception as e:
            self.logger.error(f"Job failed: {e}")
            self.cleanup_failed_job(job_info)
            if not self.ignore_errors:
                raise

    return benchmark_data

Expand Down
164 changes: 164 additions & 0 deletions docs/container/tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Containerised Execution in `P.run()`

The `P.run()` method supports executing jobs within container environments using **Docker** or **Singularity**. This functionality enables seamless integration of containerisation for computational workflows.

## Features

- **Container Runtime Support**: Execute jobs using either Docker or Singularity.
- **Environment Variables**: Pass custom environment variables to the container.
- **Volume Mapping**: Bind directories between the host system and the container.
- **Container-Specific Command Construction**: Automatically builds the appropriate command for Docker or Singularity.

---

## API Documentation

### `P.run()`

The `P.run()` method executes a list of commands with optional support for containerisation via Docker or Singularity.

### Parameters

| Parameter | Type | Description | Default |
|---------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------|
| `statement_list` | `list` | List of commands (statements) to execute. | Required |
| `job_memory` | `str` | Memory requirements for the job (e.g., `"4G"`). | `None` |
| `job_threads` | `int` | Number of threads to use. | `None` |
| `container_runtime` | `str` | Container runtime to use. Must be `"docker"` or `"singularity"`. | `None` |
| `image` | `str` | The container image to use (e.g., `"ubuntu:20.04"` for Docker or `/path/to/image.sif` for Singularity). | `None` |
| `volumes` | `list` | List of volume mappings (e.g., `"/host/path:/container/path"`). | `None` |
| `env_vars` | `dict` | Dictionary of environment variables to pass to the container (e.g., `{"VAR": "value"}`). | `None` |
| `**kwargs` | `dict` | Additional arguments passed to the executor. | `None` |

### Returns

- **`list`**: A list of benchmark data collected from executed jobs.

### Raises

- **`ValueError`**: If `container_runtime` is not `"docker"` or `"singularity"`, or if a container runtime is specified without an `image`.
- **`OSError`**: If the job fails during execution.

---

## Examples

### Running a Job with Docker

To execute a job using Docker, specify the `container_runtime` as `"docker"` and provide an image. Optionally, bind host directories to container directories using `volumes`, and pass environment variables with `env_vars`.

```python
P.run(
statement_list=["echo 'Hello from Docker'"],
container_runtime="docker",
image="ubuntu:20.04",
volumes=["/data:/data"],
env_vars={"MY_VAR": "value"}
)
```

This will construct and execute the following Docker command:

```bash
docker run --rm -v /data:/data -e MY_VAR=value ubuntu:20.04 /bin/bash -c 'echo Hello from Docker'
```

### Running a Job with Singularity

To execute a job using Singularity, specify the `container_runtime` as `"singularity"` and provide a Singularity Image File (SIF). Similarly, you can bind host directories and set environment variables.

```python
P.run(
statement_list=["echo 'Hello from Singularity'"],
container_runtime="singularity",
image="/path/to/image.sif",
volumes=["/data:/data"],
env_vars={"MY_VAR": "value"}
)
```

This will construct and execute the following Singularity command:

```bash
singularity exec --bind /data:/data --env MY_VAR=value /path/to/image.sif bash -c 'echo Hello from Singularity'
```

---

## Usage Notes

1. **Container Runtime Selection**:
- Use `"docker"` for Docker-based container execution.
- Use `"singularity"` for Singularity-based container execution.
- Ensure the appropriate runtime is installed and available on the system.

2. **Environment Variables**:
- Use the `env_vars` argument to pass environment variables to the container.

3. **Volume Mapping**:
- Use the `volumes` argument to bind directories between the host system and the container.
- Docker: Use `["/host/path:/container/path"]`.
- Singularity: Use `["/host/path:/container/path"]`.

4. **Validation**:
- If `container_runtime` is not specified, container-specific arguments such as `volumes`, `env_vars`, and `image` are ignored and the statements run directly on the host.
- A valid container image must be provided if `container_runtime` is specified.

---

## Error Handling

- **Invalid Configurations**:
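- Raises `ValueError` for invalid configurations, such as:
- An unsupported container runtime (anything other than `"docker"` or `"singularity"`).
- A container runtime specified without a container image.
- Note that arguments supplied without a container runtime (e.g., volumes provided without `container_runtime`) are ignored rather than rejected.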

- **Job Failures**:
- Automatically cleans up failed jobs, including temporary files and job outputs.

---

## Implementation Details

Internally, `P.run()` constructs the appropriate command based on the specified runtime and arguments:

### Docker

For Docker, the command is constructed as follows:
```bash
docker run --rm -v /host/path:/container/path -e VAR=value image /bin/bash -c 'statement'
```

### Singularity

For Singularity, the command is constructed as follows:
```bash
singularity exec --bind /host/path:/container/path --env VAR=value image bash -c 'statement'
```

Both commands ensure proper execution and clean-up after the job completes.

---

## Contributing

To add or enhance containerisation functionality, ensure:
1. New features or fixes support both Docker and Singularity.
2. Unit tests cover all edge cases for container runtime usage.
3. Updates to this documentation reflect changes in functionality.

---

## Adding to MkDocs

This page is stored at `docs/container/tasks.md`; add it to the `mkdocs.yml` navigation:

```yaml
nav:
  - Home: index.md
  - P.run:
      - Docker and Singularity: container/tasks.md
```
This provides a clear, accessible reference for users leveraging containerisation with `P.run()`.
Loading

0 comments on commit 243f74e

Please sign in to comment.