Skip to content

Commit

Permalink
Merge pull request #175 from mila-iqia/import_jobs_by_slurm_versions
Browse files Browse the repository at this point in the history
Update parsers to import jobs by Slurm versions
  • Loading branch information
soline-b authored Nov 8, 2023
2 parents 63e8429 + 279dca0 commit f1a2c05
Show file tree
Hide file tree
Showing 10 changed files with 547 additions and 28 deletions.
1 change: 1 addition & 0 deletions slurm_state/helpers/clusters_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def _load_clusters_from_config():

clusters_valid.add_field("sacct_path", optional_string)
clusters_valid.add_field("sinfo_path", optional_string)
clusters_valid.add_field("slurm_version", optional_string, default=None)

# Load the clusters from the configuration file, asserting that it uses the
# predefined format
Expand Down
151 changes: 151 additions & 0 deletions slurm_state/helpers/parser_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,154 @@ def renamer(k, v, res):
res[name] = v

return renamer


def copy_and_stringify(k, v, res):
    """Translator: store the string form of the value under the same key in the result."""
    res.update({k: str(v)})


def rename_subitems(subitem_dict):
    """
    Build a translator copying selected subitems of the value into the result.

    Parameters:
        subitem_dict    Mapping from source key (in the value) to target key
                        (in the result)

    Returns:
        A translator function (k, v, res) -> None
    """

    def renamer(k, v, res):
        # Copy each requested entry of the value dict under its new name
        for source_key, target_key in subitem_dict.items():
            res[target_key] = v[source_key]

    return renamer


def translate_with_value_modification(v_modification, translator, **args):
    """
    This function returns a translator that includes a modification
    on the value which will be transmitted to it
    Parameters:
        v_modification  The function modifying the value before it
                        is transmitted to the translator
        translator      The translator called
    Returns:
        A translator function including the expected value modification
    """
    # Translator factories may need specific keyword arguments, so the
    # factory is invoked up front to obtain the concrete translator.
    inner_translator = translator(**args)

    def combiner(k, v, res):
        # Transform the value first, then delegate the (k, v, res) triplet.
        modified_value = v_modification(v)
        inner_translator(k, modified_value, res)

    return combiner


def zero_to_null(v):
    """
    Replace every 0 entry of the given mapping by None, in place.

    Parameter:
        v   The dictionary whose zero values are converted if applicable

    Return:
        The same dictionary, with each value equal to 0 replaced by None
    """
    # Mutate in place: only the values change, never the keys.
    for key in v:
        if v[key] == 0:
            v[key] = None
    return v


def rename_and_stringify_subitems(subitem_dict):
    """
    Build a translator copying selected subitems of the value into the
    result, renamed and converted to strings.

    Parameters:
        subitem_dict    Mapping from source key (in the value) to target key
                        (in the result)

    Returns:
        A translator function (k, v, res) -> None
    """

    def renamer(k, v, res):
        res.update(
            {target: str(v[source]) for source, target in subitem_dict.items()}
        )

    return renamer


def join_subitems(separator, name):
    """
    Build a translator concatenating all values of the input dict.

    Parameters:
        separator   String inserted between the stringified values
        name        Key under which the joined string is stored in the result

    Returns:
        A translator function (k, v, res) -> None
    """

    def joiner(k, v, res):
        # Stringify every value (insertion order) and join them.
        res[name] = separator.join(str(item) for item in v.values())

    return joiner


