Skip to content

Commit

Permalink
added set_config for container for the whole pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Acribbs committed Dec 3, 2024
1 parent 0429425 commit dc6db87
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 57 deletions.
154 changes: 97 additions & 57 deletions cgatcore/pipeline/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,60 @@ def _pickle_args(args, kwargs):
return (submit_args, args_file)


class ContainerConfig:
"""Container configuration for pipeline execution."""

def __init__(self, image=None, volumes=None, env_vars=None, runtime="docker"):
"""
Args:
image (str): Container image (e.g., "ubuntu:20.04").
volumes (list): Volume mappings (e.g., ['/data:/data']).
env_vars (dict): Environment variables for the container.
runtime (str): Container runtime ("docker" or "singularity").
"""
self.image = image
self.volumes = volumes or []
self.env_vars = env_vars or {}
self.runtime = runtime.lower() # Normalise to lowercase

if self.runtime not in ["docker", "singularity"]:
raise ValueError("Unsupported container runtime: {}".format(self.runtime))

def get_container_command(self, statement):
"""Convert a statement to run inside a container."""
if not self.image:
return statement

if self.runtime == "docker":
return self._get_docker_command(statement)
elif self.runtime == "singularity":
return self._get_singularity_command(statement)
else:
raise ValueError("Unsupported container runtime: {}".format(self.runtime))

def _get_docker_command(self, statement):
"""Generate a Docker command."""
volume_args = [f"-v {volume}" for volume in self.volumes]
env_args = [f"-e {key}={value}" for key, value in self.env_vars.items()]

return " ".join([
"docker", "run", "--rm",
*volume_args, *env_args, self.image,
"/bin/bash", "-c", f"'{statement}'"
])

def _get_singularity_command(self, statement):
"""Generate a Singularity command."""
volume_args = [f"--bind {volume}" for volume in self.volumes]
env_args = [f"--env {key}={value}" for key, value in self.env_vars.items()]

return " ".join([
"singularity", "exec",
*volume_args, *env_args, self.image,
"bash", "-c", f"'{statement}'"
])


def start_session():
"""start and initialize the global DRMAA session."""
global GLOBAL_SESSION
Expand Down Expand Up @@ -739,6 +793,13 @@ def get_val(d, v, alt):

return benchmark_data

def set_container_config(self, image, volumes=None, env_vars=None, runtime="docker"):
"""Set container configuration for all tasks executed by this executor."""

if not image:
raise ValueError("An image must be specified for the container configuration.")
self.container_config = ContainerConfig(image=image, volumes=volumes, env_vars=env_vars, runtime=runtime)

def start_job(self, job_info):
"""Add a job to active_jobs list when it starts."""
self.active_jobs.append(job_info)
Expand Down Expand Up @@ -789,31 +850,28 @@ def cleanup_failed_job(self, job_info):
self.logger.info(f"Output file not found (already removed or not created): {outfile}")

