Ac kubernetes #182

Merged
merged 22 commits on Jan 9, 2025

Commits (22)
6daf925
included initial kubernetes module
Acribbs Nov 3, 2024
6ec06c1
included new get executor function and replaced make_runner to includ…
Acribbs Nov 3, 2024
fdd9555
made changes to kubernetes imports
Acribbs Nov 3, 2024
8f33f53
have included a new BaseExecutor and included inherited executor cla…
Acribbs Nov 10, 2024
b83fb7f
Merge branch 'master' into AC-kubernetes
Acribbs Nov 10, 2024
5245d77
added option to run LocalExecutor if in testing
Acribbs Nov 10, 2024
0f40522
added python kubernetes import to conda
Acribbs Nov 10, 2024
ba6edea
have made LocalExecutor the fallback Executor
Acribbs Nov 10, 2024
0ab6ad0
to_cluster False defaults to LocalExecutor
Acribbs Nov 10, 2024
69aa7ba
re-written tests for the new Executors and have tested all of them wi…
Acribbs Nov 11, 2024
38e8946
Merge branch 'master' into AC-kubernetes
Acribbs Nov 11, 2024
884d0bd
merge master
Acribbs Dec 3, 2024
216e628
Merge branch 'master' into AC-kubernetes
Acribbs Jan 1, 2025
e99c399
Missing slurm monitor_job_completion function not originally implemented
Acribbs Jan 1, 2025
1260fd1
Added tests to check slurm job monitoring
Acribbs Jan 1, 2025
d5db251
Added tests for other job monitoring for sge and torque as they were …
Acribbs Jan 1, 2025
24fd1e8
Added missing collect_benchmark_data to executors
Acribbs Jan 1, 2025
87936f0
added collect_benchmark_data to baseexecutor
Acribbs Jan 1, 2025
594b551
updated kubernetes executor with new benchmarking data
Acribbs Jan 1, 2025
0c52a7b
Added documentation for kubernetes
Acribbs Jan 1, 2025
1764d16
Have added Coverage tests
Acribbs Jan 1, 2025
1a990a8
removed coverage tests
Acribbs Jan 1, 2025
11 changes: 11 additions & 0 deletions .codecov.yml
@@ -0,0 +1,11 @@
codecov:
  notify:
    require_ci_to_pass: yes

coverage:
  precision: 2
  round: down
  paths:
    - ./cgatcore/**

ignore:
  - tests/**
4 changes: 3 additions & 1 deletion README.md
@@ -1,3 +1,6 @@
# CGAT-core

[![codecov](https://codecov.io/gh/cgat-developers/cgat-core/branch/main/graph/badge.svg)](https://codecov.io/gh/cgat-developers/cgat-core)

![CGAT-core](https://github.com/cgat-developers/cgat-core/blob/master/docs/img/CGAT_logo.png)
----------------------------------------
@@ -24,4 +27,3 @@ Installation
The following sections describe how to install the [cgatcore](https://cgat-developers.github.io/cgat-core/) framework.

The preferred method to install the cgatcore is using conda, by following the instructions on [read the docs](https://cgat-core.readthedocs.io/en/latest/getting_started/Installation.html). However, there are a few other methods to install cgatcore, including pip and our own bash script installer.

66 changes: 66 additions & 0 deletions cgatcore/pipeline/base_executor.py
@@ -0,0 +1,66 @@
# cgatcore/pipeline/base_executor.py
import os
import tempfile


class BaseExecutor:
    """Base class for executors that defines the interface for running jobs."""

    def __init__(self, **kwargs):
        """Initialize the executor with configuration options."""
        self.config = kwargs
        self.task_name = "base_task"  # Should be overridden by subclasses
        self.default_total_time = 0  # Should be overridden by subclasses

    def run(self, statement, *args, **kwargs):
        """Run the given job statement. This should be implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement this method")

    def collect_metric_data(self, *args, **kwargs):
        """Collect metric data if needed."""
        raise NotImplementedError("Subclasses must implement this method")

    def collect_benchmark_data(self, statements, resource_usage=None):
        """Collect benchmark data for job execution.

        Args:
            statements (list): List of executed statements
            resource_usage (list, optional): Resource usage data

        Returns:
            dict: Benchmark data including task name and execution time
        """
        return {
            "task": self.task_name,
            "total_t": self.default_total_time,
            "statements": statements,
            "resource_usage": resource_usage or []
        }

    def build_job_script(self, statement):
        """Build a simple job script for execution.

        Args:
            statement (str): The command or script to be executed.

        Returns:
            tuple: The full command (as a string) and the path where the
            job script is stored.
        """
        job_script_dir = self.config.get("job_script_dir", tempfile.gettempdir())
        os.makedirs(job_script_dir, exist_ok=True)

        script_path = os.path.join(job_script_dir, "job_script.sh")
        with open(script_path, "w") as script_file:
            script_file.write(f"#!/bin/bash\n\n{statement}\n")

        os.chmod(script_path, 0o755)  # Make the script executable
        return statement, script_path

    def __enter__(self):
        """Enter the runtime context related to this object."""
        # Any initialisation logic needed for the executor can be added here
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context related to this object."""
        # Cleanup logic, if any, can be added here
        pass
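For review context, a minimal sketch of how a concrete executor might build on this interface. The `EchoExecutor` subclass below is hypothetical (not part of the diff), and the inline `BaseExecutor` is a trimmed stand-in for the class above so the snippet runs on its own:

```python
import subprocess
import time


class BaseExecutor:
    """Trimmed stand-in for cgatcore's BaseExecutor (see diff above)."""

    def __init__(self, **kwargs):
        self.config = kwargs
        self.task_name = "base_task"
        self.default_total_time = 0

    def collect_benchmark_data(self, statements, resource_usage=None):
        return {
            "task": self.task_name,
            "total_t": self.default_total_time,
            "statements": statements,
            "resource_usage": resource_usage or [],
        }

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass


class EchoExecutor(BaseExecutor):
    """Hypothetical subclass: runs each statement in a shell."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.task_name = "echo_task"

    def run(self, statement_list):
        # Execute each statement and record wall-clock time
        start = time.time()
        for statement in statement_list:
            subprocess.run(statement, shell=True, check=True)
        self.default_total_time = time.time() - start
        return [self.collect_benchmark_data(statement_list)]


with EchoExecutor() as executor:
    data = executor.run(["echo hello"])
print(data[0]["task"])  # echo_task
```

The context-manager methods matter because `run()` in execution.py enters the executor with a `with` block (see below in this PR), so any subclass gets setup/teardown hooks for free.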
107 changes: 65 additions & 42 deletions cgatcore/pipeline/execution.py
@@ -35,6 +35,12 @@
from cgatcore.pipeline.files import get_temp_filename, get_temp_dir
from cgatcore.pipeline.parameters import substitute_parameters, get_params
from cgatcore.pipeline.cluster import get_queue_manager, JobInfo
from cgatcore.pipeline.executors import SGEExecutor, SlurmExecutor, TorqueExecutor, LocalExecutor
try:
    from cgatcore.pipeline.kubernetes import KubernetesExecutor
except ImportError:
    KubernetesExecutor = None  # Fallback if Kubernetes is not available
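This is the standard optional-dependency pattern: the module still imports when the extra package is absent, and callers test the name against `None` before dispatching to it. A self-contained illustration (the library name is invented so the import deliberately fails):

```python
# Optional-dependency import, mirroring the KubernetesExecutor fallback:
# the module loads even when the extra package is missing, and callers
# check the sentinel before using the backend.
try:
    import some_missing_scheduler_lib  # hypothetical optional dependency
    SchedulerExecutor = some_missing_scheduler_lib.Executor
except ImportError:
    SchedulerExecutor = None

if SchedulerExecutor is None:
    print("scheduler backend unavailable; falling back to local execution")
```

The alternative — importing unconditionally — would make the whole pipeline module unusable on machines without the `kubernetes` package installed.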


# talking to a cluster
try:
@@ -377,6 +383,50 @@ def interpolate_statement(statement, kwargs):
    return statement


def get_executor(options=None):
    """
    Return an executor instance based on the specified queue manager in options.

    Parameters:
    - options (dict): Dictionary containing execution options,
      including "cluster_queue_manager".

    Returns:
    - Executor instance appropriate for the specified queue manager.
    """
    if options is None:
        options = get_params()

    if options.get("testing", False):
        return LocalExecutor(**options)

    # Check whether to_cluster is explicitly set to False
    if not options.get("to_cluster", True):  # Defaults to True if not specified
        return LocalExecutor(**options)

    queue_manager = options.get("cluster_queue_manager", None)

    # Check for KubernetesExecutor
    if queue_manager == "kubernetes" and KubernetesExecutor is not None:
        return KubernetesExecutor(**options)

    # Check for SGEExecutor (Sun Grid Engine)
    elif queue_manager == "sge" and shutil.which("qsub") is not None:
        return SGEExecutor(**options)

    # Check for SlurmExecutor
    elif queue_manager == "slurm" and shutil.which("sbatch") is not None:
        return SlurmExecutor(**options)

    # Check for TorqueExecutor
    elif queue_manager == "torque" and shutil.which("qsub") is not None:
        return TorqueExecutor(**options)

    # Fall back to LocalExecutor; an unrecognised queue manager could
    # arguably raise an error here instead of silently running locally
    else:
        return LocalExecutor(**options)
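The dispatch logic can be exercised in isolation. The toy below (executor classes and option keys invented for illustration, not cgatcore's real ones) shows the same pattern: explicit opt-outs first, then queue-manager checks gated on the scheduler binary actually being on `PATH`:

```python
import shutil


class LocalExecutor:
    name = "local"


class SlurmExecutor:
    name = "slurm"


def pick_executor(options):
    """Mirror of get_executor's selection order: testing and
    to_cluster=False short-circuit to local execution; a cluster
    backend is chosen only if its submit binary exists."""
    if options.get("testing") or not options.get("to_cluster", True):
        return LocalExecutor()
    qm = options.get("cluster_queue_manager")
    if qm == "slurm" and shutil.which("sbatch"):
        return SlurmExecutor()
    return LocalExecutor()  # Fallback when no scheduler is available


print(pick_executor({"testing": True}).name)      # local
print(pick_executor({"to_cluster": False}).name)  # local
```

Gating on `shutil.which` means a misconfigured `cluster_queue_manager` degrades to local execution rather than failing at submit time, which matches the fallback noted in the function itself.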


def join_statements(statements, infile, outfile=None):
    '''join a chain of statements into a single statement.

@@ -1271,32 +1321,6 @@ class LocalArrayExecutor(LocalExecutor):
    pass


def make_runner(**kwargs):
    """factory function returning an object capable of executing
    a list of command line statements.
    """

    run_as_array = "job_array" in kwargs and kwargs["job_array"] is not None

    # run on cluster if:
    # * to_cluster is not defined or set to True
    # * command line option without_cluster is set to False
    # * an SGE session is present
    run_on_cluster = will_run_on_cluster(kwargs)
    if run_on_cluster:
        if run_as_array:
            runner = GridArrayExecutor(**kwargs)
        else:
            runner = GridExecutor(**kwargs)
    else:
        if run_as_array:
            runner = LocalArrayExecutor(**kwargs)
        else:
            runner = LocalExecutor(**kwargs)

    return runner


def run(statement, **kwargs):
    """run a command line statement.

@@ -1395,7 +1419,7 @@ def run(statement, **kwargs):
    """
    logger = get_logger()

    # Combine options using priority
    options = dict(list(get_params().items()))
    caller_options = get_caller_locals()
    options.update(list(caller_options.items()))
@@ -1404,7 +1428,7 @@
        del options["self"]
    options.update(list(kwargs.items()))

    # Inject params named tuple from TaskLibrary functions into option
    # dict. This allows overriding options set in the code with options set
    # in a .yml file
    if "params" in options:
@@ -1413,7 +1437,7 @@
        except AttributeError:
            pass

    # Insert parameters supplied through simplified interface such
    # as job_memory, job_options, job_queue
    options['cluster']['options'] = options.get(
        'job_options', options['cluster']['options'])
@@ -1436,34 +1460,33 @@

options["task_name"] = calling_module + "." + get_calling_function()

# build statements using parameter interpolation
# Build statements using parameter interpolation
if isinstance(statement, list):
statement_list = []
for stmt in statement:
statement_list.append(interpolate_statement(stmt, options))
statement_list = [interpolate_statement(stmt, options) for stmt in statement]
else:
statement_list = [interpolate_statement(statement, options)]

if len(statement_list) == 0:
logger.warn("no statements found - no execution")
logger.warn("No statements found - no execution")
return []

if options.get("dryrun", False):
for statement in statement_list:
logger.info("dry-run: {}".format(statement))
logger.info("Dry-run: {}".format(statement))
return []

# execute statement list
runner = make_runner(**options)
with runner as r:
benchmark_data = r.run(statement_list)
# Use get_executor to get the appropriate executor
executor = get_executor(options) # Updated to use get_executor

# Execute statement list within the context of the executor
with executor as e:
benchmark_data = e.run(statement_list)

# log benchmark_data
# Log benchmark data
for data in benchmark_data:
logger.info(json.dumps(data))

BenchmarkData = collections.namedtuple(
'BenchmarkData', sorted(benchmark_data[0]))
BenchmarkData = collections.namedtuple('BenchmarkData', sorted(benchmark_data[0]))
return [BenchmarkData(**d) for d in benchmark_data]
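The benchmark-reporting idiom at the end of `run()` — building a namedtuple type from the sorted keys of the first result dict, then unpacking every dict into it — can be exercised standalone. The record below uses invented example values in the shape `collect_benchmark_data` returns:

```python
import collections
import json

# Toy benchmark records in the shape collect_benchmark_data returns
benchmark_data = [
    {"task": "example_task", "total_t": 12.5,
     "statements": ["echo hello"], "resource_usage": []},
]

# Log each record as JSON, as run() does
for data in benchmark_data:
    print(json.dumps(data))

# Derive the namedtuple fields from the dict keys, then unpack each dict
BenchmarkData = collections.namedtuple('BenchmarkData', sorted(benchmark_data[0]))
results = [BenchmarkData(**d) for d in benchmark_data]
print(results[0].task)  # example_task
```

Sorting the keys makes the field order deterministic regardless of dict insertion order; the `**d` unpack then matches fields by name, so callers get attribute access (`result.total_t`) instead of string indexing.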

