diff --git a/SchedulerJobInfo.py b/SchedulerJobInfo.py
index d2f8c1e..7991018 100755
--- a/SchedulerJobInfo.py
+++ b/SchedulerJobInfo.py
@@ -58,8 +58,13 @@ def __init__(self,
         requeue_time:str=None,
         wait_time:str=None,
         run_time:str=None,
+        timelimit:str=None,
+        state:str=None,
+        reason:str=None,
+        user:str=None,
         queue:str=None,
+        node_list:str=None,
         project:str=None,
         licenses:str=None,
@@ -75,6 +80,7 @@ def __init__(self,
         ru_oublock:int=None,
         ru_stime:float=None,
         ru_utime:float=None,
+        ru_ttime:float=None,
         # Put resource_request at end because can contain ',' which is also the CSV separator
         resource_request:str='',
@@ -98,6 +104,7 @@ def __init__(self,
             submit_time (str): Date and time that the job was submitted.
             start_time (str): Date and time that the job started on the compute node
             finish_time (str): Date and time that the job finished.
+            timelimit (str): Time limit of the job.
             ineligible_pend_time (str): LSF: The time that the job was pending because it was ineligible to run because of unmet dependencies
             eligible_time (str): Slurm: Date and time when the job became eligible to run. Can be used to calculate ineligible_pend_time.
@@ -105,6 +112,9 @@ def __init__(self,
             wait_time (str): The time that the job waited to start after it was eligible.
             run_time (str): The time that the job ran. It should be the difference between finish_time and start_time.
+            state (str): Job state.
+            reason (str): Reason that the job was pending or blocked.
+            user (str): User that submitted the job.
             queue (str): queue that the job was submitted to.
             project (str): project that the job belongs to
             licenses (str): comma separated list of licenses used by the job. Format: license1[:int][license2[:int]]
@@ -121,6 +131,7 @@ def __init__(self,
             ru_oublock (int):
             ru_stime (int):
             ru_utime (int):
+            ru_ttime (int):
             resource_request (str): Additional resources requested by the job, for example, licenses
@@ -138,6 +149,7 @@ def __init__(self,
         (self.submit_time, self.submit_time_dt) = SchedulerJobInfo.fix_datetime(submit_time)
         (self.start_time, self.start_time_dt) = SchedulerJobInfo.fix_datetime(start_time)
         (self.finish_time, self.finish_time_dt) = SchedulerJobInfo.fix_datetime(finish_time)
+        (self.timelimit, self.timelimit_td) = SchedulerJobInfo.fix_duration(timelimit)
 
         # Optional fields
         try:
@@ -166,7 +178,11 @@ def __init__(self,
             logger.warning(f"Invalid run_time: {run_time}")
             self.run_time = self.run_time_td = None
 
+        self.state = state
+        self.reason = reason
+        self.user = user
         self.queue = queue
+        self.node_list = node_list
         self.project = project
         self.licenses = licenses
@@ -180,8 +196,9 @@ def __init__(self,
         self.ru_msgsnd = SchedulerJobInfo.fix_int(ru_msgsnd)
         self.ru_nswap = SchedulerJobInfo.fix_int(ru_nswap)
         self.ru_oublock = SchedulerJobInfo.fix_int(ru_oublock)
-        self.ru_stime = SchedulerJobInfo.fix_duration(ru_stime)[0]
-        self.ru_utime = SchedulerJobInfo.fix_duration(ru_utime)[0]
+        (self.ru_stime, self.ru_stime_td) = SchedulerJobInfo.fix_duration(ru_stime)
+        (self.ru_utime, self.ru_utime_td) = SchedulerJobInfo.fix_duration(ru_utime)
+        (self.ru_ttime, self.ru_ttime_td) = SchedulerJobInfo.fix_duration(ru_ttime)
 
         self.resource_request = resource_request
@@ -209,10 +226,17 @@ def __init__(self,
             self.wait_time_td = self.start_time_dt - self.eligible_time_dt
             self.wait_time = timedelta_to_string(self.wait_time_td)
 
-        if not self.run_time:
+        if (not self.run_time) and self.finish_time_dt:
             self.run_time_td = self.finish_time_dt - self.start_time_dt
             self.run_time = timedelta_to_string(self.run_time_td)
 
+        if self.ru_maxrss is None:
+            # ru_maxrss was not reported, so make an educated guess
+            if self.state == 'OUT_OF_MEMORY' and self.max_mem_gb is not None and self.num_hosts:
+                self.ru_maxrss = self.max_mem_gb / self.num_hosts
+            elif self.run_time_td is not None and self.run_time_td < timedelta(seconds=60):
+                self.ru_maxrss = 0
+
     @staticmethod
     def from_dict(field_dict: dict):
         job_id = int(field_dict['job_id'])
@@ -222,6 +246,7 @@ def from_dict(field_dict: dict):
         submit_time = str(field_dict['submit_time'])
         start_time = str(field_dict['start_time'])
         finish_time = str(field_dict['finish_time'])
+        timelimit = str(field_dict['timelimit'])
         ineligible_pend_time = str(field_dict['ineligible_pend_time'])
         eligible_time = str(field_dict['eligible_time'])
@@ -229,7 +254,11 @@ def from_dict(field_dict: dict):
         wait_time = str(field_dict['wait_time'])
         run_time = str(field_dict['run_time'])
 
+        state = str(field_dict['state'])
+        reason = field_dict['reason']
+        user = field_dict.get('user', None)
         queue = field_dict.get('queue', None)
+        node_list = field_dict.get('node_list', None)
         project = field_dict.get('project', None)
-        licenses = field_dict.get('licensese', None)
+        licenses = field_dict.get('licenses', None)
@@ -245,6 +274,7 @@ def from_dict(field_dict: dict):
         ru_oublock = SchedulerJobInfo.fix_int(field_dict['ru_oublock'])
         ru_stime = str(field_dict['ru_stime'])
         ru_utime = str(field_dict['ru_utime'])
+        ru_ttime = str(field_dict['ru_ttime'])
 
         resource_request = str(field_dict['resource_request'])
@@ -256,6 +286,7 @@ def from_dict(field_dict: dict):
             submit_time = submit_time,
             start_time = start_time,
             finish_time = finish_time,
+            timelimit = timelimit,
             # Optional fields
             ineligible_pend_time = ineligible_pend_time,
             eligible_time = eligible_time,
@@ -263,7 +294,11 @@ def from_dict(field_dict: dict):
             wait_time = wait_time,
             run_time = run_time,
 
+            state = state,
+            reason = reason,
+            user = user,
             queue = queue,
+            node_list = node_list,
             project = project,
             licenses = licenses,
@@ -279,6 +314,7 @@ def from_dict(field_dict: dict):
             ru_oublock = ru_oublock,
             ru_stime = ru_stime,
             ru_utime = ru_utime,
+            ru_ttime = ru_ttime,
 
             resource_request = resource_request,
         )
@@ -291,6 +327,10 @@ def from_dict(field_dict: dict):
     def to_dict(self) -> dict:
         d = self.__dict__.copy()
+        return d
+
+    def fields(self):
+        d = self.to_dict()
         del d['submit_time_dt']
         del d['start_time_dt']
         del d['finish_time_dt']
@@ -299,10 +339,11 @@ def to_dict(self) -> dict:
         del d['run_time_td']
         del d['ineligible_pend_time_td']
         del d['requeue_time_td']
-        return d
-
-    def fields(self):
-        return self.to_dict().keys()
+        del d['timelimit_td']
+        del d['ru_stime_td']
+        del d['ru_utime_td']
+        del d['ru_ttime_td']
+        return d.keys()
 
     @staticmethod
     def fix_datetime(value):
@@ -335,7 +373,7 @@ def fix_datetime(value):
         Returns:
             tuple(str, datetime): tuple with ISO format DateTime string: `YYYY-MM-DDTHH:MM:SS` and datetime object
         '''
-        if value == None:
+        if value is None or value == 'Unknown':
             return (None, None)
         dt_str = None
         dt = None
@@ -415,7 +453,10 @@ def fix_duration(duration):
             td = str_to_timedelta(duration)
         else:
             raise ValueError(f"Invalid type for duration: {duration} has type '{type(duration)}', expected int, float, or str")
-        duration_str = timedelta_to_string(td)
+        if td is None:
+            duration_str = ''
+        else:
+            duration_str = timedelta_to_string(td)
         return (duration_str, td)
 
     @staticmethod
@@ -495,6 +536,14 @@ def str_to_datetime(string_value: str) -> datetime:
     '''
     if str(type(string_value)) != "<class 'str'>":
         raise ValueError(f"Invalid type for string_value: {string_value} has type '{type(string_value)}', expected str")
+    if string_value in ['Unknown', 'None']:
+        return None
+
+    date_pattern = r'^\d{4}-\d{2}-\d{2}$'
+    match = re.search(date_pattern, string_value)
+    if match:
+        string_value += "T00:00:00"
+
     return datetime.strptime(string_value, SchedulerJobInfo.DATETIME_FORMAT).replace(tzinfo=timezone.utc)
 
 def datetime_to_str(dt: datetime) -> str:
@@ -527,6 +576,12 @@ def str_to_timedelta(string_value: str) -> timedelta:
     '''
     if str(type(string_value)) != "<class 'str'>":
         raise ValueError(f"Invalid type for string_value: {string_value} has type '{type(string_value)}', expected str")
+
+    if string_value == 'UNLIMITED':
+        return None
+    elif string_value == '0':
+        return timedelta(0)
+
     values = string_value.split(':')
     seconds = float(values.pop())
     minutes = int(values.pop())
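The duration and datetime changes above exist to tolerate raw sacct output: 'UNLIMITED' time limits, 'Unknown' or 'None' timestamps, and date-only strings. A minimal sketch of the intended behaviour (illustration only, not part of the patch; it assumes the helpers are importable from SchedulerJobInfo.py and that SchedulerJobInfo.DATETIME_FORMAT is '%Y-%m-%dT%H:%M:%S'):

# Illustration only. Not part of the patch.
from datetime import datetime, timedelta, timezone

from SchedulerJobInfo import SchedulerJobInfo, str_to_datetime, str_to_timedelta

# Slurm reports 'UNLIMITED' time limits and 'Unknown' timestamps; both now
# parse cleanly instead of raising.
assert str_to_timedelta('UNLIMITED') is None
assert str_to_timedelta('0') == timedelta(0)
assert SchedulerJobInfo.fix_duration('UNLIMITED') == ('', None)
assert SchedulerJobInfo.fix_datetime('Unknown') == (None, None)

# Date-only strings are padded to midnight UTC before parsing.
assert str_to_datetime('2024-05-01') == datetime(2024, 5, 1, tzinfo=timezone.utc)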
diff --git a/SchedulerLogParser.py b/SchedulerLogParser.py
index fc172c1..6ed0e6a 100755
--- a/SchedulerLogParser.py
+++ b/SchedulerLogParser.py
@@ -79,14 +79,10 @@ def __init__(self, input_csv: str, output_csv: str, starttime: str=None, endtime
             if not path.exists(output_dir):
                 makedirs(output_dir)
             self._output_csv_fh = open(output_csv, 'w', newline='')
-            self._output_field_names = self._get_job_field_names()
-            # Profiling showed that the dict writer is slightly slower
-            self._use_csv_dict_writer = False
-            if not self._use_csv_dict_writer:
-                self._csv_writer = csv.writer(self._output_csv_fh, dialect='excel', lineterminator='\n')
-            else:
-                self._csv_dict_writer = csv.DictWriter(self._output_csv_fh, self._output_field_names, dialect='excel', lineterminator='\n', extrasaction='ignore')
-            self._write_csv_header()
+            self._use_csv_dict_writer = False  # Profiling showed DictWriter slightly slower
+            self._csv_output_initialised = False
+            self._csv_writer = None
+            self._csv_dict_writer = None
         else:
             self._output_csv_fh = None
         self._num_output_jobs = 0
@@ -137,20 +133,22 @@ def num_output_jobs(self):
         return self._num_output_jobs
 
     @staticmethod
-    def _get_job_field_names():
-        dummy_job = SchedulerJobInfo(
-            job_id = '1',
-            resource_request = 'rusage[mem=6291456,xcelium_sc=1:duration=1m]',
-            num_cores = 1,
-            max_mem_gb = 1.1,
-            num_hosts = 1,
-
-            submit_time = '1970-01-01T00:00:00',
-            start_time = '1970-01-01T00:00:01',
-            finish_time = '1970-01-01T00:00:05',
-        )
+    def _get_job_field_names(job=None):
+        if job is None:
+            job = SchedulerJobInfo(
+                job_id = '1',
+                state = 'COMPLETED',
+                resource_request = 'rusage[mem=6291456,xcelium_sc=1:duration=1m]',
+                num_cores = 1,
+                max_mem_gb = 1.1,
+                num_hosts = 1,
+
+                submit_time = '1970-01-01T00:00:00',
+                start_time = '1970-01-01T00:00:01',
+                finish_time = '1970-01-01T00:00:05',
+            )
         field_names = []
-        job_dict = dummy_job.__dict__
+        job_dict = job.__dict__
         for field_name in job_dict.keys():
             if field_name[-3:] in ['_dt', '_td']:
                 continue
@@ -158,26 +156,31 @@ def _get_job_field_names():
         logger.debug(f"field_names={field_names}")
         return field_names
 
-    def _write_csv_header(self) -> None:
+    def _init_csv_output(self, job=None) -> None:
         '''
-        Writes the CSV header line to the output csv file.
-
-        Called by the constructor if an output csv filename provided.
+        Initialise the CSV writer and write the header line.
+        If a job is provided, use it to determine the field names.
 
         Raises:
             RuntimeError: If no output csv file handle exists.
         '''
+        if self._csv_output_initialised:
+            return
+
         if not self._output_csv_fh:
-            raise RuntimeError(f"_write_csv_header called without self._output_csv_fh being set.")
+            raise RuntimeError("_init_csv_output called without self._output_csv_fh being set.")
+        self._output_field_names = self._get_job_field_names(job)
         if not self._use_csv_dict_writer:
             if not self._csv_writer:
-                raise RuntimeError(f"_write_csv_header called without self._csv_writer being set.")
+                self._csv_writer = csv.writer(self._output_csv_fh, dialect='excel', lineterminator='\n')
             self._csv_writer.writerow(self._output_field_names)
         else:
             if not self._csv_dict_writer:
-                raise RuntimeError(f"_write_csv_header called without self._csv_dict_writer being set.")
+                self._csv_dict_writer = csv.DictWriter(self._output_csv_fh, self._output_field_names, dialect='excel', lineterminator='\n', extrasaction='ignore')
             self._csv_dict_writer.writeheader()
+        self._csv_output_initialised = True
+
     def write_job_to_csv(self, job) -> None:
         '''
         Write a job to the output csv file.
@@ -187,23 +190,16 @@ def write_job_to_csv(self, job) -> None:
         '''
         if not self._output_csv_fh:
             raise RuntimeError(f"write_job_to_csv called without self._output_csv_fh being set.")
-        if not self._use_csv_dict_writer:
-            if not self._csv_writer:
-                raise RuntimeError(f"write_job_to_csv called without self._csv_writer being set.")
-        else:
-            if not self._csv_dict_writer:
-                raise RuntimeError(f"_write_csv_header called without self._csv_dict_writer being set.")
+
+        self._init_csv_output(job)
 
         if not self._use_csv_dict_writer:
-            field_values = []
-            for field_name in self._output_field_names:
-                field_value = job.__dict__[field_name]
-                if field_value == None:
-                    field_value = ''
-                else:
-                    field_value = str(field_value)
-                field_values.append(field_value)
-            self._csv_writer.writerow(field_values)
+            values = []
+            for field_name in self._output_field_names:
+                value = job.__dict__[field_name]
+                value = '' if value is None else str(value)
+                values.append(value)
+            self._csv_writer.writerow(values)
         else:
             self._csv_dict_writer.writerow(job.__dict__)
         self._num_output_jobs += 1
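With the refactor above, the CSV header is no longer written in the constructor; _init_csv_output runs once, on the first write_job_to_csv call, and can take that first job to determine the field names. The header itself comes from _get_job_field_names, which simply drops the datetime/timedelta companion attributes. That filtering rule reduces to the following self-contained sketch (illustration only, not part of the patch):

# Attributes ending in '_dt'/'_td' hold datetime/timedelta companions of the
# string fields and are excluded from the CSV header.
job_dict = {
    'job_id': '1',
    'submit_time': '1970-01-01T00:00:00',
    'submit_time_dt': None,   # excluded
    'run_time': '0:05:00',
    'run_time_td': None,      # excluded
}
field_names = [name for name in job_dict if name[-3:] not in ('_dt', '_td')]
assert field_names == ['job_id', 'submit_time', 'run_time']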
diff --git a/SlurmLogParser.py b/SlurmLogParser.py
index bf54a6d..3d0e96c 100755
--- a/SlurmLogParser.py
+++ b/SlurmLogParser.py
@@ -26,6 +26,11 @@
 logger.setLevel(logging.INFO)
 logger.propagate = False
 
+def camel2snake(x):
+    # Convert Slurm camel-case field names to our snake-case
+    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', x)
+    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
+
 class SlurmLogParser(SchedulerLogParser):
     '''
     Parse Slurm sacct output to get job completion information.
@@ -60,32 +65,56 @@ def __init__(self, sacct_input_file: str=None, sacct_output_file: str=None, outp
             if not path.exists(sacct_output_dir):
                 makedirs(sacct_output_dir)
 
+        self._sacct_input_file_has_header = False
+        try:
+            # Analyse the first line of the input file. If it looks like a
+            # header line, then use it to overwrite SLURM_ACCT_FIELDS.
+            with open(self._sacct_input_file, 'r') as F:
+                l = F.readline()
+            elems = l.rstrip('\n').split('|')
+            known_fields = set([x[0] for x in SlurmLogParser.SLURM_ACCT_FIELDS])
+            if len(set(elems) & known_fields) >= 3:
+                # Looks like a header line - has at least 3 delimited field names.
+                self._sacct_input_file_has_header = True
+                NEW_SLURM_ACCT_FIELDS = []
+                for e in elems:
+                    for x in SlurmLogParser.SLURM_ACCT_FIELDS:
+                        if x[0] == e:
+                            NEW_SLURM_ACCT_FIELDS.append(x)
+                            break
+                SlurmLogParser.SLURM_ACCT_FIELDS = NEW_SLURM_ACCT_FIELDS
+        except Exception:
+            pass
+
         self._sacct_fh = None
         self._line_number = 0
         self._eof = False
         self._parsed_lines = []
 
     SLURM_ACCT_FIELDS = [
-        ('State', 's'),     # Displays the job status, or state. Put this first so can skip ignored states.
-        ('JobID', 's'),     # The identification number of the job or job step
+        ('State', 's'),             # Displays the job status, or state. Put this first so can skip ignored states.
+        ('JobID', 's'),             # The identification number of the job or job step
 
         # Job properties
-        ('ReqCPUS', 'd'),   # Number of requested CPUs
-        ('ReqMem', 'sd', True), # Minimum required memory for the job in bytes
-        ('ReqNodes', 'd'),  # Requested minimum Node count for the job/step
-        ('Constraints', 's'), #
+        ('ReqCPUS', 'd'),           # Number of requested CPUs
+        ('ReqMem', 'sd', True),     # Minimum required memory for the job in bytes
+        ('ReqNodes', 'd'),          # Requested minimum Node count for the job/step
+        ('Reason', 's'),            # The reason the job was waiting to run
+        ('Constraints', 's'),       #
 
         # Times
-        ('Submit', 'dt'),   # The time the job was submitted. NOTE: If a job is requeued, the submit time is reset. This is handled by not overwriting fields with the batch step.
-        ('Eligible', 'dt'), # When the job became eligible to run
-        ('Start', 'dt'),    # Initiation time of the job
-        ('Elapsed', 'td'),  # The job's elapsed time: [DD-[HH:]]MM:SS
-        ('Suspended', 'td'), # The amount of time a job or job step was suspended
-        ('End', 'dt'),      # Termination time of the job
+        ('Submit', 'dt'),           # The time the job was submitted. NOTE: If a job is requeued, the submit time is reset. This is handled by not overwriting fields with the batch step.
+        ('Eligible', 'dt'),         # When the job became eligible to run
+        ('Start', 'dt'),            # Initiation time of the job
+        ('Elapsed', 'td'),          # The job's elapsed time: [DD-[HH:]]MM:SS
+        ('Suspended', 'td'),        # The amount of time a job or job step was suspended
+        ('End', 'dt'),              # Termination time of the job
+        ('Timelimit', 'td', True),  # Time limit of the job
 
-        ('ExitCode', 's'),  # The exit code returned by the job script or salloc
-        ('DerivedExitCode', 's'), # The highest exit code returned by the job's job steps
+        ('ExitCode', 's'),          # The exit code returned by the job script or salloc
+        ('DerivedExitCode', 's'),   # The highest exit code returned by the job's job steps
 
-        ('AllocNodes', 'd'), # Number of nodes allocated to the job/step
-        ('NCPUS', 'd'),     # Total number of CPUs allocated to the job. Equivalent to AllocCPUS
+        ('AllocNodes', 'd'),        # Number of nodes allocated to the job/step
+        ('NCPUS', 'd'),             # Total number of CPUs allocated to the job. Equivalent to AllocCPUS
+        ('NodeList', 's'),          # Node names allocated to the job/step
 
         ('MaxDiskRead', 'sd', True), # Maximum number of bytes read by all tasks in job
         ('MaxDiskWrite', 'sd', True), # Maximum number of bytes written by all tasks in job
@@ -99,6 +128,8 @@ def __init__(self, sacct_input_file: str=None, sacct_output_file: str=None, outp
         # This field was added, so it should be optional
-        ('Partition', 's'), # The exit code returned by the job script or salloc
+        ('Partition', 's'),         # The partition (queue) that the job ran in
+
+        ('User', 's'),              # User that submitted the job
     ]
 
     SLURM_STATE_CODES = [
@@ -137,6 +168,14 @@ def parse_jobs(self) -> bool:
         '''
         self._parsed_lines = []
         self.errors = []
+
+        if self._sacct_input_file_has_header:
+            # Discard the header line of the input file.
+            if not self._sacct_fh:
+                self._sacct_fh = open(self._sacct_input_file, 'r')
+                self._eof = False
+            line = self._sacct_fh.readline()
+
         job = True
         while job:
             job = self.parse_job()
@@ -149,6 +188,38 @@ def parse_jobs(self) -> bool:
             return False
         return True
 
+    def parse_jobs_to_dict(self) -> dict:
+        '''
+        Parse all of the jobs from the Slurm sacct command.
+
+        Returns:
+            dict: Parsed job fields, indexed by job_id
+        '''
+        self._parsed_lines = []
+        self.errors = []
+
+        if self._sacct_input_file_has_header:
+            # Discard the header line of the input file.
+            if not self._sacct_fh:
+                self._sacct_fh = open(self._sacct_input_file, 'r')
+                self._eof = False
+            line = self._sacct_fh.readline()
+
+        # Use JobID as the index
+        jobs_dict = {}
+
+        job = True
+        while job:
+            job = self.parse_job()
+            if job:
+                jd = job.to_dict()
+                idx = jd.pop('job_id')
+                jobs_dict[idx] = jd
+        logger.info(f"Parsed {len(jobs_dict)} jobs")
+        if self.errors:
+            logger.error(f"{len(self.errors)} errors while parsing jobs")
+        return jobs_dict
+
     def parse_job(self):
         '''
         Parse a job from the Slurm sacct output.
@@ -260,7 +331,7 @@ def _parse_line(self, line):
                 else:
                     raise
                 req_mem_suffix = None
-            if field_value != None:
+            if field_value is not None:
                 try:
                     if field_format == 'd':
                         field_value = mem_string_to_int(field_value)
@@ -279,10 +350,6 @@ def _parse_line(self, line):
                     else:
                         raise ValueError(f"Invalid format of {field_format} for field {field_name}")
                 except ValueError as e:
-                    if field_name == 'Start' and job_fields['State'] == 'CANCELLED':
-                        # Ignore cancelled jobs that didn't start
-                        logger.debug(f"Ignoring job because it was CANCELLED and did not start.")
-                        return None
                     if empty_field_allowed and field_value == '':
                         field_value = None
                     else:
@@ -299,10 +366,6 @@ def _parse_line(self, line):
                     job_fields['State'] = 'CANCELLED'
                 else:
                     raise ValueError(f"Invalid state: {job_fields['State']}")
-                # Need to stop processing lines with invalid states since following fields may be invalid and cause errors.
-                if job_fields['State'] in self.SLURM_STATE_CODES_TO_IGNORE:
-                    logger.debug(f"  Ignored state: {field_value}")
-                    return None
         except Exception as e:
             field_errors += 1
             msg = f"Exception while processing fields, {field_name} ({field_format}): {e}\n{line}"
@@ -359,13 +422,15 @@ def _process_parsed_lines(self):
         (job_id, job_fields, first_line_number, field_errors) = self._parsed_lines.pop(0)
         last_line_number = first_line_number
         logger.debug(f"Assembling job {job_id}")
         while self._parsed_lines and self._parsed_lines[0][0] == job_id:
             # Update fields. Don't overwrite with .update or else blank fields will overwrite non-blank fields.
             (job_id, next_job_fields, last_line_number, next_field_errors) = self._parsed_lines.pop(0)
             logger.debug(f"  Updating job_fields")
             for field_tuple in self.SLURM_ACCT_FIELDS:
                 field_name = field_tuple[0]
-                if next_job_fields.get(field_name, None) and not job_fields.get(field_name, None):
-                    job_fields[field_name] = next_job_fields[field_name]
+                if next_job_fields.get(field_name, None):
+                    if not job_fields.get(field_name, None) or \
+                       field_name in ['State']:
+                        job_fields[field_name] = next_job_fields[field_name]
             logger.debug(f"  Merged job fields:\n{json.dumps(job_fields, indent=4)}")
 
@@ -386,8 +451,12 @@ def _process_parsed_lines(self):
             self.errors.append((self._sacct_input_file, first_line_number, msg))
             return None
 
+        if job_fields['State'] == 'CANCELLED' and job_fields['Start'] == 'None' and job_fields['Elapsed'] == '00:00:00':
+            logger.debug(f"Ignoring job {job_id} because it was cancelled and did not run.")
+            return None
+
         if job_fields['State'] in self.SLURM_STATE_CODES_TO_IGNORE:
-            logger.debug(f"  Ignored state: {field_value}")
+            logger.debug(f"  Ignored state: {job_fields['State']}")
             return None
 
         try:
@@ -413,42 +482,54 @@ def _create_job_from_job_fields(self, job_fields):
             SchedulerJobInfo: Parsed job info
         '''
         logger.debug(f"_create_job_from_job_fields({job_fields})")
-        if job_fields['AllocNodes'] == 0:
+
+        job_fields = dict(job_fields)  # Copy so the caller's dict is not modified
+
+        if job_fields.get('AllocNodes') == 0:
             job_fields['AllocNodes'] = 1
-        if job_fields['ReqMem'] != None:
-            max_mem_gb = job_fields['ReqMem'] / MEM_GB
+
+        if job_fields.get('ReqMem') is not None:
+            job_fields['ReqMemGB'] = job_fields['ReqMem'] / MEM_GB
+        else:
+            job_fields['ReqMemGB'] = None
+        job_fields.pop('ReqMem', None)
+
+        if job_fields.get('ExitCode') is not None:
+            # ExitCode is {returncode}:{signal}
+            job_fields['ExitStatus'] = job_fields['ExitCode'].split(':')[0]
+            del job_fields['ExitCode']
         else:
-            max_mem_gb = None
-        job = SchedulerJobInfo(
-            job_id = job_fields['JobID'],
-            num_cores = job_fields['NCPUS'],
-            max_mem_gb = max_mem_gb,
-            num_hosts = job_fields['AllocNodes'],
-
-            submit_time = job_fields['Submit'],
-            eligible_time = job_fields['Eligible'],
-            start_time = job_fields['Start'],
-            run_time = job_fields['Elapsed'],
-            finish_time = job_fields['End'],
-
-            queue = job_fields['Partition'],
-
-            # ExitCode is {returncode}:{signal}
-            exit_status = job_fields['ExitCode'].split(':')[0],
-
-            ru_inblock = job_fields['MaxDiskRead'],
-            ru_majflt = job_fields['MaxPages'],
-            ru_maxrss = job_fields['MaxRSS'],
-            # ru_minflt = job_fields['ru_minflt'],
-            # ru_msgrcv = job_fields['ru_msgrcv'],
-            # ru_msgsnd = job_fields['ru_msgsnd'],
-            # ru_nswap = job_fields['ru_nswap'],
-            ru_oublock = job_fields['MaxDiskWrite'],
-            ru_stime = job_fields['SystemCPU'],
-            ru_utime = job_fields['UserCPU'],
-
-            resource_request = job_fields['Constraints'],
-        )
+            job_fields['ExitStatus'] = None
+
+        job_fields = {camel2snake(k): job_fields[k] for k in job_fields}
+        sacct_renames = {
+            'ncpus': 'num_cores',
+            'alloc_nodes': 'num_hosts',
+            'submit': 'submit_time',
+            'eligible': 'eligible_time',
+            'start': 'start_time',
+            'elapsed': 'run_time',
+            'end': 'finish_time',
+            'partition': 'queue',
+            'req_mem_gb': 'max_mem_gb',
+            'max_disk_read': 'ru_inblock',
+            'max_disk_write': 'ru_oublock',
+            'max_pages': 'ru_majflt',
+            'max_rss': 'ru_maxrss',
+            'system_cpu': 'ru_stime',
+            'user_cpu': 'ru_utime',
+            'total_cpu': 'ru_ttime',
+            'constraints': 'resource_request',
+        }
+        for k, v in list(job_fields.items()):
+            if k in sacct_renames:
+                job_fields[sacct_renames[k]] = v
+                del job_fields[k]
+
+        # These sacct fields have no SchedulerJobInfo equivalent; drop them so
+        # the constructor does not receive unexpected keyword arguments.
+        for k in ['req_cpus', 'req_nodes', 'suspended', 'derived_exit_code']:
+            job_fields.pop(k, None)
+
+        job = SchedulerJobInfo(**job_fields)
+
         return job
 
     def _clean_up(self):
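The header auto-detection added to SlurmLogParser.__init__ can be exercised in isolation. The standalone sketch below mirrors its logic with an abridged SLURM_ACCT_FIELDS (illustration only, not part of the patch):

SLURM_ACCT_FIELDS = [('State', 's'), ('JobID', 's'), ('ReqCPUS', 'd'), ('ReqMem', 'sd', True)]

def detect_header(first_line):
    # A '|' delimited line that names at least 3 known fields is a header.
    elems = first_line.rstrip('\n').split('|')
    known = {f[0] for f in SLURM_ACCT_FIELDS}
    if len(set(elems) & known) < 3:
        return None  # data line, not a header
    # Reorder the field descriptors to match the file's column order.
    by_name = {f[0]: f for f in SLURM_ACCT_FIELDS}
    return [by_name[e] for e in elems if e in by_name]

assert detect_header('State|JobID|ReqMem|ReqCPUS\n') == \
    [('State', 's'), ('JobID', 's'), ('ReqMem', 'sd', True), ('ReqCPUS', 'd')]
assert detect_header('COMPLETED|123|2|4000M\n') is None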
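The field mapping in _create_job_from_job_fields happens in two hops: camel2snake converts each sacct field name to snake case, then sacct_renames maps it onto the matching SchedulerJobInfo argument. A standalone check (camel2snake is copied from the patch; the renames dict is an excerpt):

import re

def camel2snake(x):
    # Same implementation as in SlurmLogParser.py above.
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', x)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

assert camel2snake('JobID') == 'job_id'
assert camel2snake('MaxDiskRead') == 'max_disk_read'
assert camel2snake('NCPUS') == 'ncpus'      # all-caps names simply lower-case
assert camel2snake('TotalCPU') == 'total_cpu'

# e.g. 'NCPUS' -> camel2snake -> 'ncpus' -> sacct_renames -> 'num_cores'
sacct_renames = {'ncpus': 'num_cores', 'partition': 'queue'}  # excerpt
fields = {camel2snake(k): v for k, v in {'NCPUS': 4, 'Partition': 'compute'}.items()}
fields = {sacct_renames.get(k, k): v for k, v in fields.items()}
assert fields == {'num_cores': 4, 'queue': 'compute'}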