diff --git a/.github/workflows/cgatcore_python.yml b/.github/workflows/cgatcore_python.yml index 87138ff5..80a0b09f 100644 --- a/.github/workflows/cgatcore_python.yml +++ b/.github/workflows/cgatcore_python.yml @@ -26,17 +26,25 @@ jobs: CACHE_NUMBER: 0 with: path: ~/conda_pkgs_dir - key: - ${{ runner.os }}-conda-${{ env.CACHE_NUMBER }}-${{ - hashFiles('conda/environments/cgat-core.yml') }} + key: ${{ runner.os }}-conda-${{ env.CACHE_NUMBER }}-${{ hashFiles('conda/environments/cgat-core.yml') }} + - name: Set installer URL + id: set-installer-url + run: | + if [[ "${{ matrix.os }}" == "ubuntu-latest" ]]; then + echo "installer-url=https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-Linux-x86_64.sh" >> $GITHUB_ENV + elif [[ "${{ matrix.os }}" == "macos-latest" ]]; then + echo "installer-url=https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-MacOSX-arm64.sh" >> $GITHUB_ENV + fi - uses: conda-incubator/setup-miniconda@v2 with: + installer-url: ${{ env.installer-url }} python-version: ${{ matrix.python-version }} channels: conda-forge, bioconda, defaults channel-priority: true activate-environment: cgat-core environment-file: conda/environments/cgat-core.yml - miniforge-variant: Mambaforge + - name: Configure Conda Paths + run: echo "/usr/share/miniconda3/condabin" >> $GITHUB_PATH - name: Show conda run: | conda info diff --git a/all-tests.sh b/all-tests.sh index 66852e1c..5fc0f527 100755 --- a/all-tests.sh +++ b/all-tests.sh @@ -16,3 +16,4 @@ pytest -v tests/test_pipeline_control.py pytest -v tests/test_pipeline_execution.py pytest -v tests/test_pipeline_cli.py pytest -v tests/test_pipeline_actions.py +pytest -v tests/test_execution_cleanup.py diff --git a/cgatcore/pipeline/control.py b/cgatcore/pipeline/control.py index ec922402..3151a787 100644 --- a/cgatcore/pipeline/control.py +++ b/cgatcore/pipeline/control.py @@ -50,7 +50,7 @@ from cgatcore.experiment import get_header, MultiLineFormatter from 
def _report_job_errors(ex, logfile):
    """Write a one-line-per-job summary of a ruffus ``RethrownJobError``.

    Arguments
    ---------
    ex : ruffus.ruffus_exceptions.RethrownJobError
        Exception whose ``args`` hold one
        ``(task, job, error, msg, traceback)`` tuple per failed job.
    logfile : string
        Name of the pipeline log file, mentioned in the final hint.
    """
    E.error(f"{len(ex.args)} tasks encountered errors. Summary:")
    for idx, entry in enumerate(ex.args, start=1):
        task, job, error, msg, _job_traceback = entry
        task = re.sub(r"__main__\.", "", task or "Unknown task")
        job = re.sub(r"\s", "", job or "Unknown job")
        if not isinstance(msg, str):
            msg = str(msg) if msg else "No specific message"
        # Display only single line messages; blank lines do not count.
        if len([line for line in msg.split("\n") if line != ""]) > 1:
            msg = ""
        E.error(f"{idx}: Task={task}, Error={error}, Job={job}, Message={msg}")
    E.error(f"Full traceback can be found in {logfile}")


