Skip to content

Commit

Permalink
Implement thread refresh function and allow select what params to res…
Browse files Browse the repository at this point in the history
…et (#330)
  • Loading branch information
juztas authored Oct 10, 2023
1 parent 97bf921 commit fdf0214
Show file tree
Hide file tree
Showing 20 changed files with 124 additions and 66 deletions.
10 changes: 5 additions & 5 deletions packaging/general/Config-Fetcher
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
confFetcher = ConfigFetcher(self.logger)
outThreads['DEFAULT'] = confFetcher
return outThreads
for callType in ['DEFAULT']:
if callType not in self.runThreads:
self.runThreads[callType] = ConfigFetcher(self.logger)
self.runThreads[callType].refreshthread(houreq, dayeq)

if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
10 changes: 5 additions & 5 deletions packaging/siterm-site-agent/scripts/siterm-debugger
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
for sitename in self.config.get('general', 'sitename'):
debAgent = Debugger.Debugger(self.config, sitename)
outThreads[sitename] = debAgent
return outThreads
if sitename not in self.runThreads:
debAgent = Debugger.Debugger(self.config, sitename)
self.runThreads[sitename] = debAgent
self.runThreads[sitename].refreshthread(houreq, dayeq)

if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
11 changes: 6 additions & 5 deletions packaging/siterm-site-agent/scripts/siterm-prompush
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
workDir = self.config.get('general', 'privatedir') + "/SiteRM/background/"
# get run ID;
fname = workDir + f"/background-process-{self.inargs.runnum}.json"
Expand All @@ -36,9 +35,11 @@ class MyDaemon(Daemon):
self.sleepTimers['ok'] = int(backgConfig['requestdict']['resolution'])
if self.config.has_option('general', 'sitename'):
for sitename in self.config.get('general', 'sitename'):
promAgent = PromPush(self.config, sitename, backgConfig)
outThreads[sitename] = promAgent
return outThreads
if sitename not in self.runThreads:
promAgent = PromPush(self.config, sitename, backgConfig)
self.runThreads[sitename] = promAgent
self.runThreads[sitename].refreshthread(houreq, dayeq)


if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
10 changes: 5 additions & 5 deletions packaging/siterm-site-agent/scripts/siterm-ruler
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
for sitename in self.config.get('general', 'sitename'):
rulAgent = Ruler.Ruler(self.config, sitename)
outThreads[sitename] = rulAgent
return outThreads
if sitename not in self.runThreads:
rulAgent = Ruler.Ruler(self.config, sitename)
self.runThreads[sitename] = rulAgent
self.runThreads[sitename].refreshthread(houreq, dayeq)

if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
11 changes: 6 additions & 5 deletions packaging/siterm-site-agent/scripts/sitermagent-update
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
for sitename in self.config.get('general', 'sitename'):
recAgent = RecurringAction(self.config, sitename)
outThreads[sitename] = recAgent
return outThreads
if sitename not in self.runThreads:
recAgent = RecurringAction(self.config, sitename)
self.runThreads[sitename] = recAgent
self.runThreads[sitename].refreshthread(houreq, dayeq)


if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
13 changes: 7 additions & 6 deletions packaging/siterm-site-fe/scripts/LookUpService-update
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
for sitename in self.config.get('general', 'sites'):
thr = LS.LookUpService(self.config, sitename)
outThreads[sitename] = thr
return outThreads
for sitename in self.config.get('general', 'sitename'):
if sitename not in self.runThreads:
thr = LS.LookUpService(self.config, sitename)
self.runThreads[sitename] = thr
self.runThreads[sitename].refreshthread(houreq, dayeq)


if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
12 changes: 6 additions & 6 deletions packaging/siterm-site-fe/scripts/ProvisioningService-update
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
for sitename in self.config.get('general', 'sites'):
thr = prsS.ProvisioningService(self.config, sitename)
outThreads[sitename] = thr
return outThreads
for sitename in self.config.get('general', 'sitename'):
if sitename not in self.runThreads:
thr = prsS.ProvisioningService(self.config, sitename)
self.runThreads[sitename] = thr
self.runThreads[sitename].refreshthread(houreq, dayeq)

if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
13 changes: 7 additions & 6 deletions packaging/siterm-site-fe/scripts/SNMPMonitoring-update
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
for sitename in self.config.get('general', 'sites'):
thr = SNMP.SNMPMonitoring(self.config, sitename)
outThreads[sitename] = thr
return outThreads
for sitename in self.config.get('general', 'sitename'):
if sitename not in self.runThreads:
thr = SNMP.SNMPMonitoring(self.config, sitename)
self.runThreads[sitename] = thr
self.runThreads[sitename].refreshthread(houreq, dayeq)


if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
14 changes: 7 additions & 7 deletions packaging/siterm-site-fe/scripts/siterm-prompush
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ DAEMON = None
class MyDaemon(Daemon):
""" My own Deamon override """

def getThreads(self):
def getThreads(self, houreq, dayeq):
"""Multi threading. Allow multiple sites under single FE"""
outThreads = {}
workDir = self.config.get('general', 'privatedir') + "/SiteRM/background/"
# get run ID;
fname = workDir + f"/background-process-{self.inargs.runnum}.json"
Expand All @@ -36,11 +35,12 @@ class MyDaemon(Daemon):
self.sleepTimers['ok'] = int(backgConfig['requestdict']['resolution'])
if self.config.has_option('general', 'sites'):
for sitename in self.config.get('general', 'sites'):
promAgent = PromPush(self.config, sitename, backgConfig)
outThreads[sitename] = promAgent
if not outThreads:
raise Exception('BlaBla')
return outThreads
if sitename not in self.runThreads:
promAgent = PromPush(self.config, sitename, backgConfig)
self.runThreads[sitename] = promAgent
self.runThreads[sitename].refreshthread(houreq, dayeq)
if not self.runThreads:
raise Exception('There are no threads configured. Fatal failure')

if __name__ == "__main__":
parser = getParser(DESCRIPTION)
Expand Down
7 changes: 7 additions & 0 deletions src/python/SiteFE/LookUpService/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ def __init__(self, config, sitename):
workDir = self.config.get(self.sitename, 'privatedir') + "/LookUpService/"
createDirs(workDir)

def refreshthread(self, *_args):
"""Call to refresh thread for this specific class and reset parameters"""
self.config = getGitConfig()
self.dbI = getVal(getDBConn('LookUpService', self), **{'sitename': self.sitename})
self.switch = Switch(self.config, self.sitename)
self.police = PolicyService(self.config, self.sitename)

def checkForModelDiff(self, saveName):
"""Check if models are different."""
currentModel, currentGraph = getCurrentModel(self, False)
Expand Down
11 changes: 10 additions & 1 deletion src/python/SiteFE/ProvisioningService/provisioningService.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, config, sitename):
self.logger = getLoggingObject(config=self.config, service='ProvisioningService')
self.sitename = sitename
self.switch = Switch(config, sitename)
self.dbI = getVal(getDBConn('LookUpService', self), **{'sitename': self.sitename})
self.dbI = getVal(getDBConn('ProvisioningService', self), **{'sitename': self.sitename})
workDir = self.config.get('general', 'privatedir') + "/ProvisioningService/"
createDirs(workDir)
self.yamlconf = {}
Expand All @@ -59,6 +59,15 @@ def __init__(self, config, sitename):
self.lastApplied = None
self.connID = None

