Commit

Add jobs parser for Slurm v22

soline-b committed Oct 25, 2023
1 parent cca76b8 commit bde0f78
Showing 5 changed files with 290 additions and 45 deletions.
151 changes: 151 additions & 0 deletions slurm_state/helpers/parser_helper.py
@@ -19,3 +19,154 @@ def renamer(k, v, res):
res[name] = v

return renamer


def copy_and_stringify(k, v, res):
res[k] = str(v)


def rename_subitems(subitem_dict):
def renamer(k, v, res):
for subitem, name in subitem_dict.items():
res[name] = v[subitem]

return renamer


def translate_with_value_modification(v_modification, translator, **args):
"""
This function returns a translator that includes a modification
on the value which will be transmitted to it
Parameters:
v_modification The function modifying the value before it
is transmitted to the translator
translator The translator called
Returns:
A translator function including the expected value modification
"""
# Some translators depend on specific arguments. Thus, we call the
# translator factory with these arguments first in order to get the
# translator that will actually be applied
final_translator = translator(**args)

# This helper is used to update the value v before applying the
# translator on the triplet (k, v, res)
def combiner(k, v, res):
final_translator(k, v_modification(v), res)

return combiner
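

# As an illustration (not part of the commit), this is how the combinator composes
# zero_to_null with rename_subitems, mirroring the "time" entry of the job field map
# defined later; the sample value is a made-up sacct-like fragment.
#
# from slurm_state.helpers.parser_helper import (
#     rename_subitems,
#     translate_with_value_modification,
#     zero_to_null,
# )
#
# translator = translate_with_value_modification(
#     zero_to_null,
#     rename_subitems,
#     subitem_dict={"limit": "time_limit", "end": "end_time"},
# )
#
# res = {}
# # zero_to_null turns the 0 into None before rename_subitems maps the keys
# translator("time", {"limit": 60, "end": 0}, res)
# assert res == {"time_limit": 60, "end_time": None}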


def zero_to_null(v):
"""
Convert the value from 0 to null
Parameter:
v The values to be converted if appliable
Return:
The converted values
"""
# If a value of v equals 0, transform it to None
for (v_k, v_v) in v.items():
if v_v == 0:
v[v_k] = None
# Return v
return v


def rename_and_stringify_subitems(subitem_dict):
def renamer(k, v, res):
for subitem, name in subitem_dict.items():
res[name] = str(v[subitem])

return renamer


def join_subitems(separator, name):
def joiner(k, v, res):
values = []
for _, value in v.items():
values.append(str(value))
res[name] = separator.join(values)

return joiner
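

# A quick illustration of the joiner (the sample exit_code value is an assumption
# about the sacct output shape), matching the join_subitems(":", "exit_code") entry
# used in the job field map later on.
#
# joiner = join_subitems(":", "exit_code")
# res = {}
# # Each subitem value is stringified and joined with the separator
# joiner("exit_code", {"status": "COMPLETED", "return_code": 0}, res)
# assert res == {"exit_code": "COMPLETED:0"}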


def extract_tres_data(k, v, res):
"""
Extract count of the elements present in the value associated to the key "tres"
in the input dictionary. Such a dictionary would present a structure similar as depicted below:
"tres": {
'allocated': [
{'type': 'cpu', 'name': None, 'id': 1, 'count': 4},
{'type': 'mem', 'name': None, 'id': 2, 'count': 40960},
{'type': 'node', 'name': None, 'id': 4, 'count': 1},
{'type': 'billing', 'name': None, 'id': 5, 'count': 1},
{'type': 'gres', 'name': 'gpu', 'id': 1001, 'count': 1}
],
'requested': [
{'type': 'cpu', 'name': None, 'id': 1, 'count': 4},
{'type': 'mem', 'name': None, 'id': 2, 'count': 40960},
{'type': 'node', 'name': None, 'id': 4, 'count': 1},
{'type': 'billing', 'name': None, 'id': 5, 'count': 1},
{'type': 'gres', 'name': 'gpu', 'id': 1001, 'count': 1}
]
}
The dictionaries (and their associated keys) inserted in the job result by this function
for this input should be:
"tres_allocated": {
"billing": 1,
"mem": 40960,
"num_cpus": 4,
"num_gpus": 1,
"num_nodes": 1
}
"tres_requested": {
"billing": 1,
"mem": 40960,
"num_cpus": 4,
"num_gpus": 1,
"num_nodes": 1
}
"""

def get_tres_key(tres_type, tres_name):
"""
Basically, this function is used to rename the element
we want to retrieve regarding the TRES type (as we are
for now only interested by the "count" of the entity)
"""
if tres_type == "mem" or tres_type == "billing":
return tres_type
elif tres_type == "cpu":
return "num_cpus"
elif tres_type == "gres":
if tres_name == "gpu":
return "num_gpus"
else:
return "gres"
elif tres_type == "node":
return "num_nodes"
else:
return None

tres_subdict_names = [
{"sacct_name": "allocated", "cw_name": "tres_allocated"},
{"sacct_name": "requested", "cw_name": "tres_requested"},
]
for tres_subdict_name in tres_subdict_names:
res[
tres_subdict_name["cw_name"]
] = {} # Initialize the "tres_allocated" and the "tres_requested" subdicts
for tres_subdict in v[tres_subdict_name["sacct_name"]]:
tres_key = get_tres_key(
tres_subdict["type"], tres_subdict["name"]
) # Define the key associated to the TRES
if tres_key:
res[tres_subdict_name["cw_name"]][tres_key] = tres_subdict[
"count"
] # Associate the count of the element, as value associated to the key defined previously
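
As a usage sketch (not part of the commit), feeding a structure like the one in the docstring through extract_tres_data produces the two summary sub-dictionaries:

res = {}
extract_tres_data(
    "tres",
    {
        "allocated": [
            {"type": "cpu", "name": None, "id": 1, "count": 4},
            {"type": "mem", "name": None, "id": 2, "count": 40960},
            {"type": "gres", "name": "gpu", "id": 1001, "count": 1},
        ],
        "requested": [
            {"type": "cpu", "name": None, "id": 1, "count": 4},
            {"type": "mem", "name": None, "id": 2, "count": 40960},
        ],
    },
    res,
)
assert res["tres_allocated"] == {"num_cpus": 4, "mem": 40960, "num_gpus": 1}
assert res["tres_requested"] == {"num_cpus": 4, "mem": 40960}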
26 changes: 9 additions & 17 deletions slurm_state/mongo_update.py
@@ -9,8 +9,8 @@
from slurm_state.helpers.gpu_helper import get_cw_gres_description
from slurm_state.helpers.clusters_helper import get_all_clusters

# from slurm_state.sinfo_parser import node_parser, generate_node_report
from slurm_state.sacct_parser import job_parser, generate_job_report
# Import parser classes
from slurm_state.parsers.job_parser import JobParser
from slurm_state.parsers.node_parser import NodeParser


@@ -33,14 +33,9 @@ def fetch_slurm_report(parser, cluster_name, report_path):
assert ctx is not None, f"{cluster_name} not configured"

with open(report_path, "r") as f:
if isinstance(parser, NodeParser):
for e in parser.parser(f):
e["cluster_name"] = cluster_name
yield e
else:
for e in parser(f):
e["cluster_name"] = cluster_name
yield e
for e in parser.parser(f):
e["cluster_name"] = cluster_name
yield e


def slurm_job_to_clockwork_job(slurm_job: dict):
@@ -149,9 +144,10 @@ def main_read_report_and_update_collection(
id_key = (
"job_id" # The id_key is used to determine how to retrieve the ID of a job
)
parser = job_parser # This parser is used to retrieve and format useful information from a sacct job
parser = JobParser(
cluster_name
) # This parser is used to retrieve and format useful information from a sacct job
from_slurm_to_clockwork = slurm_job_to_clockwork_job # This function is used to translate a Slurm job (created through the parser) to a Clockwork job
generate_report = generate_job_report # This function is used to generate the file gathering the job information which will be explained later
elif entity == "nodes":
id_key = (
"name" # The id_key is used to determine how to retrieve the ID of a node
@@ -160,7 +156,6 @@
cluster_name
) # This parser is used to retrieve and format useful information from a Slurm node
from_slurm_to_clockwork = slurm_node_to_clockwork_node # This function is used to translate a Slurm node (created through the parser) to a Clockwork node
# generate_report = generate_node_report # This function is used to generate the file gathering the node information which will be explained later
else:
# Raise an error because it should not happen
raise ValueError(
@@ -178,10 +173,7 @@
print(
f"Generate report file for the {cluster_name} cluster at location {report_file_path}."
)
if entity == "jobs":
generate_report(cluster_name, report_file_path)
else:
parser.generate_report(report_file_path)
parser.generate_report(report_file_path)

# Construct an iterator over the list of entities in the report file,
# each one of them is turned into a clockwork job or node, according to applicability
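
Based on the comment above, the consumption pattern presumably looks like the following sketch (the actual loop is outside the shown hunk; variable names are taken from the code above):

for entity in fetch_slurm_report(parser, cluster_name, report_file_path):
    clockwork_entity = from_slurm_to_clockwork(entity)
    # ...the entity is then written to the corresponding collection, keyed by id_key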
117 changes: 117 additions & 0 deletions slurm_state/parsers/job_parser.py
@@ -0,0 +1,117 @@
# These functions are translators used in order to handle the values
# we could encounter while parsing a job dictionary retrieved from a
# sacct command.
from slurm_state.helpers.parser_helper import (
copy,
copy_and_stringify,
extract_tres_data,
join_subitems,
rename,
rename_subitems,
rename_and_stringify_subitems,
translate_with_value_modification,
zero_to_null,
)

from slurm_state.parsers.slurm_parser import SlurmParser

# Common imports
import json, re


class JobParser(SlurmParser):
""" """

def __init__(self, cluster_name):
super().__init__("jobs", "sacct", cluster_name)

def generate_report(self, file_name):

# Retrieve the allocations associated to the cluster
allocations = self.cluster["allocations"]

if allocations == []:
# If the cluster has no associated allocation, nothing is requested
print(
f"The cluster {self.cluster['name']} has no allocation related to it. Thus, no job has been retrieved. Associated allocations can be provided in the Clockwork configuration file."
)
return []
else:
# Set the sacct command
# -S is a condition on the start time, 600 being in seconds
# -E is a condition on the end time
# -X means "Only show statistics relevant to the job allocation itself, not taking steps into consideration."
# --accounts is used in order to limit the fetched jobs to the ones related to Mila and/or to professors who
# may use Clockwork
if allocations == "*":
# We do not provide the --accounts option because, combined with --allusers,
# the default behaviour is to fetch the jobs of all accounts
remote_command = (
f"{self.slurm_command_path} -S now-600 -E now -X --allusers --json"
)
else:
accounts_list = ",".join(allocations)
remote_command = f"{self.slurm_command_path} -S now-600 -E now -X --accounts={accounts_list} --allusers --json"
print(f"remote_command is\n{remote_command}")

return super().generate_report(remote_command, file_name)
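
As a concrete illustration (the path and allocation names below are hypothetical), with cluster["allocations"] == ["mila", "def-prof"] and slurm_command_path == "/opt/slurm/bin/sacct", the command built above would be:

/opt/slurm/bin/sacct -S now-600 -E now -X --accounts=mila,def-prof --allusers --json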

def parser(self, f):
""" """
if re.search("^slurm 22\..*$", self.slurm_version):
return self.parser_v22(f)
else:
raise Exception(
f'The {self.entity} parser is not implemented for the Slurm version "{self.slurm_version}".'
)

def parser_v22(self, f):
JOB_FIELD_MAP = {
"account": copy,
"array": rename_and_stringify_subitems(
{"job_id": "array_job_id", "task_id": "array_task_id"}
),
"cluster": rename("cluster_name"),
"exit_code": join_subitems(":", "exit_code"),
"job_id": copy_and_stringify,
"name": copy,
"nodes": copy,
"partition": copy,
"state": rename_subitems({"current": "job_state"}),
"time": translate_with_value_modification(
zero_to_null,
rename_subitems,
subitem_dict={
"limit": "time_limit",
"submission": "submit_time",
"start": "start_time",
"end": "end_time",
},
),
"tres": extract_tres_data,
"user": rename("username"),
"working_directory": copy,
}

# Load the JSON file generated using the Slurm command
# (At this point, slurm_data is a hierarchical structure of dictionaries and lists)
slurm_data = json.load(f)

slurm_entities = slurm_data[self.entity]

for slurm_entity in slurm_entities:
res_entity = (
dict()
) # Initialize the dictionary which will store the newly formatted Slurm data

for k, v in slurm_entity.items():
# We will use a handler mapping to translate this
translator = JOB_FIELD_MAP.get(k, None)

if translator is not None:
# Translate using the translator retrieved from the fields map
translator(k, v, res_entity)

# If no translator has been provided: ignore the field

yield res_entity
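
For context, a minimal hedged sketch of how this class is driven from mongo_update.py; the cluster name and file path are placeholder assumptions, and generate_report presumably writes the sacct JSON output to the given file:

from slurm_state.parsers.job_parser import JobParser

parser = JobParser("example_cluster")       # hypothetical cluster name from the config
parser.generate_report("jobs_report.json")  # hypothetical report path
with open("jobs_report.json", "r") as f:
    for job in parser.parser(f):            # one dict per job, shaped by JOB_FIELD_MAP
        job["cluster_name"] = "example_cluster"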
28 changes: 13 additions & 15 deletions slurm_state/parsers/node_parser.py
@@ -9,7 +9,7 @@
rename,
)

# Common import
# Common imports
import json, re


@@ -25,8 +25,17 @@ def generate_report(self, file_name):

return super().generate_report(remote_command, file_name)

def get_field_map(self):
node_field_map = {
def parser(self, f):
""" """
if re.search("^slurm 22\..*$", self.slurm_version):
return self.parser_v22(f)
else:
raise Exception(
f'The {self.entity} parser is not implemented for the Slurm version "{self.slurm_version}".'
)

def parser_v22(self, f):
NODE_FIELD_MAP = {
"architecture": rename("arch"),
"comment": copy,
"cores": copy,
@@ -45,18 +54,7 @@ def get_field_map(self):
"tres": copy,
"tres_used": copy,
}
return node_field_map

def parser(self, f):
""" """
if re.search("^slurm 22\..*$", self.slurm_version):
return self.parser_v22(f)
else:
raise Exception(
f'The {self.entity} parser is not implemented for the Slurm version "{self.slurm_version}".'
)

def parser_v22(self, f):
# Load the JSON file generated using the Slurm command
# (At this point, slurm_data is a hierarchical structure of dictionaries and lists)
slurm_data = json.load(f)
Expand All @@ -70,7 +68,7 @@ def parser_v22(self, f):

for k, v in slurm_entity.items():
# We will use a handler mapping to translate this
translator = self.get_field_map().get(k, None)
translator = NODE_FIELD_MAP.get(k, None)

if translator is not None:
# Translate using the translator retrieved from the fields map

0 comments on commit bde0f78