def run_workflow(args, argv=None, pipeline=None):
    """Run the workflow action selected by ``args.pipeline_action``.

    Arguments
    ---------
    args : namespace
        Parsed command line options. The attributes read here include
        ``pipeline_action``, ``pipeline_targets``, ``force_run``,
        ``multiprocess``, ``without_cluster`` and ``cleanup_on_fail``.
    argv : list
        Unused; kept for backwards compatibility.
    pipeline : object
        Passed through unchanged to the ruffus calls.

    Raises
    ------
    ValueError
        If the action is unknown or if ruffus reports failed jobs
        (chained to the original ``RethrownJobError``).
    """
    logger = logging.getLogger("cgatcore.pipeline")
    logger.debug(f"Starting run_workflow with action {args.pipeline_action}")

    params = get_params()

    # Settle the degree of parallelism *before* it is handed to the
    # Executor; otherwise the Executor is created with job_threads=None.
    run_on_cluster = HAS_DRMAA is True and not args.without_cluster
    if args.multiprocess is None:
        if run_on_cluster:
            args.multiprocess = 40
        else:
            args.multiprocess = int(math.ceil(multiprocessing.cpu_count() / 2.0))

    # Executor.__init__ installs the SIGINT/SIGTERM handlers itself, so no
    # explicit setup_signal_handlers() call is made here.
    # NOTE(review): cleanup_all_jobs() only sees jobs registered on *this*
    # instance via start_job(); nothing in this function registers any.
    # Confirm that task-level executors share their job list with it.
    executor = Executor(job_threads=args.multiprocess,
                        work_dir=params["work_dir"])

    if not args.force_run:
        forcedtorun_tasks = []
    elif args.force_run == "all":
        forcedtorun_tasks = ruffus.pipeline_get_task_names()
    else:
        forcedtorun_tasks = args.pipeline_targets

    action = args.pipeline_action
    try:
        # Create local scratch if it does not exist yet. exist_ok covers
        # the race with a concurrent creator; other errors (for example
        # permissions) are allowed to surface.
        tmpdir = params["tmpdir"]
        if not os.path.exists(tmpdir):
            logger.warning(
                f"Local temporary directory {tmpdir} did not exist - created")
            os.makedirs(tmpdir, exist_ok=True)
        logger.info(f"Temporary directory is {tmpdir}")

        if args.input_validation:
            input_validation(params, sys.argv[0])

        elif action == "debug":
            # Call a single pipeline function directly, with a session.
            start_session()
            method = getattr(get_caller(), args.pipeline_targets[0])
            method(*args.pipeline_targets[1:])

        elif action == "make":
            if args.without_cluster:
                # ThreadPool, to avoid taking multiple CPUs for the
                # pipeline controller.
                opts = {"multithread": args.multiprocess}
            else:
                # Cooperative multitasking instead of multiprocessing.
                opts = {"multiprocess": args.multiprocess,
                        "pool_manager": "gevent"}

            start_session()
            try:
                ruffus.pipeline_run(
                    args.pipeline_targets,
                    forcedtorun_tasks=forcedtorun_tasks,
                    logger=logger,
                    verbose=args.loglevel,
                    log_exceptions=args.log_exceptions,
                    exceptions_terminate_immediately=args.exceptions_terminate_immediately,
                    checksum_level=args.ruffus_checksums_level,
                    pipeline=pipeline,
                    one_second_per_job=False,
                    **opts)
            except ruffus.ruffus_exceptions.RethrownJobError as ex:
                if not args.debug:
                    _report_job_errors(ex, args.pipeline_logfile)
                    logger.error("Start of all error messages")
                    logger.error(ex)
                    logger.error("End of all error messages")
                # Job cleanup happens exactly once, in the outer handler.
                raise ValueError("Pipeline failed with errors") from ex
            finally:
                close_session()

        elif action in ("show", "touch", "regenerate", "svg", "state", "plot"):
            with cache_os_functions():
                if action == "show":
                    ruffus.pipeline_printout(
                        args.stdout,
                        args.pipeline_targets,
                        forcedtorun_tasks=forcedtorun_tasks,
                        verbose=args.loglevel,
                        pipeline=pipeline,
                        checksum_level=args.ruffus_checksums_level)
                elif action == "touch":
                    ruffus.pipeline_run(
                        args.pipeline_targets,
                        touch_files_only=True,
                        verbose=args.loglevel,
                        pipeline=pipeline,
                        checksum_level=args.ruffus_checksums_level)
                elif action == "regenerate":
                    ruffus.pipeline_run(
                        args.pipeline_targets,
                        touch_files_only=args.ruffus_checksums_level,
                        pipeline=pipeline,
                        verbose=args.loglevel)
                elif action == "svg":
                    ruffus.pipeline_printout_graph(
                        args.stdout.buffer,
                        args.pipeline_format,
                        args.pipeline_targets,
                        forcedtorun_tasks=forcedtorun_tasks,
                        pipeline=pipeline,
                        checksum_level=args.ruffus_checksums_level)
                elif action == "state":
                    ruffus_return_dag(
                        args.stdout,
                        target_tasks=args.pipeline_targets,
                        forcedtorun_tasks=forcedtorun_tasks,
                        verbose=args.loglevel,
                        pipeline=pipeline,
                        checksum_level=args.ruffus_checksums_level)
                else:  # "plot"
                    outf, filename = tempfile.mkstemp()
                    try:
                        # Close the stream before the viewer opens the file.
                        with os.fdopen(outf, "wb") as stream:
                            ruffus.pipeline_printout_graph(
                                stream,
                                args.pipeline_format,
                                args.pipeline_targets,
                                pipeline=pipeline,
                                checksum_level=args.ruffus_checksums_level)
                        execute("inkscape %s" % filename)
                    finally:
                        os.unlink(filename)

        elif action == "dump":
            args.stdout.write((json.dumps(params)) + "\n")

        elif action == "printconfig":
            E.info("Printing out pipeline parameters:")
            for k, v in sorted(params.items()):
                print(k, "=", v)
            print_config_files()

        elif action == "config":
            pipeline_path = os.path.splitext(get_caller().__file__)[0]
            general_path = os.path.join(os.path.dirname(pipeline_path),
                                        "configuration")
            write_config_files(pipeline_path, general_path)

        elif action == "clone":
            clone_pipeline(args.pipeline_targets[0])

        else:
            raise ValueError(f"Unknown pipeline action {action}")

    except Exception:
        logger.exception("An error occurred during pipeline execution.")
        # NOTE(review): --cleanup-on-fail is declared with
        # action="store_true" and default=True, so it cannot be switched
        # off from the command line; confirm whether that is intended.
        if getattr(args, "cleanup_on_fail", True):
            logger.info("Cleaning up all jobs due to pipeline failure.")
            executor.cleanup_all_jobs()
        raise

    finally:
        # End of pipeline run, stop logging.
        E.stop(logger=get_logger())
