Improved path handling and directory creation
Fixed bug: rrdtool_exe.dump() was wrongly decoding the output of subprocessSupport.iexe_cmd(), which is already a string
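
A minimal sketch of the decode bug described above; run_rrdtool() and RRDTOOL_BIN are hypothetical stand-ins for illustration, not the actual glideinwms subprocessSupport API:

    import subprocess

    RRDTOOL_BIN = "rrdtool"  # assumed to be on PATH; hypothetical name

    def run_rrdtool(args):
        # Like subprocessSupport.iexe_cmd(), this already returns a decoded str on Python 3
        return subprocess.run([RRDTOOL_BIN] + args, capture_output=True, text=True, check=True).stdout

    def dump(filename):
        out = run_rrdtool(["dump", filename])
        # Buggy version: out.decode("utf-8") -> AttributeError, str has no decode() in Python 3
        # Fixed version: use the string as-is
        return out.splitlines()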

Code improvements, no specific bug fix:
creating parent directories as well,
eliminated nested os.path.join calls,
better docstrings,
consolidated the tmp2final and verifyHelper functions,
eliminated global variable use,
use os.path.join instead of string concatenation with "/" (see the sketch below)
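
As an illustration of the path-handling items above; the directory, entry, and frontend names below are made up for the example, not taken from the repository configuration:

    import os

    # Hypothetical names for illustration
    monitor_dir = "monitor"          # stands in for monitorAggregatorConfig.monitor_dir
    entry = "example_entry"
    fe = "example_frontend"

    # Before: nested os.path.join calls and "/" string concatenation
    old_status_fname = os.path.join(os.path.join(monitor_dir, "entry_" + entry), "Status_Attributes.rrd")
    old_relpath = "total/" + "frontend_" + fe + "/Status_Attributes"

    # After: a single os.path.join with all path components
    status_fname = os.path.join(monitor_dir, f"entry_{entry}", "Status_Attributes.rrd")
    relpath = os.path.join("total", f"frontend_{fe}", "Status_Attributes")

    # "creating parent directories as well": os.makedirs builds any missing intermediate
    # directories in one call (a sketch of the intent, not necessarily the exact call
    # used by monitoringConfig.establish_dir)
    os.makedirs(os.path.dirname(status_fname), exist_ok=True)
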
mambelli committed Aug 16, 2023
1 parent 3e20e9e commit 9f3f574
Showing 7 changed files with 219 additions and 378 deletions.
180 changes: 48 additions & 132 deletions factory/glideFactoryMonitorAggregator.py
@@ -1,26 +1,14 @@
# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

#
# Project:
# glideinWMS
#
# File Version:
#
# Description:
# This module implements the functions needed
# to aggregate the monitoring fo the glidein factory
#
# Author:
# Igor Sfiligoi (May 23rd 2007)
#

import copy
import json
import os.path
import pickle
import shutil
import tempfile
import time

from glideinwms.factory import glideFactoryMonitoring
@@ -89,154 +77,68 @@ def rrd_site(name):
type_strings = {"Status": "Status", "Requested": "Req", "ClientMonitor": "Client"}

##############################################################################
rrd_problems_found = False
# Function used by Factory reconfig/upgrade
# No logging available, output is to stdout/err


def verifyHelper(filename, dict, fix_rrd=False):
"""
Helper function for verifyRRD. Checks one file,
prints out errors. if fix_rrd, will attempt to
dump out rrd to xml, add the missing attributes,
then restore. Original file is obliterated.
@param filename: filename of rrd to check
@param dict: expected dictionary
@param fix_rrd: if true, will attempt to add missing attrs
"""
global rrd_problems_found
if not os.path.exists(filename):
print("WARNING: %s missing, will be created on restart" % (filename))
return
rrd_obj = rrdSupport.rrdSupport()
(missing, extra) = rrd_obj.verify_rrd(filename, dict)
for attr in extra:
print(f"ERROR: {filename} has extra attribute {attr}")
if fix_rrd:
print("ERROR: fix_rrd cannot fix extra attributes")
if not fix_rrd:
for attr in missing:
print(f"ERROR: {filename} missing attribute {attr}")
if len(missing) > 0:
rrd_problems_found = True
if fix_rrd and (len(missing) > 0):
(f, tempfilename) = tempfile.mkstemp()
(out, tempfilename2) = tempfile.mkstemp()
(restored, restoredfilename) = tempfile.mkstemp()
backup_str = str(int(time.time())) + ".backup"
print(f"Fixing {filename}... (backed up to {filename + backup_str})")
os.close(out)
os.close(restored)
os.unlink(restoredfilename)
# Use exe version since dump, restore not available in rrdtool
dump_obj = rrdSupport.rrdtool_exe()
outstr = dump_obj.dump(filename)
for line in outstr:
os.write(f, "%s\n" % line)
os.close(f)
# Move file to backup location
shutil.move(filename, filename + backup_str)
rrdSupport.addDataStore(tempfilename, tempfilename2, missing)
dump_obj.restore(tempfilename2, restoredfilename)
os.unlink(tempfilename)
os.unlink(tempfilename2)
shutil.move(restoredfilename, filename)
if len(extra) > 0:
rrd_problems_found = True


def verifyRRD(fix_rrd=False):
def verifyRRD(fix_rrd=False, backup=True):
"""
Go through all known monitoring rrds and verify that they
match existing schema (could be different if an upgrade happened)
If fix_rrd is true, then also attempt to add any missing attributes.
Args:
fix_rrd (bool): if True, will attempt to add missing attrs
backup (bool): if True, backup the old RRD before fixing
Returns:
bool: True if all OK, False if there is a problem w/ RRD files
"""
global rrd_problems_found
global monitorAggregatorConfig
rrd_problems_found = False
mon_dir = monitorAggregatorConfig.monitor_dir

# Factory monitoring dictionaries
status_dict = {}
completed_stats_dict = {}
completed_waste_dict = {}
counts_dict = {}

# initialize the RRD dictionaries to match the current schema for verification
for tp in list(status_attributes.keys()):
if tp in list(type_strings.keys()):
tp_str = type_strings[tp]
attributes_tp = status_attributes[tp]
for a in attributes_tp:
status_dict[f"{tp_str}{a}"] = None

for jobrange in glideFactoryMonitoring.getAllJobRanges():
completed_stats_dict[f"JobsNr_{jobrange}"] = None
for timerange in glideFactoryMonitoring.getAllTimeRanges():
completed_stats_dict[f"Lasted_{timerange}"] = None
completed_stats_dict[f"JobsLasted_{timerange}"] = None

for jobtype in glideFactoryMonitoring.getAllJobTypes():
for timerange in glideFactoryMonitoring.getAllMillRanges():
completed_waste_dict[f"{jobtype}_{timerange}"] = None

for jobtype in ("Entered", "Exited", "Status"):
for jobstatus in ("Wait", "Idle", "Running", "Held"):
counts_dict[f"{jobtype}{jobstatus}"] = None
for jobstatus in ("Completed", "Removed"):
counts_dict["{}{}".format("Entered", jobstatus)] = None
# FROM: lib2to3.fixes.fix_ws_comma
# completed_waste_dict["%s_%s"%(jobtype, timerange)]=None
#
# for jobtype in ('Entered', 'Exited', 'Status'):
# for jobstatus in ('Wait', 'Idle', 'Running', 'Held'):
# counts_dict["%s%s"%(jobtype, jobstatus)]=None
# for jobstatus in ('Completed', 'Removed'):
# counts_dict["%s%s"%('Entered', jobstatus)]=None
#
# verifyHelper(os.path.join(total_dir,
# "Status_Attributes.rrd"), status_dict, fix_rrd)
# verifyHelper(os.path.join(total_dir,
# "Log_Completed.rrd"),
# glideFactoryMonitoring.getLogCompletedDefaults(), fix_rrd)
# verifyHelper(os.path.join(total_dir,
# "Log_Completed_Stats.rrd"), completed_stats_dict, fix_rrd)
# verifyHelper(os.path.join(total_dir,
# "Log_Completed_WasteTime.rrd"), completed_waste_dict, fix_rrd)
# verifyHelper(os.path.join(total_dir,
# "Log_Counts.rrd"), counts_dict, fix_rrd)
# for filename in os.listdir(dir):
# if filename[:6]=="entry_":
# entrydir=os.path.join(dir, filename)
# for subfilename in os.listdir(entrydir):
# if subfilename[:9]=="frontend_":
# current_dir=os.path.join(entrydir, subfilename)
# verifyHelper(os.path.join(current_dir,
# "Status_Attributes.rrd"), status_dict, fix_rrd)
# verifyHelper(os.path.join(current_dir,
# "Log_Completed.rrd"),
# glideFactoryMonitoring.getLogCompletedDefaults(), fix_rrd)
# verifyHelper(os.path.join(current_dir,
# "Log_Completed_Stats.rrd"), completed_stats_dict, fix_rrd)
# verifyHelper(os.path.join(current_dir,
# "Log_Completed_WasteTime.rrd"),
# completed_waste_dict, fix_rrd)
# verifyHelper(os.path.join(current_dir,
# "Log_Counts.rrd"), counts_dict, fix_rrd)
# return not rrd_problems_found

counts_dict[f"Entered{jobstatus}"] = None
completed_dict = glideFactoryMonitoring.getLogCompletedDefaults()

rrdict = {
"Status_Attributes.rrd": status_dict,
"Log_Completed.rrd": completed_dict,
"Log_Completed_Stats.rrd": completed_stats_dict,
"Log_Completed_WasteTime.rrd": completed_waste_dict,
"Log_Counts.rrd": counts_dict,
}

# check all the existing files
if not os.path.isdir(mon_dir):
print(f"WARNING: monitor directory '{mon_dir}' does not exist, skipping rrd verification.")
return True
for dir_name, sdir_name, f_list in os.walk(mon_dir):
for file_name in f_list:
if file_name in list(rrdict.keys()):
verifyHelper(os.path.join(dir_name, file_name), rrdict[file_name], fix_rrd)

if rrdSupport.verifyHelper(os.path.join(dir_name, file_name), rrdict[file_name], fix_rrd, backup):
rrd_problems_found = True
return not rrd_problems_found


@@ -280,11 +182,12 @@ def aggregateStatus(in_downtime):
for entry in monitorAggregatorConfig.entries:
# load entry status file
status_fname = os.path.join(
os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry), monitorAggregatorConfig.status_relname
monitorAggregatorConfig.monitor_dir, f"entry_{entry}", monitorAggregatorConfig.status_relname
)
# load entry completed data file
completed_data_fname = os.path.join(
os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.completed_data_relname,
)
completed_data_fp = None
@@ -377,7 +280,7 @@ def aggregateStatus(in_downtime):
else:
# All other fields could be numbers or something else
try:
# if there already, sum
# if is there already, sum
if a in tela:
tela[a] += int(ela[a])
else:
@@ -501,11 +404,13 @@ def aggregateStatus(in_downtime):
a_el = int(tp_el[a])
val_dict[f"{tp_str}{a}"] = a_el

glideFactoryMonitoring.monitoringConfig.write_rrd_multi("total/Status_Attributes", "GAUGE", updated, val_dict)
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
os.path.join("total", "Status_Attributes"), "GAUGE", updated, val_dict
)

# Frontend total rrds across all factories
for fe in list(status_fe["frontends"].keys()):
glideFactoryMonitoring.monitoringConfig.establish_dir("total/%s" % ("frontend_" + fe))
glideFactoryMonitoring.monitoringConfig.establish_dir(os.path.join("total", f"frontend_{fe}"))
for tp in list(status_fe["frontends"][fe].keys()):
# values (RRD type) - Status or Requested
if not (tp in list(type_strings.keys())):
@@ -520,7 +425,7 @@ def aggregateStatus(in_downtime):
a_el = int(tp_el[a])
val_dict[f"{tp_str}{a}"] = a_el
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
"total/%s/Status_Attributes" % ("frontend_" + fe), "GAUGE", updated, val_dict
os.path.join("total", f"frontend_{fe}", "Status_Attributes"), "GAUGE", updated, val_dict
)

return status
@@ -547,13 +452,20 @@ def aggregateJobsSummary():
for entry in monitorAggregatorConfig.entries:
# load entry log summary file
status_fname = os.path.join(
os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.jobsummary_relname,
)
try:
with open(status_fname, "rb") as fd:
entry_joblist = pickle.load(fd)
except OSError:
# Errors with the file, e.g. FileNotFoundError, IsADirectoryError, PermissionError
logSupport.log.debug(f"Missing file {status_fname}: ignoring and continuing")
continue
except (EOFError, pickle.UnpicklingError):
# Errors with the file content
logSupport.log.debug(f"Empty or corrupted pickle file {status_fname}: ignoring and continuing")
continue
schedd_name = entry_joblist.get("schedd_name", None)
pool_name = entry_joblist.get("collector_name", None)
@@ -631,13 +543,15 @@ def aggregateLogSummary():
for entry in monitorAggregatorConfig.entries:
# load entry log summary file
status_fname = os.path.join(
os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.logsummary_relname,
)

try:
entry_data = xmlParse.xmlfile2dict(status_fname, always_singular_list=["Fraction", "TimeRange", "Range"])
except OSError:
logSupport.log.debug(f"Missing file {status_fname}: ignoring and continuing")
continue # file not found, ignore

# update entry
@@ -913,16 +827,18 @@ def aggregateRRDStats(log=logSupport.log):
# assigns the data from every site to 'stats'
stats = {}
for entry in monitorAggregatorConfig.entries:
rrd_fname = os.path.join(os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry), rrd_site(rrd))
rrd_fname = os.path.join(monitorAggregatorConfig.monitor_dir, f"entry_{entry}", rrd_site(rrd))
try:
stats[entry] = xmlParse.xmlfile2dict(rrd_fname, always_singular_list={"timezone": {}})
except FileNotFoundError:
log.debug(
f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError, File not found (OK if first time)"
)
except OSError:
if os.path.exists(rrd_fname):
log.debug("aggregateRRDStats %s exception: parse_xml, IOError" % rrd_fname)
else:
log.debug(f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError")
if not os.path.exists(rrd_fname):
log.debug(
"aggregateRRDStats %s exception: parse_xml, IOError, File not existing (OK if first time)"
% rrd_fname
f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError, File not found (OK if first time) - should have been FileNotFoundError"
)

stats_entries = list(stats.keys())