def extract_tres_data(k, v, res):
    """
    Extract count of the elements present in the value associated to the key "tres"
    in the input dictionary. Such a dictionary would present a structure similar as depicted below:
        "tres": {
            'allocated': [
                {'type': 'cpu', 'name': None, 'id': 1, 'count': 4},
                {'type': 'mem', 'name': None, 'id': 2, 'count': 40960},
                {'type': 'node', 'name': None, 'id': 4, 'count': 1},
                {'type': 'billing', 'name': None, 'id': 5, 'count': 1},
                {'type': 'gres', 'name': 'gpu', 'id': 1001, 'count': 1}
            ],
            'requested': [
                {'type': 'cpu', 'name': None, 'id': 1, 'count': 4},
                {'type': 'mem', 'name': None, 'id': 2, 'count': 40960},
                {'type': 'node', 'name': None, 'id': 4, 'count': 1},
                {'type': 'billing', 'name': None, 'id': 5, 'count': 1},
                {'type': 'gres', 'name': 'gpu', 'id': 1001, 'count': 1}
            ]
        }
    The dictionaries (and their associated keys) inserted in the job result by this function
    for this input should be:
        "tres_allocated": {
            "billing": 1,
            "mem": 40960,
            "num_cpus": 4,
            "num_gpus": 1,
            "num_nodes": 1
        }
        "tres_requested": {
            "billing": 1,
            "mem": 40960,
            "num_cpus": 4,
            "num_gpus": 1,
            "num_nodes": 1
        }
    """
    # Direct renaming of the TRES types whose key does not depend on the name
    simple_type_to_key = {
        "mem": "mem",
        "billing": "billing",
        "cpu": "num_cpus",
        "node": "num_nodes",
    }

    def get_tres_key(tres_type, tres_name):
        """
        Rename the element we want to retrieve regarding the TRES type
        (as we are for now only interested by the "count" of the entity).
        Returns None for unknown TRES types, so that they are skipped.
        """
        if tres_type == "gres":
            # Only GPU gres gets a dedicated key; other gres are kept generic
            return "num_gpus" if tres_name == "gpu" else "gres"
        return simple_type_to_key.get(tres_type)

    # Build the "tres_allocated" and "tres_requested" subdicts of the result
    for sacct_name, cw_name in (
        ("allocated", "tres_allocated"),
        ("requested", "tres_requested"),
    ):
        counts = {}
        for tres_entry in v[sacct_name]:
            tres_key = get_tres_key(tres_entry["type"], tres_entry["name"])
            if tres_key:
                # Associate the count of the element to the renamed key
                counts[tres_key] = tres_entry["count"]
        res[cw_name] = counts
59 changes: 59 additions & 0 deletions slurm_state/helpers/ssh_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,62 @@ def open_connection(hostname, username, ssh_key_path, port=22):
ssh_client = None

return ssh_client


def launch_slurm_command(command, hostname, username, ssh_key_filename, port=22):
    """
    Launch a Slurm command through SSH and retrieve its response.
    Parameters:
        command             The Slurm command to launch through SSH
        hostname            The hostname used for the SSH connection to launch the Slurm command
        username            The username used for the SSH connection to launch the Slurm command
        ssh_key_filename    The name of the private key in .ssh folder used for the SSH connection to launch the Slurm command
        port                The port used for the SSH connection to launch the sinfo command

    Returns:
        The stdout of the command as a list of lines, or an empty list if
        opening the SSH connection raised an exception.

    Raises:
        Exception if open_connection returned None without raising.
    """
    # Print the command to use
    print(f"The command launched through SSH is:\n{command}")

    # Check the given SSH key
    assert ssh_key_filename, "Missing ssh_key_filename from config."

    # Now this is the private ssh key that we are using with Paramiko.
    ssh_key_path = os.path.join(os.path.expanduser("~"), ".ssh", ssh_key_filename)

    # Connect through SSH
    try:
        ssh_client = open_connection(
            hostname, username, ssh_key_path=ssh_key_path, port=port
        )
    except Exception as inst:
        # NOTE(review): this path returns [] while the `open_connection
        # returned None` path below raises — callers get inconsistent
        # failure modes; confirm whether both should raise or both return [].
        print(
            f"Error. Failed to connect to {hostname} to launch the command:\n{command}"
        )
        print(inst)
        return []

    # If a connection has been established
    if ssh_client:
        # ssh_stdin and ssh_stderr are deliberately unused for now
        ssh_stdin, ssh_stdout, ssh_stderr = ssh_client.exec_command(command)

        # We should find a better option to retrieve stderr
        """
        response_stderr = "".join(ssh_stderr.readlines())
        if len(response_stderr):
            print(
                f"Stderr in sinfo call on {hostname}. This doesn't mean that the call failed entirely, though.\n{response_stderr}"
            )
        """
        # Read all stdout lines before closing the connection
        stdout = ssh_stdout.readlines()
        ssh_client.close()
        return stdout

    else:
        # open_connection swallows its own errors and returns None on
        # failure, so this branch is reachable; it falls through to the
        # raise below after logging.
        print(
            f"Error. Failed to connect to {hostname} to make the call. Returned `None` but no exception was thrown."
        )

    # If no SSH connection has been established, raise an exception
    raise Exception(
        f"No SSH connection has been established while trying to run {command}."
    )