def refreshthread(self, *args):
"""Call to refresh thread for this specific class and reset parameters"""
self.config = getGitConfig()
self.dbI = getVal(getDBConn('ProvisioningService', self), **{'sitename': self.sitename})
self.switch = Switch(self.config, self.sitename)
# If day is not equal (means new day) - lets force re-running individual apply
if args[1]:
self.yamlconfuuidActive = {}

def _forceApply(self):
curDate = datetime.datetime.now().strftime('%Y-%m-%d')
if self.lastApplied != curDate:
Expand Down
4 changes: 2 additions & 2 deletions src/python/SiteFE/REST/Modules/PrometheusCalls.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ def __getServiceStates(self, registry, **kwargs):
for service in services:
state = 'UNKNOWN'
runtime = -1
if int(self.timenow - service['updatedate']) < 300:
# If we are not getting service state for 2 mins, leave state as unknown
if int(self.timenow - service['updatedate']) < 600:
# If we are not getting service state for 10 mins, set state as unknown
state = service['servicestate']
runtime = service['runtime']
labels = {'servicename': service['servicename'], 'hostname': service.get('hostname', 'UNSET')}
Expand Down
6 changes: 6 additions & 0 deletions src/python/SiteFE/SNMPMonitoring/snmpPushGateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from SiteRMLibs.MainUtilities import evaldict
from SiteRMLibs.MainUtilities import getVal
from SiteRMLibs.MainUtilities import getDBConn
from SiteRMLibs.MainUtilities import getGitConfig
from prometheus_client import CollectorRegistry, push_to_gateway
from prometheus_client import Info, Gauge

Expand All @@ -31,6 +32,11 @@ def __init__(self, config, sitename, backgConfig):
self.snmpLabels = {'numb': '', 'vlan': '', 'hostname': ''}
self.snmpLabels.update(self.__getMetadataParams())

def refreshthread(self, *_args):
"""Call to refresh thread for this specific class and reset parameters"""
self.config = getGitConfig()
self.dbI = getVal(getDBConn('PrometheusPush', self), **{'sitename': self.sitename})

def __getMetadataParams(self):
"""Get metadata parameters"""
if 'metadata' in self.backgConfig['requestdict']:
Expand Down
6 changes: 6 additions & 0 deletions src/python/SiteFE/SNMPMonitoring/snmpmon.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def __init__(self, config, sitename):
self.err = []
self.hostconf = {}

def refreshthread(self, *_args):
"""Call to refresh thread for this specific class and reset parameters"""
self.config = getGitConfig()
self.dbI = getVal(getDBConn('SNMPMonitoring', self), **{'sitename': self.sitename})
self.switch = Switch(self.config, self.sitename)

