Skip to content

Commit

Permalink
Upgrade to Manage-Resources-V3 and Manage-Glue2 like
Browse files Browse the repository at this point in the history
  • Loading branch information
jpnavarro committed Nov 18, 2020
1 parent 9012332 commit 602b799
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 247 deletions.
232 changes: 114 additions & 118 deletions bin/route_rdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,21 @@
# Process RDR2 information from a source (http, https, file) to a destination (analyze display, file, warehouse)
# Can also subscribe from AMQP
#
# TODO:
# ...
import argparse
from datetime import datetime, tzinfo, timedelta
import http.client as httplib
import json
import logging
import logging.handlers
import os
from pid import PidFile
import pwd
import re
import sys
import argparse
import logging
import logging.handlers
import shutil
import signal
import datetime
from datetime import datetime, tzinfo, timedelta
from time import sleep
try:
import http.client as httplib
except ImportError:
import httplib
import json
import ssl
import shutil
from time import sleep

import django
django.setup()
Expand All @@ -32,7 +27,6 @@
from rdr_db.models import *
from processing_status.process import ProcessingActivity

from daemon import runner
import pdb

class UTC(tzinfo):
Expand All @@ -44,85 +38,97 @@ def dst(self, dt):
return timedelta(0)
utc = UTC()

class HandleRDR():
def __init__(self):
self.args = None
self.config = {}
self.src = {}
self.dest = {}
for var in ['uri', 'scheme', 'path', 'display']: # Where <full> contains <type>:<obj>
self.src[var] = None
self.dest[var] = None
self.peak_sleep = 10 * 60 # 10 minutes in seconds during peak business hours
self.off_sleep = 60 * 60 # 60 minutes in seconds during off hours
self.max_stale = 24 * 60 * 60 # 24 hours in seconds force refresh
# These attributes have their own database column
# Some fields exist in both parent and sub-resources, while others only in one
# Those in one will be left empty in the other, or inherit from the parent
self.have_column = ['resource_id', 'info_resourceid',
'resource_descriptive_name', 'resource_description',
'project_affiliation', 'provider_level',
'resource_status', 'current_statuses', 'updated_at']

default_file = 'file:./rdr.json'
# Used during initialization, before logging is enabled
def eprint(*args, **kwargs):
    """Forward all arguments to print(), with output directed to sys.stderr."""
    print(*args, file=sys.stderr, **kwargs)