33 changes: 19 additions & 14 deletions slurm_state/mongo_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from slurm_state.helpers.gpu_helper import get_cw_gres_description
from slurm_state.helpers.clusters_helper import get_all_clusters

from slurm_state.sinfo_parser import node_parser, generate_node_report
from slurm_state.sacct_parser import job_parser, generate_job_report
# Import parser classes
from slurm_state.parsers.job_parser import JobParser
from slurm_state.parsers.node_parser import NodeParser


def pprint_bulk_result(result):
Expand All @@ -20,19 +21,21 @@ def pprint_bulk_result(result):
print(result.bulk_api_result)


def fetch_slurm_report(parser, cluster_name, report_path):
def fetch_slurm_report(parser, report_path):
    """
    Yields elements ready to be slotted into the "slurm" field,
    but they have to be processed further before committing to MongoDB.
    """
    # The parser carries its cluster configuration; pull the name from it
    cluster_name = parser.cluster["name"]

    assert os.path.exists(report_path), f"The report path {report_path} is missing."

    # Ensure this cluster appears in the configuration file
    assert (
        get_all_clusters().get(cluster_name, None) is not None
    ), f"{cluster_name} not configured"

    with open(report_path, "r") as report_file:
        for entry in parser.parser(report_file):
            # Tag every parsed entry with its cluster of origin
            entry["cluster_name"] = cluster_name
            yield entry

Expand Down Expand Up @@ -136,47 +139,49 @@ def main_read_report_and_update_collection(
# Initialize the time of this operation's beginning
timestamp_start = time.time()

# Retrieve clusters data from the configuration file
clusters = get_all_clusters()
assert cluster_name in clusters

# Check the input parameters
assert entity in ["jobs", "nodes"]

if entity == "jobs":
id_key = (
"job_id" # The id_key is used to determine how to retrieve the ID of a job
)
parser = job_parser # This parser is used to retrieve and format useful information from a sacct job
parser = JobParser(
cluster_name
) # This parser is used to retrieve and format useful information from a sacct job
from_slurm_to_clockwork = slurm_job_to_clockwork_job # This function is used to translate a Slurm job (created through the parser) to a Clockwork job
generate_report = generate_job_report # This function is used to generate the file gathering the job information which will be explained later
elif entity == "nodes":
id_key = (
"name" # The id_key is used to determine how to retrieve the ID of a node
)
parser = node_parser # This parser is used to retrieve and format useful information from a sacct node
parser = NodeParser(
cluster_name
) # This parser is used to retrieve and format useful information from a sacct node
from_slurm_to_clockwork = slurm_node_to_clockwork_node # This function is used to translate a Slurm node (created through the parser) to a Clockwork node
generate_report = generate_node_report # This function is used to generate the file gathering the node information which will be explained later
else:
# Raise an error because it should not happen
raise ValueError(
f'Incorrect value for entity in main_read_sacct_and_update_collection: "{entity}" when it should be "jobs" or "nodes".'
)

# Retrieve clusters data from the configuration file
clusters = get_all_clusters()
assert cluster_name in clusters

## Retrieve entities ##

# Generate a report file if required
if not from_file or not os.path.exists(report_file_path):
print(
f"Generate report file for the {cluster_name} cluster at location {report_file_path}."
)
generate_report(cluster_name, report_file_path)
parser.generate_report(report_file_path)

# Construct an iterator over the list of entities in the report file,
# each one of them is turned into a clockwork job or node, according to applicability
I_clockwork_entities_from_report = map(
from_slurm_to_clockwork,
fetch_slurm_report(parser, cluster_name, report_file_path),
fetch_slurm_report(parser, report_file_path),
)

L_updates_to_do = [] # Entity updates to store in the database if requested
Expand Down
Loading

0 comments on commit f1a2c05

Please sign in to comment.