Skip to content

Commit

Permalink
Test metric collection when runner fails
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Sep 13, 2024
1 parent 0bc1a46 commit 1490982
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
4 changes: 2 additions & 2 deletions lib/galaxy/jobs/runners/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self, app, nworkers):

super().__init__(app, nworkers)

def __command_line(self, job_wrapper: "MinimalJobWrapper") -> Tuple[str, str]:
def _command_line(self, job_wrapper: "MinimalJobWrapper") -> Tuple[str, str]:
""" """
command_line = job_wrapper.runner_command_line

Expand Down Expand Up @@ -90,7 +90,7 @@ def queue_job(self, job_wrapper):
stderr = stdout = ""

# command line has been added to the wrapper by prepare_job()
job_file, exit_code_path = self.__command_line(job_wrapper)
job_file, exit_code_path = self._command_line(job_wrapper)
job_id = job_wrapper.get_id_tag()

try:
Expand Down
4 changes: 3 additions & 1 deletion test/integration/resubmission_runners.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import subprocess
import time
from typing import List

Expand All @@ -13,7 +14,8 @@ class FailsJobRunner(LocalJobRunner):
def queue_job(self, job_wrapper):
if not self._prepare_job_local(job_wrapper):
return

command_line, _ = self._command_line(job_wrapper)
subprocess.run([command_line])
resource_parameters = job_wrapper.get_resource_parameters()
failure_state = resource_parameters.get("failure_state", None)

Expand Down
31 changes: 27 additions & 4 deletions test/integration/test_job_resubmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os

from galaxy_test.driver import integration_util
from galaxy_test.base.populators import DatasetPopulator

SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
JOB_RESUBMISSION_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_job_conf.yml")
Expand All @@ -27,15 +28,15 @@
class _BaseResubmissionIntegrationTestCase(integration_util.IntegrationTestCase):
framework_tool_and_types = True

def _assert_job_passes(self, tool_id="exit_code_oom", resource_parameters=None):
def _assert_job_passes(self, tool_id="exit_code_oom", resource_parameters=None, history_id=None):
resource_parameters = resource_parameters or {}
self._run_tool_test(tool_id, resource_parameters=resource_parameters)
self._run_tool_test(tool_id, resource_parameters=resource_parameters, test_history=history_id)

def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None):
def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None, history_id=None):
resource_parameters = resource_parameters or {}
exception_thrown = False
try:
self._run_tool_test(tool_id, resource_parameters=resource_parameters)
self._run_tool_test(tool_id, resource_parameters=resource_parameters, test_history=history_id)
except Exception:
exception_thrown = True

Expand All @@ -44,6 +45,11 @@ def _assert_job_fails(self, tool_id="exit_code_oom", resource_parameters=None):

class TestJobResubmissionIntegration(_BaseResubmissionIntegrationTestCase):
framework_tool_and_types = True
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config):
Expand All @@ -52,6 +58,9 @@ def handle_galaxy_config_kwds(cls, config):
config["job_resource_params_file"] = JOB_RESUBMISSION_JOB_RESOURCES_CONFIG_FILE
config["job_runner_monitor_sleep"] = 1
config["job_handler_monitor_sleep"] = 1
config["job_metrics"] = [{"type": "core"}]
# Can't set job_metrics_config_file to None as default location will be used otherwise
config["job_metrics_config_file"] = "xxx.xml"

def test_retry_tools_have_resource_params(self):
tool_show = self._get("tools/simple_constructs", data=dict(io_details=True)).json()
Expand All @@ -74,6 +83,20 @@ def test_failure_runner(self):
}
)

def test_failure_runner_job_metrics_collected(self):
with self.dataset_populator.test_history() as history_id:
self._assert_job_fails(
resource_parameters={
"test_name": "test_failure_runner",
"initial_target_environment": "fails_without_resubmission",
},
history_id=history_id,
)
jobs = self.dataset_populator.history_jobs(history_id=history_id)
assert len(jobs) == 1
job_metrics = self.dataset_populator._get(f"/api/jobs/{jobs[0]['id']}/metrics").json()
assert job_metrics

def test_walltime_resubmission(self):
self._assert_job_passes(
resource_parameters={"test_name": "test_walltime_resubmission", "failure_state": "walltime_reached"}
Expand Down

0 comments on commit 1490982

Please sign in to comment.