class Router():
def __init__(self):
parser = argparse.ArgumentParser(epilog='File SRC|DEST syntax: file:<file path and name')
parser.add_argument('daemon_action', nargs='?', choices=('start', 'stop', 'restart'), \
parser.add_argument('daemonaction', nargs='?', choices=('start', 'stop', 'restart'), \
help='{start, stop, restart} daemon')
parser.add_argument('-s', '--source', action='store', dest='src', \
help='Messages source {file, http[s]} (default=file)')
# parser.add_argument('-a', '--subscribe', action='store', dest='sub', \
# help='Messages subscribe from AMQP')
parser.add_argument('-d', '--destination', action='store', dest='dest', \
help='Message destination {file, analyze, or warehouse} (default=analyze)')
parser.add_argument('--daemon', action='store_true', \
help='Daemonize execution')
parser.add_argument('-l', '--log', action='store', \
help='Logging level (default=warning)')
parser.add_argument('-c', '--config', action='store', default='./route_rdr.conf', \
help='Configuration file default=./route_rdr.conf')
parser.add_argument('--verbose', action='store_true', \
help='Verbose output')
parser.add_argument('--daemon', action='store_true', \
help='Daemonize execution')
parser.add_argument('--pdb', action='store_true', \
help='Run with Python debugger')
self.args = parser.parse_args()

# Trace for debugging as early as possible
if self.args.pdb:
pdb.set_trace()

# Load configuration file
config_path = os.path.abspath(self.args.config)
self.config_file = os.path.abspath(self.args.config)
try:
with open(config_path, 'r') as file:
with open(self.config_file, 'r') as file:
conf=file.read()
file.close()
except IOError as e:
raise
eprint('Error "{}" reading config={}'.format(e, config_path))
sys.exit(1)
try:
self.config = json.loads(conf)
except ValueError as e:
print('Error "{}" parsing config={}'.format(e, config_path))
eprint('Error "{}" parsing config={}'.format(e, config_path))
sys.exit(1)

# Initialize logging from arguments, or config file, or default to WARNING as last resort
numeric_log = None
if self.args.log is not None:
numeric_log = getattr(logging, self.args.log.upper(), None)
if numeric_log is None and 'LOG_LEVEL' in self.config:
numeric_log = getattr(logging, self.config['LOG_LEVEL'].upper(), None)
if numeric_log is None:
numeric_log = getattr(logging, 'WARNING', None)
if not isinstance(numeric_log, int):
raise ValueError('Invalid log level: {}'.format(numeric_log))
if self.config.get('PID_FILE'):
self.pidfile_path = self.config['PID_FILE']
else:
name = os.path.basename(__file__).replace('.py', '')
self.pidfile_path = '/var/run/{}/{}.pid'.format(name, name)

def Setup(self):
# Initialize log level from arguments, or config file, or default to WARNING
loglevel_str = (self.args.log or self.config.get('LOG_LEVEL', 'WARNING')).upper()
loglevel_num = getattr(logging, loglevel_str, None)
self.logger = logging.getLogger('DaemonLog')
self.logger.setLevel(numeric_log)
self.logger.setLevel(loglevel_num)
self.formatter = logging.Formatter(fmt='%(asctime)s.%(msecs)03d %(levelname)s %(message)s', \
datefmt='%Y/%m/%d %H:%M:%S')
self.handler = logging.handlers.TimedRotatingFileHandler(self.config['LOG_FILE'], when='W6', \
backupCount=999, utc=True)
self.handler = logging.handlers.TimedRotatingFileHandler(self.config['LOG_FILE'], \
when='W6', backupCount=999, utc=True)
self.handler.setFormatter(self.formatter)
self.logger.addHandler(self.handler)

# Initialize stdout, stderr
if self.args.daemon and 'LOG_FILE' in self.config:
self.stdout_path = self.config['LOG_FILE'].replace('.log', '.daemon.log')
self.stderr_path = self.stdout_path
self.SaveDaemonStdOut(self.stdout_path)
sys.stdout = open(self.stdout_path, 'wt+')
sys.stderr = open(self.stderr_path, 'wt+')

signal.signal(signal.SIGINT, self.exit_signal)
signal.signal(signal.SIGTERM, self.exit_signal)

self.logger.info('Starting program=%s pid=%s, uid=%s(%s)' % \
(os.path.basename(__file__), os.getpid(), os.geteuid(), pwd.getpwuid(os.geteuid()).pw_name))

self.src = {}
self.dest = {}
for var in ['uri', 'scheme', 'path', 'display']: # Where <full> contains <type>:<obj>
self.src[var] = None
self.dest[var] = None
self.peak_sleep = 10 * 60 # 10 minutes in seconds during peak business hours
self.off_sleep = 60 * 60 # 60 minutes in seconds during off hours
self.max_stale = 24 * 60 * 60 # 24 hours in seconds force refresh
# These attributes have their own database column
# Some fields exist in both parent and sub-resources, while others only in one
# Those in one will be left empty in the other, or inherit from the parent
self.have_column = ['resource_id', 'info_resourceid',
'resource_descriptive_name', 'resource_description',
'project_affiliation', 'provider_level',
'resource_status', 'current_statuses', 'updated_at']
default_file = 'file:./rdr.json'

# Verify arguments and parse compound arguments
if not getattr(self.args, 'src', None): # Tests for None and empty ''
if 'RDR_INFO_URL' in self.config:
Expand Down Expand Up @@ -168,21 +174,38 @@ def __init__(self):
self.logger.error('Source and Destination can not both be a {file}')
sys.exit(1)

if self.args.daemon_action:
self.stdin_path = '/dev/null'
if 'LOG_FILE' in self.config:
self.stdout_path = self.config['LOG_FILE'].replace('.log', '.daemon.log')
self.stderr_path = self.stdout_path
else:
self.stdout_path = '/dev/tty'
self.stderr_path = '/dev/tty'
self.SaveDaemonLog(self.stdout_path)
self.pidfile_timeout = 5
if 'PID_FILE' in self.config:
self.pidfile_path = self.config['PID_FILE']
else:
name = os.path.basename(__file__).replace('.py', '')
self.pidfile_path = '/var/run/{}/{}.pid'.format(name ,name)
if self.args.daemonaction == 'start':
if self.src['scheme'] not in ['http', 'https'] or self.dest['scheme'] not in ['warehouse']:
self.logger.error('Can only daemonize when source=[http|https] and destination=warehouse')
sys.exit(1)

self.logger.info('Source: ' + self.src['display'])
self.logger.info('Destination: ' + self.dest['display'])
self.logger.info('Config: ' + self.config_file)

def SaveDaemonStdOut(self, path):
# Save daemon log file using timestamp only if it has anything unexpected in it
try:
file = open(path, 'r')
lines = file.read()
file.close()
if not re.match("^started with pid \d+$", lines) and not re.match("^$", lines):
ts = datetime.strftime(datetime.now(), '%Y-%m-%d_%H:%M:%S')
newpath = '{}.{}'.format(path, ts)
self.logger.debug('Saving previous daemon stdout to {}'.format(newpath))
shutil.copy(path, newpath)
except Exception as e:
self.logger.error('Exception in SaveDaemonStdOut({})'.format(path))
return

def exit_signal(self, signum, frame):
self.logger.critical('Caught signal={}({}), exiting with rc={}'.format(signum, signal.Signals(signum).name, signum))
sys.exit(signum)

def exit(self, rc):
if rc:
self.logger.error('Exiting with rc={}'.format(rc))
sys.exit(rc)

def Retrieve_RDR(self, url):
idx = url.find(':')
Expand Down Expand Up @@ -442,25 +465,6 @@ def latest_status_date(self, resource_status_dates, current_status, which_date):
except:
return(None)

def SaveDaemonLog(self, path):
# Save daemon log file using timestamp only if it has anything unexpected in it
try:
with open(path, 'r') as file:
lines=file.read()
file.close()
if not re.match("^started with pid \d+$", lines) and not re.match("^$", lines):
ts = datetime.strftime(datetime.now(), '%Y-%m-%d_%H:%M:%S')
newpath = '{}.{}'.format(path, ts)
shutil.copy(path, newpath)
print('SaveDaemonLog as {}'.format(newpath))
except Exception as e:
print('Exception in SaveDaemonLog({})'.format(path))
return

def exit_signal(self, signal, frame):
self.logger.critical('Caught signal={}, exiting...'.format(signal))
sys.exit(0)

def smart_sleep(self, last_run):
# This functions sleeps, performs refresh checks, and returns when it's time to refresh
while True:
Expand Down Expand Up @@ -497,13 +501,7 @@ def smart_sleep(self, last_run):
self.logger.error('{} parsing last_update_time={}: {}'.format(type(e).__name__, ts_json['last_update_time'], e.message))
last_db_update = None

def run(self):
signal.signal(signal.SIGINT, self.exit_signal)
signal.signal(signal.SIGTERM, self.exit_signal)
self.logger.info('Starting program={} pid={}, uid={}({})'.format(os.path.basename(__file__), os.getpid(), os.geteuid(), pwd.getpwuid(os.geteuid()).pw_name))
self.logger.info('Source: ' + self.src['display'])
self.logger.info('Destination: ' + self.dest['display'])

def Run(self):
while True:
self.start = datetime.now(utc)
self.stats = {
Expand Down Expand Up @@ -538,24 +536,22 @@ def run(self):
pa.FinishActivity(rc, summary_msg)
else: # Something failed, use returned message
pa.FinishActivity(rc, warehouse_msg)
if not self.args.daemon_action:
if not self.args.daemonaction:
break
self.smart_sleep(self.start)

########## CUSTOMIZATIONS END ##########

if __name__ == '__main__':
    # Imported here because the handler below needs it and it is not among
    # the imports visible at the top of this file
    import traceback

    router = Router()
    # NOTE(review): PidFile presumably guards against a second concurrent
    # instance using the same PID file - confirm against the pid package
    with PidFile(router.pidfile_path):
        try:
            router.Setup()
            rc = router.Run()
        except Exception as e:
            msg = '{} Exception: {}'.format(type(e).__name__, e)
            traceback.print_exc(file=sys.stdout)
            if getattr(router, 'logger', None) is None:
                # Setup() failed before the logger existed; router.exit() would
                # fail on the missing attribute, so report on stderr and stop here
                eprint(msg)
                sys.exit(1)
            router.logger.error(msg)
            rc = 1
    router.exit(rc)
Loading

0 comments on commit 602b799

Please sign in to comment.