Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse almost-any sacct output #101

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 64 additions & 9 deletions SchedulerJobInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ def __init__(self,
requeue_time:str=None,
wait_time:str=None,
run_time:str=None,
timelimit:str=None,

state:str=None,
reason:str=None,
user:str=None,
queue:str=None,
node_list:str=None,
project:str=None,
licenses:str=None,

Expand All @@ -75,6 +80,7 @@ def __init__(self,
ru_oublock:int=None,
ru_stime:float=None,
ru_utime:float=None,
ru_ttime:float=None,

# Put resource_request at end because can contain ',' which is also the CSV separator
resource_request:str='',
Expand All @@ -98,13 +104,17 @@ def __init__(self,
submit_time (str): Date and time that the job was submitted.
start_time (str): Date and time that the job started on the compute node
finish_time (str): Date and time that the job finished.
timelimit (str): Time limit of job.

ineligible_pend_time (str): LSF: The time that the job was pending because it was ineligible to run because of unmet dependencies
eligible_time (str): Slurm: Date and time when the job became eligible to run. Can be used to calculate ineligible_pend_time.
requeue_time (str): LSF: The job's requeue time.
wait_time (str): The time that the job waited to start after it was eligible.
run_time (str): The time that the job ran. It should be the difference between finish_time and start_time.

state (str): Job state
reason (str): reason for block
user (str): user that submitted job.
queue (str): queue that the job was submitted to.
project (str): project that the job belongs to
licenses (str): comma separated list of licenses used by the job. Format: license1[:int][license2[:int]]
Expand All @@ -121,6 +131,7 @@ def __init__(self,
ru_oublock (int):
ru_stime (int):
ru_utime (int):
ru_ttime (int):

resource_request (str): Additional resources requested by the job, for example, licenses

Expand All @@ -138,6 +149,7 @@ def __init__(self,
(self.submit_time, self.submit_time_dt) = SchedulerJobInfo.fix_datetime(submit_time)
(self.start_time, self.start_time_dt) = SchedulerJobInfo.fix_datetime(start_time)
(self.finish_time, self.finish_time_dt) = SchedulerJobInfo.fix_datetime(finish_time)
(self.timelimit, self.timelimit_td) = SchedulerJobInfo.fix_duration(timelimit)

# Optional fields
try:
Expand Down Expand Up @@ -166,7 +178,11 @@ def __init__(self,
logger.warning(f"Invalid run_time: {run_time}")
self.run_time = self.run_time_td = None

self.state = state
self.reason = reason
self.user = user
self.queue = queue
self.node_list = node_list
self.project = project
self.licenses = licenses

Expand All @@ -180,8 +196,9 @@ def __init__(self,
self.ru_msgsnd = SchedulerJobInfo.fix_int(ru_msgsnd)
self.ru_nswap = SchedulerJobInfo.fix_int(ru_nswap)
self.ru_oublock = SchedulerJobInfo.fix_int(ru_oublock)
self.ru_stime = SchedulerJobInfo.fix_duration(ru_stime)[0]
self.ru_utime = SchedulerJobInfo.fix_duration(ru_utime)[0]
(self.ru_stime, self.ru_stime_td) = SchedulerJobInfo.fix_duration(ru_stime)
(self.ru_utime, self.ru_utime_td) = SchedulerJobInfo.fix_duration(ru_utime)
(self.ru_ttime, self.ru_ttime_td) = SchedulerJobInfo.fix_duration(ru_ttime)

self.resource_request = resource_request

Expand Down Expand Up @@ -209,10 +226,17 @@ def __init__(self,
self.wait_time_td = self.start_time_dt - self.eligible_time_dt
self.wait_time = timedelta_to_string(self.wait_time_td)

if not self.run_time:
if (not self.run_time) and self.finish_time_dt:
self.run_time_td = self.finish_time_dt - self.start_time_dt
self.run_time = timedelta_to_string(self.run_time_td)

if self.ru_maxrss is None:
# Can make an educated guess
if self.state == 'OUT_OF_MEMORY':
self.ru_maxrss = self.max_mem_gb / self.num_hosts
elif self.run_time_td < timedelta(seconds=60):
self.ru_maxrss = 0

@staticmethod
def from_dict(field_dict: dict):
job_id = int(field_dict['job_id'])
Expand All @@ -222,14 +246,19 @@ def from_dict(field_dict: dict):
submit_time = str(field_dict['submit_time'])
start_time = str(field_dict['start_time'])
finish_time = str(field_dict['finish_time'])
timelimit = str(field_dict['timelimit'])

ineligible_pend_time = str(field_dict['ineligible_pend_time'])
eligible_time = str(field_dict['eligible_time'])
requeue_time = str(field_dict['requeue_time'])
wait_time = str(field_dict['wait_time'])
run_time = str(field_dict['run_time'])

state = int(field_dict['state'])
reason = field_dict['reason']
user = field_dict.get('user', None)
queue = field_dict.get('queue', None)
node_list = field_dict.get('node_list', None)
project = field_dict.get('project', None)
licenses = field_dict.get('licensese', None)

Expand All @@ -245,6 +274,7 @@ def from_dict(field_dict: dict):
ru_oublock = SchedulerJobInfo.fix_int(field_dict['ru_oublock'])
ru_stime = str(field_dict['ru_stime'])
ru_utime = str(field_dict['ru_utime'])
ru_ttime = str(field_dict['ru_ttime'])

resource_request = str(field_dict['resource_request'])

Expand All @@ -256,14 +286,19 @@ def from_dict(field_dict: dict):
submit_time = submit_time,
start_time = start_time,
finish_time = finish_time,
timelimit = timelimit,
# Optional fields
ineligible_pend_time = ineligible_pend_time,
eligible_time = eligible_time,
requeue_time = requeue_time,
wait_time = wait_time,
run_time = run_time,

state = state,
reason = reason,
user = user,
queue = queue,
node_list = node_list,
project = project,
licenses = licenses,

Expand All @@ -279,6 +314,7 @@ def from_dict(field_dict: dict):
ru_oublock = ru_oublock,
ru_stime = ru_stime,
ru_utime = ru_utime,
ru_ttime = ru_ttime,

resource_request = resource_request,
)
Expand All @@ -291,6 +327,10 @@ def from_dict(field_dict: dict):

def to_dict(self) -> dict:
d = self.__dict__.copy()
return d

def fields(self):
d = self.to_dict()
del d['submit_time_dt']
del d['start_time_dt']
del d['finish_time_dt']
Expand All @@ -299,10 +339,8 @@ def to_dict(self) -> dict:
del d['run_time_td']
del d['ineligible_pend_time_td']
del d['requeue_time_td']
return d

def fields(self):
return self.to_dict().keys()
del d['timelimit_dt']
return d.keys()

@staticmethod
def fix_datetime(value):
Expand Down Expand Up @@ -335,7 +373,7 @@ def fix_datetime(value):
Returns:
tuple(str, datetime): typle with ISO format DateTime string: `YYYY-MM-DDTHH:MM::SS` and datetime object
'''
if value == None:
if value is None or value == 'Unknown':
return (None, None)
dt_str = None
dt = None
Expand Down Expand Up @@ -415,7 +453,10 @@ def fix_duration(duration):
td = str_to_timedelta(duration)
else:
raise ValueError(f"Invalid type for duration: {duration} has type '{type(duration)}', expected int, float, or str")
duration_str = timedelta_to_string(td)
if td is None:
duration_str = ''
else:
duration_str = timedelta_to_string(td)
return (duration_str, td)

@staticmethod
Expand Down Expand Up @@ -495,6 +536,14 @@ def str_to_datetime(string_value: str) -> datetime:
'''
if str(type(string_value)) != "<class 'str'>":
raise ValueError(f"Invalid type for string_value: {string_value} has type '{type(string_value)}', expected str")
if string_value in ['Unknown', 'None']:
return None

date_pattern = r'^\d{4}-\d{2}-\d{2}$'
match = re.search(date_pattern, string_value)
if match:
string_value += "T00:00:00"

return datetime.strptime(string_value, SchedulerJobInfo.DATETIME_FORMAT).replace(tzinfo=timezone.utc)

def datetime_to_str(dt: datetime) -> str:
Expand Down Expand Up @@ -527,6 +576,12 @@ def str_to_timedelta(string_value: str) -> timedelta:
'''
if str(type(string_value)) != "<class 'str'>":
raise ValueError(f"Invalid type for string_value: {string_value} has type '{type(string_value)}', expected str")

if string_value == 'UNLIMITED':
return None
elif string_value == '0':
return timedelta(0)

values = string_value.split(':')
seconds = float(values.pop())
minutes = int(values.pop())
Expand Down
82 changes: 39 additions & 43 deletions SchedulerLogParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,10 @@ def __init__(self, input_csv: str, output_csv: str, starttime: str=None, endtime
if not path.exists(output_dir):
makedirs(output_dir)
self._output_csv_fh = open(output_csv, 'w', newline='')
self._output_field_names = self._get_job_field_names()
# Profiling showed that the dict writer is slightly slower
self._use_csv_dict_writer = False
if not self._use_csv_dict_writer:
self._csv_writer = csv.writer(self._output_csv_fh, dialect='excel', lineterminator='\n')
else:
self._csv_dict_writer = csv.DictWriter(self._output_csv_fh, self._output_field_names, dialect='excel', lineterminator='\n', extrasaction='ignore')
self._write_csv_header()
self._use_csv_dict_writer = False # Profiling showed DictWriter slightly slower
self._csv_output_initialised = False
self._csv_writer = None
self._csv_dict_writer = None
else:
self._output_csv_fh = None
self._num_output_jobs = 0
Expand Down Expand Up @@ -137,47 +133,54 @@ def num_output_jobs(self):
return self._num_output_jobs

@staticmethod
def _get_job_field_names():
dummy_job = SchedulerJobInfo(
job_id = '1',
resource_request = 'rusage[mem=6291456,xcelium_sc=1:duration=1m]',
num_cores = 1,
max_mem_gb = 1.1,
num_hosts = 1,

submit_time = '1970-01-01T00:00:00',
start_time = '1970-01-01T00:00:01',
finish_time = '1970-01-01T00:00:05',
)
def _get_job_field_names(job = None):
if job is None:
job = SchedulerJobInfo(
job_id = '1',
state = 'COMPLETED',
resource_request = 'rusage[mem=6291456,xcelium_sc=1:duration=1m]',
num_cores = 1,
max_mem_gb = 1.1,
num_hosts = 1,

submit_time = '1970-01-01T00:00:00',
start_time = '1970-01-01T00:00:01',
finish_time = '1970-01-01T00:00:05',
)
field_names = []
job_dict = dummy_job.__dict__
job_dict = job.__dict__
for field_name in job_dict.keys():
if field_name[-3:] in ['_dt', '_td']:
continue
field_names.append(field_name)
logger.debug(f"field_names={field_names}")
return field_names

def _write_csv_header(self) -> None:
def _init_csv_output(self, job = None) -> None:
'''
Writes the CSV header line to the output csv file.

Called by the constructor if an output csv filename provided.
Initialise CSV writer, and write header line.
If job provided, use it to determine field names.

Raises:
RuntimeError: If no output csv file handle exists.
'''
if self._csv_output_initialised:
return

if not self._output_csv_fh:
raise RuntimeError(f"_write_csv_header called without self._output_csv_fh being set.")
raise RuntimeError(f"_init_csv_output called without self._output_csv_fh being set.")
self._output_field_names = self._get_job_field_names(job)
if not self._use_csv_dict_writer:
if not self._csv_writer:
raise RuntimeError(f"_write_csv_header called without self._csv_writer being set.")
self._csv_writer = csv.writer(self._output_csv_fh, dialect='excel', lineterminator='\n')
self._csv_writer.writerow(self._output_field_names)
else:
if not self._csv_dict_writer:
raise RuntimeError(f"_write_csv_header called without self._csv_dict_writer being set.")
self._csv_dict_writer = csv.DictWriter(self._output_csv_fh, self._output_field_names, dialect='excel', lineterminator='\n', extrasaction='ignore')
self._csv_dict_writer.writeheader()

self._csv_output_initialised = True

def write_job_to_csv(self, job) -> None:
'''
Write a job to the output csv file.
Expand All @@ -187,23 +190,16 @@ def write_job_to_csv(self, job) -> None:
'''
if not self._output_csv_fh:
raise RuntimeError(f"write_job_to_csv called without self._output_csv_fh being set.")
if not self._use_csv_dict_writer:
if not self._csv_writer:
raise RuntimeError(f"write_job_to_csv called without self._csv_writer being set.")
else:
if not self._csv_dict_writer:
raise RuntimeError(f"_write_csv_header called without self._csv_dict_writer being set.")

self._init_csv_output()

if not self._use_csv_dict_writer:
field_values = []
for field_name in self._output_field_names:
field_value = job.__dict__[field_name]
if field_value == None:
field_value = ''
else:
field_value = str(field_value)
field_values.append(field_value)
self._csv_writer.writerow(field_values)
values = []
for k in self._output_field_names:
v = job.__dict__[k]
v = '' if v is None else str(v)
values.append(v)
self._csv_writer.writerow(values)
else:
self._csv_dict_writer.writerow(job.__dict__)
self._num_output_jobs += 1
Expand Down
Loading