def _start(self):
self.switch.getinfo(False)
self.switches = self.switch.getAllSwitches()
Expand Down
6 changes: 6 additions & 0 deletions src/python/SiteRMAgent/Debugger/Debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ def __init__(self, config, sitename):
self.diragent = contentDB()
self.logger.info("====== Debugger Start Work. Hostname: %s", self.hostname)

def refreshthread(self, *_args):
"""Call to refresh thread for this specific class and reset parameters"""
self.config = getGitConfig()
self.fullURL = getFullUrl(self.config, self.sitename)
self.hostname = self.config.get('agent', 'hostname')

def getData(self, url):
"""Get data from FE."""
self.logger.info(f'Query: {self.fullURL}{url}')
Expand Down
6 changes: 5 additions & 1 deletion src/python/SiteRMAgent/RecurringActions/DTNMain.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ class RecurringAction():
"""Provisioning service communicates with Local controllers and applies
network changes."""
def __init__(self, config, sitename):
self.config = config
self.config = config if config else getGitConfig()
self.logger = getLoggingObject(config=self.config, service='Agent')
self.sitename = sitename

def refreshthread(self, *_args):
"""Call to refresh thread for this specific class and reset parameters"""
self.config = getGitConfig()

def prepareJsonOut(self):
"""Executes all plugins and prepares json output to FE."""
excMsg = ""
Expand Down
8 changes: 8 additions & 0 deletions src/python/SiteRMAgent/Ruler/Ruler.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ def __init__(self, config, sitename):
QOS.__init__(self)
OverlapLib.__init__(self)

def refreshthread(self, *_args):
"""Call to refresh thread for this specific class and reset parameters"""
self.config = getGitConfig()
self.fullURL = getFullUrl(self.config, self.sitename)
self.hostname = self.config.get('agent', 'hostname')
self.layer2 = VInterfaces(self.config, self.sitename)
self.layer3 = Routing(self.config, self.sitename)

def __clean(self):
"""Clean variables before run"""
self.activeIPs = {'ipv4': {}, 'ipv6': {}}
Expand Down
5 changes: 5 additions & 0 deletions src/python/SiteRMLibs/ConfigFetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def __init__(self, logger):
self.gitObj = GitConfig()
self.config = None

def refreshthread(self, *_args):
"""Call to refresh thread for this specific class and reset parameters"""
self.gitObj = GitConfig()
self.config = None

def _fetchFile(self, name, url):
output = {}
datetimeNow = datetime.datetime.now() + datetime.timedelta(minutes=10)
Expand Down
20 changes: 10 additions & 10 deletions src/python/SiteRMLibs/Daemonizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, component, inargs, getGitConf=True):
self.pidfile = f"/tmp/end-site-rm-{component}-{self.inargs.runnum}.pid"
self.config = None
self.logger = None
self.runThreads = {}
if getGitConf:
self.config = getGitConfig()
self.logger = getLoggingObject(config=self.config,
Expand All @@ -73,7 +74,7 @@ def __init__(self, component, inargs, getGitConf=True):
self.logger = getLoggingObject(logFile="%s/%s-" % ('/var/log/', component),
logLevel='DEBUG', logType=logType,
service=self.component)
self.sleepTimers = {'ok': 10, 'failure': 30}
self.sleepTimers = {'ok': 30, 'failure': 60}
self.totalRuntime = 0

def _refreshConfig(self):
Expand Down Expand Up @@ -238,12 +239,11 @@ def runLoop(self):
return False
return True

def refreshThreads(self):
def refreshThreads(self, houreq, dayeq):
"""Refresh threads"""
while True:
try:
runThreads = self.getThreads()
return runThreads
self.getThreads(houreq, dayeq)
except SystemExit:
exc = traceback.format_exc()
self.logger.critical("SystemExit!!! Error details: %s", exc)
Expand All @@ -255,14 +255,14 @@ def refreshThreads(self):

def run(self):
"""Run main execution"""
timeeq, currentHour = reCacheConfig(None)
runThreads = self.refreshThreads()
houreq, dayeq, currentHour, currentDay = reCacheConfig(None, None)
self.refreshThreads(houreq, dayeq)
while self.runLoop():
self.runCount += 1
hadFailure = False
stwork = int(getUTCnow())
try:
for sitename, rthread in list(runThreads.items()):
for sitename, rthread in list(self.runThreads.items()):
stwork = int(getUTCnow())
self.logger.info('Start worker for %s site', sitename)
try:
Expand All @@ -288,11 +288,11 @@ def run(self):
if self.totalRuntime != 0 and self.totalRuntime <= int(getUTCnow()):
self.logger.info('Total Runtime expired. Stopping Service')
sys.exit(0)
timeeq, currentHour = reCacheConfig(currentHour)
if not timeeq:
houreq, dayeq, currentHour, currentDay = reCacheConfig(currentHour, currentDay)
if not houreq:
self.logger.info('Re-initiating Service with new configuration from GIT')
self._refreshConfig()
runThreads = self.refreshThreads()
self.refreshThreads(houreq, dayeq)


@staticmethod
Expand Down
Loading

0 comments on commit fdf0214

Please sign in to comment.