def run(
self,
statement_list,
job_memory=None,
job_threads=None,
container_runtime=None,
image=None,
volumes=None,
env_vars=None,
**kwargs,
):
self,
statement_list,
job_memory=None,
job_threads=None,
container_runtime=None,
image=None,
volumes=None,
env_vars=None,
**kwargs,):
"""
Execute a list of statements with optional container support for Docker or Singularity.
Args:
statement_list (list): List of commands to execute.
job_memory (str): Memory requirements (e.g., "4G").
job_threads (int): Number of threads to use.
container_runtime (str): Container runtime to use ("docker" or "singularity").
image (str): Container image to use (e.g., "ubuntu:20.04" or a Singularity SIF path).
volumes (list): List of volume mappings (e.g., ['/data:/data'] for Docker or Singularity).
env_vars (dict): Environment variables for the container.
**kwargs: Additional arguments passed to the executor.
Returns:
list: Benchmark data collected from executed jobs.
Execute a list of statements with optional container support.
Args:
statement_list (list): List of commands to execute.
job_memory (str): Memory requirements (e.g., "4G").
job_threads (int): Number of threads to use.
container_runtime (str): Container runtime ("docker" or "singularity").
image (str): Container image to use.
volumes (list): Volume mappings (e.g., ['/data:/data']).
env_vars (dict): Environment variables for the container.
**kwargs: Additional arguments.
"""
# Validation checks
if container_runtime and container_runtime not in ["docker", "singularity"]:
Expand All @@ -827,41 +885,22 @@ def run(
benchmark_data = []

for statement in statement_list:
job_info = {"statement": statement} # Preserve the original statement
job_info = {"statement": statement}
self.start_job(job_info)

try:
# Prepare for containerised execution
# Prepare containerized execution
if container_runtime:
volume_args = []
env_args = []

if volumes:
volume_flag = "-v" if container_runtime == "docker" else "--bind"
volume_args.extend([f"{volume_flag} {v}" for v in volumes])

if env_vars:
env_flag = "-e" if container_runtime == "docker" else "--env"
env_args.extend([f"{env_flag} {k}={v}" for k, v in env_vars.items()])

if job_memory:
env_args.append(f"{env_flag} JOB_MEMORY={job_memory}")
if job_threads:
env_args.append(f"{env_flag} JOB_THREADS={job_threads}")

# Construct the container command
container_command = [
container_runtime,
"run",
"--rm",
*volume_args,
*env_args,
image,
"/bin/bash",
"-c",
statement,
]
statement = " ".join(container_command) # Keep for debugging/logging
self.set_container_config(image=image, volumes=volumes, env_vars=env_vars, runtime=container_runtime)
statement = self.container_config.get_container_command(statement)

# Add memory and thread environment variables
if job_memory:
env_vars = env_vars or {}
env_vars["JOB_MEMORY"] = job_memory
if job_threads:
env_vars = env_vars or {}
env_vars["JOB_THREADS"] = job_threads

# Debugging: Log the constructed command
self.logger.info(f"Executing command: {statement}")
Expand All @@ -882,15 +921,16 @@ def run(
# Collect benchmark data for successful jobs
benchmark_data.append(
self.collect_benchmark_data(
[statement], resource_usage=[{"job_id": process.pid}]
statement, resource_usage={"job_id": process.pid}
)
)
self.finish_job(job_info)

except Exception as e:
self.logger.error(f"Job failed: {e}")
self.cleanup_failed_job(job_info)
raise # Propagate the exception
if not self.ignore_errors:
raise

return benchmark_data

Expand Down
81 changes: 81 additions & 0 deletions tests/test_container_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import unittest
from unittest.mock import patch, MagicMock
from cgatcore.pipeline.execution import Executor, ContainerConfig


class TestContainerConfig(unittest.TestCase):
def setUp(self):
"""Set up a mock for get_params and an Executor instance."""
patcher = patch("cgatcore.pipeline.execution.get_params")
self.mock_get_params = patcher.start()
self.addCleanup(patcher.stop)

# Mock the return value of get_params
self.mock_get_params.return_value = {
"cluster": {
"options": "",
"queue": None,
"memory_default": "4G",
"tmpdir": "/tmp",
"monitor_interval_queued_default": 30,
"monitor_interval_running_default": 30,
},
"cluster_tmpdir": "/tmp",
"tmpdir": "/tmp",
"work_dir": "/tmp",
"os": "Linux",
}

self.executor = Executor()

def test_set_container_config_docker(self):
"""Test setting container configuration for Docker."""
self.executor.set_container_config(
image="ubuntu:20.04",
volumes=["/data:/data", "/reference:/reference"],
env_vars={"TEST_VAR": "value"},
runtime="docker"
)
config = self.executor.container_config
self.assertIsInstance(config, ContainerConfig)
self.assertEqual(config.image, "ubuntu:20.04")
self.assertEqual(config.runtime, "docker")
self.assertIn("/data:/data", config.volumes)
self.assertIn("/reference:/reference", config.volumes)
self.assertEqual(config.env_vars["TEST_VAR"], "value")

def test_set_container_config_singularity(self):
"""Test setting container configuration for Singularity."""
self.executor.set_container_config(
image="/path/to/container.sif",
volumes=["/data:/data", "/reference:/reference"],
env_vars={"TEST_VAR": "value"},
runtime="singularity"
)
config = self.executor.container_config
self.assertIsInstance(config, ContainerConfig)
self.assertEqual(config.image, "/path/to/container.sif")
self.assertEqual(config.runtime, "singularity")
self.assertIn("/data:/data", config.volumes)
self.assertIn("/reference:/reference", config.volumes)
self.assertEqual(config.env_vars["TEST_VAR"], "value")

def test_invalid_runtime(self):
"""Test setting an invalid container runtime."""
with self.assertRaises(ValueError) as context:
self.executor.set_container_config(
image="ubuntu:20.04", runtime="invalid_runtime"
)
self.assertIn("Unsupported container runtime", str(context.exception))

def test_missing_image(self):
"""Test setting container configuration without an image."""
with self.assertRaises(ValueError) as context:
self.executor.set_container_config(
image=None, runtime="docker"
)
self.assertIn("An image must be specified", str(context.exception))


if __name__ == "__main__":
unittest.main()

0 comments on commit dc6db87

Please sign in to comment.