def setup_signal_handlers(self):
    """Install SIGINT/SIGTERM handlers that clean up tracked jobs.

    The handler logs the signal, calls ``self.cleanup_all_jobs()`` and
    terminates the interpreter with exit status 1.

    ``signal.signal`` may only be called from the main thread and raises
    ``ValueError`` elsewhere. This method is invoked from ``__init__``,
    so it returns without installing anything when it runs in another
    thread, rather than making object construction fail.
    """
    import sys
    import threading

    if threading.current_thread() is not threading.main_thread():
        self.logger.debug(
            "Not in main thread - signal handlers were not installed.")
        return

    def signal_handler(signum, frame):
        self.logger.info(f"Received signal {signum}. Starting clean-up.")
        self.cleanup_all_jobs()
        # sys.exit rather than the site-provided ``exit`` helper, which
        # is meant for the interactive interpreter.
        sys.exit(1)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)


def cleanup_failed_job(self, job_info):
    """Remove the output files recorded for a failed job.

    Arguments
    ---------
    job_info : dict
        Job description. The files to delete are taken from the key
        ``"outfile"`` (a single path) or, failing that, ``"outfiles"``
        (a collection of paths; a single path is accepted as well).
        If neither key is present a warning naming
        ``job_info["job_name"]`` is logged and nothing is deleted.

    Removal errors are logged and never raised.
    """
    if "outfile" in job_info:
        outfiles = [job_info["outfile"]]
    elif "outfiles" in job_info:
        outfiles = job_info["outfiles"]
        # A bare path would otherwise be iterated character by character.
        if isinstance(outfiles, (str, bytes, os.PathLike)):
            outfiles = [outfiles]
    else:
        self.logger.warning(
            f"No output files found for job {job_info.get('job_name', 'unknown')}")
        return

    for outfile in outfiles:
        if not outfile:
            # Nothing to remove for empty/None entries.
            continue
        # Try the removal directly instead of checking existence first;
        # the file may disappear between the check and the removal.
        try:
            os.remove(outfile)
        except FileNotFoundError:
            self.logger.info(
                f"Output file not found (already removed or not created): {outfile}")
        except OSError as e:
            self.logger.error(f"Error removing file {outfile}: {str(e)}")
        else:
            self.logger.info(f"Removed failed job output file: {outfile}")


