Import jobs by slurm versions #175

Merged: 6 commits, Nov 8, 2023
1 change: 1 addition & 0 deletions slurm_state/helpers/clusters_helper.py
@@ -65,6 +65,7 @@ def _load_clusters_from_config():

clusters_valid.add_field("sacct_path", optional_string)
clusters_valid.add_field("sinfo_path", optional_string)
clusters_valid.add_field("slurm_version", optional_string, default=None)

# Load the clusters from the configuration file, asserting that it uses the
# predefined format
151 changes: 151 additions & 0 deletions slurm_state/helpers/parser_helper.py
@@ -19,3 +19,154 @@ def renamer(k, v, res):
res[name] = v

return renamer


def copy_and_stringify(k, v, res):
res[k] = str(v)


def rename_subitems(subitem_dict):
def renamer(k, v, res):
for subitem, name in subitem_dict.items():
res[name] = v[subitem]

return renamer


def translate_with_value_modification(v_modification, translator, **args):
"""
This function returns a translator that includes a modification
on the value which will be transmitted to it

Parameters:
v_modification The function modifying the value before it
is transmitted to the translator
translator The translator called

Returns:
A translator function including the expected value modification
"""
# Some translator can depend on specific arguments. Thus, we call
# them before to apply it and get the translator which has to be
# used
final_translator = translator(**args)

# This helper is used to update the value v before applying the
# translator on the triplet (k, v, res)
def combiner(k, v, res):
final_translator(k, v_modification(v), res)

return combiner


def zero_to_null(v):
"""
Convert the value from 0 to null

Parameter:
v The values to be converted if appliable

Return:
The converted values
"""
# If a value of v equals 0, transform it to None
for (v_k, v_v) in v.items():
if v_v == 0:
v[v_k] = None
# Return v
return v
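
For illustration, here is a minimal sketch (not part of the diff) showing how the helpers above can be composed; the field names are hypothetical.

```python
# Hypothetical composition: rename sub-fields of a value after converting
# its zero entries to None. The field names are made up for the example.
translator = translate_with_value_modification(
    zero_to_null,       # v_modification applied first: 0 -> None
    rename_subitems,    # translator factory defined above
    subitem_dict={"start_time": "start_time", "end_time": "end_time"},
)

res = {}
translator("time", {"start_time": 0, "end_time": 1700000000}, res)
# res == {"start_time": None, "end_time": 1700000000}
```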


def rename_and_stringify_subitems(subitem_dict):
def renamer(k, v, res):
for subitem, name in subitem_dict.items():
res[name] = str(v[subitem])

return renamer


def join_subitems(separator, name):
def joiner(k, v, res):
values = []
for _, value in v.items():
values.append(str(value))
res[name] = separator.join(values)

return joiner
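
A quick usage sketch (not part of the diff) for join_subitems; the values are hypothetical.

```python
# Hypothetical example: join the sub-values of a field into a single string.
joiner = join_subitems("-", "gres_text")
res = {}
joiner("gres", {"type": "gpu", "count": 2}, res)
# res == {"gres_text": "gpu-2"}
```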


def extract_tres_data(k, v, res):
"""
Extract the count of each element present in the value associated to the key "tres"
in the input dictionary. Such a dictionary presents a structure similar to the one depicted below:
"tres": {
'allocated': [
{'type': 'cpu', 'name': None, 'id': 1, 'count': 4},
{'type': 'mem', 'name': None, 'id': 2, 'count': 40960},
{'type': 'node', 'name': None, 'id': 4, 'count': 1},
{'type': 'billing', 'name': None, 'id': 5, 'count': 1},
{'type': 'gres', 'name': 'gpu', 'id': 1001, 'count': 1}
],
'requested': [
{'type': 'cpu', 'name': None, 'id': 1, 'count': 4},
{'type': 'mem', 'name': None, 'id': 2, 'count': 40960},
{'type': 'node', 'name': None, 'id': 4, 'count': 1},
{'type': 'billing', 'name': None, 'id': 5, 'count': 1},
{'type': 'gres', 'name': 'gpu', 'id': 1001, 'count': 1}
]
}

The dictionaries (and their associated keys) inserted in the job result by this function
for this input should be:
"tres_allocated": {
"billing": 1,
"mem": 40960,
"num_cpus": 4,
"num_gpus": 1,
"num_nodes": 1
}
"tres_requested": {
"billing": 1,
"mem": 40960,
"num_cpus": 4,
"num_gpus": 1,
"num_nodes": 1
}
"""

def get_tres_key(tres_type, tres_name):
"""
This function returns the key under which the count of a TRES
entity is stored, depending on the TRES type (as we are for now
only interested in the "count" of the entity)
"""
if tres_type == "mem" or tres_type == "billing":
return tres_type
elif tres_type == "cpu":
return "num_cpus"
elif tres_type == "gres":
if tres_name == "gpu":
return "num_gpus"
else:
return "gres"
elif tres_type == "node":
return "num_nodes"
else:
return None

tres_subdict_names = [
{"sacct_name": "allocated", "cw_name": "tres_allocated"},
{"sacct_name": "requested", "cw_name": "tres_requested"},
]
for tres_subdict_name in tres_subdict_names:
res[
tres_subdict_name["cw_name"]
] = {} # Initialize the "tres_allocated" and the "tres_requested" subdicts
for tres_subdict in v[tres_subdict_name["sacct_name"]]:
tres_key = get_tres_key(
tres_subdict["type"], tres_subdict["name"]
) # Define the key associated to the TRES
if tres_key:
res[tres_subdict_name["cw_name"]][tres_key] = tres_subdict[
"count"
] # Associate the count of the element, as value associated to the key defined previously
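
As a usage sketch (not part of the diff), applying extract_tres_data to an abridged version of the docstring's example gives:

```python
# Abridged example based on the docstring above; `job` stands in for the result dict.
job = {}
tres_value = {
    "allocated": [
        {"type": "cpu", "name": None, "id": 1, "count": 4},
        {"type": "gres", "name": "gpu", "id": 1001, "count": 1},
    ],
    "requested": [
        {"type": "cpu", "name": None, "id": 1, "count": 4},
        {"type": "gres", "name": "gpu", "id": 1001, "count": 1},
    ],
}
extract_tres_data("tres", tres_value, job)
# job == {
#     "tres_allocated": {"num_cpus": 4, "num_gpus": 1},
#     "tres_requested": {"num_cpus": 4, "num_gpus": 1},
# }
```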
59 changes: 59 additions & 0 deletions slurm_state/helpers/ssh_helper.py
@@ -43,3 +43,62 @@ def open_connection(hostname, username, ssh_key_path, port=22):
ssh_client = None

return ssh_client


def launch_slurm_command(command, hostname, username, ssh_key_filename, port=22):
"""
Launch a Slurm command through SSH and retrieve its response.
Parameters:
command The Slurm command to launch through SSH
hostname The hostname used for the SSH connection to launch the Slurm command
username The username used for the SSH connection to launch the Slurm command
ssh_key_filename The name of the private key in .ssh folder used for the SSH connection to launch the Slurm command
port The port used for the SSH connection to launch the Slurm command
"""
# Print the command to use
print(f"The command launched through SSH is:\n{command}")

# Check the given SSH key
assert ssh_key_filename, "Missing ssh_key_filename from config."

# Now this is the private ssh key that we are using with Paramiko.
ssh_key_path = os.path.join(os.path.expanduser("~"), ".ssh", ssh_key_filename)

# Connect through SSH
try:
ssh_client = open_connection(
hostname, username, ssh_key_path=ssh_key_path, port=port
)
except Exception as inst:
print(
f"Error. Failed to connect to {hostname} to launch the command:\n{command}"
)
print(inst)
return []

# If a connection has been established
if ssh_client:
ssh_stdin, ssh_stdout, ssh_stderr = ssh_client.exec_command(command)

# We should find a better option to retrieve stderr
"""
response_stderr = "".join(ssh_stderr.readlines())
if len(response_stderr):
print(
f"Stderr in sinfo call on {hostname}. This doesn't mean that the call failed entirely, though.\n{response_stderr}"
)
"""
stdout = ssh_stdout.readlines()
ssh_client.close()
return stdout

else:
print(
f"Error. Failed to connect to {hostname} to make the call. Returned `None` but no exception was thrown."
)

# If no SSH connection has been established, raise an exception
raise Exception(
f"No SSH connection has been established while trying to run {command}."
)
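
A hypothetical call for illustration; the command, hostname, username and key name below are placeholders, not values taken from this PR.

```python
# Placeholder values for illustration; only the signature defined above is assumed.
stdout_lines = launch_slurm_command(
    command="sacct --json",               # hypothetical Slurm command
    hostname="login.example-cluster.org", # placeholder hostname
    username="clockwork",                 # placeholder username
    ssh_key_filename="id_clockwork",      # private key expected in ~/.ssh/
)
for line in stdout_lines:
    print(line.rstrip())
```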
33 changes: 19 additions & 14 deletions slurm_state/mongo_update.py
@@ -9,8 +9,9 @@
from slurm_state.helpers.gpu_helper import get_cw_gres_description
from slurm_state.helpers.clusters_helper import get_all_clusters

from slurm_state.sinfo_parser import node_parser, generate_node_report
from slurm_state.sacct_parser import job_parser, generate_job_report
# Import parser classes
from slurm_state.parsers.job_parser import JobParser
from slurm_state.parsers.node_parser import NodeParser


def pprint_bulk_result(result):
@@ -20,19 +20,21 @@ def pprint_bulk_result(result):
print(result.bulk_api_result)


def fetch_slurm_report(parser, cluster_name, report_path):
def fetch_slurm_report(parser, report_path):
"""
Yields elements ready to be slotted into the "slurm" field,
but they have to be processed further before committing to MongoDB.
"""
# Retrieve the cluster name
cluster_name = parser.cluster["name"]

assert os.path.exists(report_path), f"The report path {report_path} is missing."

ctx = get_all_clusters().get(cluster_name, None)
assert ctx is not None, f"{cluster_name} not configured"

with open(report_path, "r") as f:
for e in parser(f):
for e in parser.parser(f):
e["cluster_name"] = cluster_name
yield e

@@ -136,47 +136,49 @@ def main_read_report_and_update_collection(
# Initialize the time of this operation's beginning
timestamp_start = time.time()

# Retrieve clusters data from the configuration file
clusters = get_all_clusters()
assert cluster_name in clusters

# Check the input parameters
assert entity in ["jobs", "nodes"]

if entity == "jobs":
id_key = (
"job_id" # The id_key is used to determine how to retrieve the ID of a job
)
parser = job_parser # This parser is used to retrieve and format useful information from a sacct job
parser = JobParser(
cluster_name
) # This parser is used to retrieve and format useful information from a sacct job
from_slurm_to_clockwork = slurm_job_to_clockwork_job # This function is used to translate a Slurm job (created through the parser) to a Clockwork job
generate_report = generate_job_report # This function is used to generate the file gathering the job information which will be explained later
elif entity == "nodes":
id_key = (
"name" # The id_key is used to determine how to retrieve the ID of a node
)
parser = node_parser # This parser is used to retrieve and format useful information from a sacct node
parser = NodeParser(
cluster_name
) # This parser is used to retrieve and format useful information from a sinfo node
from_slurm_to_clockwork = slurm_node_to_clockwork_node # This function is used to translate a Slurm node (created through the parser) to a Clockwork node
generate_report = generate_node_report # This function is used to generate the file gathering the node information which will be explained later
else:
# Raise an error because it should not happen
raise ValueError(
f'Incorrect value for entity in main_read_report_and_update_collection: "{entity}" when it should be "jobs" or "nodes".'
)

# Retrieve clusters data from the configuration file
clusters = get_all_clusters()
assert cluster_name in clusters

## Retrieve entities ##

# Generate a report file if required
if not from_file or not os.path.exists(report_file_path):
print(
f"Generate report file for the {cluster_name} cluster at location {report_file_path}."
)
generate_report(cluster_name, report_file_path)
parser.generate_report(report_file_path)

# Construct an iterator over the list of entities in the report file,
# each one of them is turned into a clockwork job or node, according to applicability
I_clockwork_entities_from_report = map(
from_slurm_to_clockwork,
fetch_slurm_report(parser, cluster_name, report_file_path),
fetch_slurm_report(parser, report_file_path),
)

L_updates_to_do = [] # Entity updates to store in the database if requested
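
For context, a minimal sketch (not part of the diff) of how the new parser classes are driven by this code path; the cluster name and report path below are placeholders, and only the calls visible above are assumed.

```python
# Hypothetical driver; "mila" and the report path are made up for the example.
parser = JobParser("mila")                       # parser bound to one configured cluster
parser.generate_report("/tmp/jobs_report.json")  # expected to run sacct and write the report
for entity in fetch_slurm_report(parser, "/tmp/jobs_report.json"):
    # each yielded dict carries the cluster name added by fetch_slurm_report
    print(entity["cluster_name"])
```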