Skip to content

Commit

Permalink
Fix container recording for pulsar
Browse files Browse the repository at this point in the history
And re-use abstractions for determining the file path.
  • Loading branch information
mvdbeek committed Jul 26, 2024
1 parent 5a27e70 commit 9b772f1
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
7 changes: 4 additions & 3 deletions lib/galaxy/job_metrics/instrumenters/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import json
import logging
import os
import time
from typing import (
Any,
Expand All @@ -25,7 +24,6 @@
START_EPOCH_KEY = "start_epoch"
END_EPOCH_KEY = "end_epoch"
RUNTIME_SECONDS_KEY = "runtime_seconds"
CONTAINER_FILE = "__container.json"
CONTAINER_ID = "container_id"
CONTAINER_TYPE = "container_type"

Expand Down Expand Up @@ -89,9 +87,12 @@ def job_properties(self, job_id, job_directory: str) -> Dict[str, Any]:
properties[RUNTIME_SECONDS_KEY] = end - start
return properties

def get_container_file_path(self, job_directory):
return self._instrument_file_path(job_directory, "container")

def __read_container_details(self, job_directory) -> Dict[str, str]:
try:
with open(os.path.join(job_directory, CONTAINER_FILE)) as fh:
with open(self.get_container_file_path(job_directory)) as fh:
return json.load(fh)
except FileNotFoundError:
return {}
Expand Down
19 changes: 13 additions & 6 deletions lib/galaxy/jobs/command_factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import json
import typing
from logging import getLogger
from os import getcwd
from os import (
getcwd,
makedirs,
)
from os.path import (
abspath,
join,
Expand Down Expand Up @@ -81,8 +84,16 @@ def build_command(
__handle_dependency_resolution(commands_builder, job_wrapper, remote_command_params)

__handle_task_splitting(commands_builder, job_wrapper)

for_pulsar = "pulsar_version" in remote_command_params
if container:
if core_job_metric_plugin := runner.app.job_metrics.default_job_instrumenter.get_configured_plugin("core"):
directory = join(job_wrapper.working_directory, "metadata") if for_pulsar else job_wrapper.working_directory
makedirs(directory, exist_ok=True)
container_file_path = core_job_metric_plugin.get_container_file_path(directory)
with open(container_file_path, "w") as container_file:
container_file.write(
json.dumps({"container_id": container.container_id, "container_type": container.container_type})
)
if (container and modify_command_for_container) or job_wrapper.commands_in_new_shell:
if container and modify_command_for_container:
# Many Docker containers do not have /bin/bash.
Expand Down Expand Up @@ -181,10 +192,6 @@ def __externalize_commands(
source_command = ""
if container:
source_command = container.source_environment
with open(join(job_wrapper.working_directory, "__container.json"), "w") as container_file:
container_file.write(
json.dumps({"container_id": container.container_id, "container_type": container.container_type})
)
script_contents = f"#!{shell}\n{integrity_injection}{set_e}{source_command}{tool_commands}"
write_script(
local_container_script,
Expand Down
3 changes: 1 addition & 2 deletions test/integration/test_containerized_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ def _run_and_get_contents(self, tool_id: str, history_id: str):
run_response = self.dataset_populator.run_tool(tool_id, {}, history_id)
job_id = run_response["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id=job_id, assert_ok=True, timeout=EXTENDED_TIMEOUT)
job_details = self.dataset_populator.get_job_details(job_id=job_id, full=True).json()
job_metrics = job_details["job_metrics"]
job_metrics = self.dataset_populator._get(f"/api/jobs/{job_id}/metrics").json()
# would be nice if it wasn't just a list of unpredictable order ...
container_id = None
container_type = None
Expand Down

0 comments on commit 9b772f1

Please sign in to comment.