def run(self, statement_list):
    """Run each statement through the shell and track its lifecycle.

    Every statement is registered with ``start_job`` before it runs and
    is always de-registered with ``finish_job`` afterwards, whether it
    succeeded or failed.

    Arguments
    ---------
    statement_list : list
        Statements to execute, one job per statement.

    Returns
    -------
    list
        One ``collect_benchmark_data`` result per *successful* statement.
        A failing statement is logged and cleaned up, and the loop moves
        on to the next one.
    """
    # NOTE(review): failures are only logged, so callers cannot tell that
    # a statement failed other than by the shorter result list. Confirm
    # that this best-effort behaviour is intended.
    benchmark_data = []
    for statement in statement_list:
        job_info = {"statement": statement}
        self.start_job(job_info)

        try:
            full_statement, _job_path = self.build_job_script(statement)
            process = subprocess.Popen(
                full_statement, shell=True,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            _stdout, stderr = process.communicate()

            if process.returncode != 0:
                # errors="replace": undecodable bytes in stderr must not
                # hide the real failure behind a UnicodeDecodeError.
                raise OSError(
                    f"Job failed with return code {process.returncode}.\n"
                    f"stderr: {stderr.decode('utf-8', errors='replace')}\n"
                    f"statement: {statement}")

            benchmark_data.append(
                self.collect_benchmark_data(
                    [statement], resource_usage=[{"job_id": process.pid}]))

        except Exception as e:
            self.logger.error(f"Job failed: {e}")
            self.cleanup_failed_job(job_info)

        finally:
            # Without this a failed job stays in active_jobs forever and
            # is processed again by every later cleanup_all_jobs() call.
            self.finish_job(job_info)

    return benchmark_data
class MockParams(dict):
    """Dictionary stand-in for the pipeline parameter mapping.

    Differences from a plain ``dict``:

    * a missing key raises ``KeyError`` with the message
      ``Missing parameter accessed: '<key>'``;
    * a nested ``dict`` value is returned as a ``MockParams``, so missing
      nested keys are reported the same way. The wrapper is stored back
      on first access, which means repeated look-ups return the same
      object and writes to a nested section are not lost.
    """

    def __getitem__(self, key):
        try:
            value = super().__getitem__(key)
        except KeyError:
            raise KeyError(f"Missing parameter accessed: '{key}'") from None

        if isinstance(value, dict) and not isinstance(value, MockParams):
            value = MockParams(value)
            # Keep the wrapper so later accesses see the same section.
            super().__setitem__(key, value)
        return value
class TestExecutionCleanup(unittest.TestCase):
    """Tests for job tracking and output cleanup on ``Executor``.

    ``get_params`` is patched in both ``cgatcore.pipeline.execution`` and
    ``cgatcore.pipeline.control`` so that every test works inside a
    private temporary directory.
    """

    @classmethod
    def setUpClass(cls):
        # Provide dummy 'scripts' / 'scripts.cgat_check_deps' modules.
        cls.dummy_scripts = types.ModuleType('scripts')
        cls.dummy_cgat_check_deps = types.ModuleType('scripts.cgat_check_deps')
        cls.dummy_cgat_check_deps.checkDepedencies = MagicMock(return_value=([], []))

        cls.patcher_scripts = patch.dict('sys.modules', {
            'scripts': cls.dummy_scripts,
            'scripts.cgat_check_deps': cls.dummy_cgat_check_deps
        })
        cls.patcher_scripts.start()

    @classmethod
    def tearDownClass(cls):
        cls.patcher_scripts.stop()

    def setUp(self):
        import signal

        # Everything is undone through addCleanup, so that it also runs
        # when a later statement in setUp raises (tearDown would not).
        self.test_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.test_dir, ignore_errors=True)

        params = {
            "work_dir": self.test_dir,
            "tmpdir": self.test_dir,
            "cluster": {"memory_default": "4G"},
            "pipeline_logfile": "pipeline.log",
            "without_cluster": True,
        }

        self.patcher_execution = patch(
            'cgatcore.pipeline.execution.get_params',
            return_value=MockParams(params))
        self.patcher_execution.start()
        self.addCleanup(self.patcher_execution.stop)

        self.patcher_control = patch(
            'cgatcore.pipeline.control.get_params',
            return_value=MockParams(params))
        self.patcher_control.start()
        self.addCleanup(self.patcher_control.stop)

        # Creating an Executor installs SIGINT/SIGTERM handlers for the
        # whole process; put the previous handlers back after each test.
        for signum in (signal.SIGINT, signal.SIGTERM):
            previous = signal.getsignal(signum)
            if previous is not None:
                self.addCleanup(signal.signal, signum, previous)

        from cgatcore.pipeline.execution import Executor
        self.executor = Executor()

    @follows()
    def target_task():
        pass  # This is a minimal task, no operation needed

    @patch('ruffus.pipeline_run')
    def test_error_handling_calls_cleanup(self, mock_pipeline_run):
        mock_pipeline_run.side_effect = RethrownJobError(
            [("target_task", "job1", "error1", "msg1", "traceback1")])

        mock_args = MagicMock()
        mock_args.debug = False
        mock_args.pipeline_action = "make"
        mock_args.input_validation = False  # Disable input validation
        mock_args.pipeline_targets = ["target_task"]  # Reference the dummy task
        mock_args.force_run = None
        mock_args.cleanup_on_fail = True
        mock_args.without_cluster = True
        mock_args.multiprocess = 1
        mock_args.loglevel = 5
        mock_args.log_exceptions = True
        mock_args.exceptions_terminate_immediately = False
        mock_args.stdout = sys.stdout
        mock_args.stderr = sys.stderr
        mock_args.timeit_file = None
        mock_args.timeit_header = True
        mock_args.timeit_name = 'test_timeit'

        with patch('cgatcore.experiment.get_args', return_value=mock_args):
            from cgatcore.pipeline import control

            # Replace the Executor used by run_workflow so the cleanup
            # call can be observed.
            with patch.object(control, 'Executor') as mock_executor_cls, \
                    patch('cgatcore.pipeline.control.logging.getLogger') as mock_logger:
                mock_logger.return_value = MagicMock()

                with self.assertRaises(ValueError):
                    control.run_workflow(mock_args, pipeline=None)

                mock_executor_cls.return_value.cleanup_all_jobs.assert_called()

    def test_cleanup_failed_job_single_file(self):
        test_file = os.path.join(self.test_dir, "test_output.txt")
        with open(test_file, "w") as f:
            f.write("Test content")
        job_info = {"outfile": test_file}
        self.executor.cleanup_failed_job(job_info)
        self.assertFalse(os.path.exists(test_file))

    def test_cleanup_failed_job_multiple_files(self):
        test_files = [os.path.join(self.test_dir, f"test_output_{i}.txt") for i in range(3)]
        for file in test_files:
            with open(file, "w") as f:
                f.write("Test content")
        job_info = {"outfiles": test_files}
        self.executor.cleanup_failed_job(job_info)
        for file in test_files:
            self.assertFalse(os.path.exists(file))

    def test_cleanup_failed_job_nonexistent_file(self):
        non_existent_file = os.path.join(self.test_dir, "non_existent.txt")
        job_info = {"outfile": non_existent_file}
        try:
            self.executor.cleanup_failed_job(job_info)
        except Exception as e:
            self.fail(f"cleanup_failed_job raised an exception unexpectedly: {e}")

    def test_cleanup_failed_job_no_outfiles(self):
        job_info = {"job_name": "test_job"}
        with self.assertLogs('cgatcore.pipeline', level='WARNING') as cm:
            self.executor.cleanup_failed_job(job_info)
        self.assertIn("No output files found for job test_job", cm.output[0])

    def test_start_job(self):
        job_info = {"job_name": "test_job"}
        self.executor.start_job(job_info)
        self.assertIn(job_info, self.executor.active_jobs)

    def test_finish_job(self):
        job_info = {"job_name": "test_job"}
        self.executor.start_job(job_info)
        self.executor.finish_job(job_info)
        self.assertNotIn(job_info, self.executor.active_jobs)

    def test_cleanup_all_jobs(self):
        test_files = [os.path.join(self.test_dir, f"test_output_{i}.txt") for i in range(3)]
        for file in test_files:
            with open(file, "w") as f:
                f.write("Test content")
        job_infos = [{"outfile": file} for file in test_files]
        for job_info in job_infos:
            self.executor.start_job(job_info)
        self.executor.cleanup_all_jobs()
        for file in test_files:
            self.assertFalse(os.path.exists(file))
        self.assertEqual(len(self.executor.active_jobs), 0)