From 2ad18a9180993fbf5396459c78218652e751b215 Mon Sep 17 00:00:00 2001 From: John-Paul Navarro Date: Thu, 31 Mar 2022 08:32:16 -0500 Subject: [PATCH] Implement the ability to load multiple affiliations, use library to parse url, bug fix(s) --- CHANGELOG | 3 + bin/route_rdr.py | 130 ++++++++++++++++++--------------------- conf/route_rdr.conf.prod | 1 + sbin/route_rdr.sh | 2 +- 4 files changed, 64 insertions(+), 72 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 96ac25b..89788ad 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,2 +1,5 @@ +tag-1.3-20220331 JP + - Implement the ability to load multiple affiliations, use library to parse url, bug fix(s) + tag-1.2-20210907 JP - Switch to .objects.update_or_create for Drupal 3.0 diff --git a/bin/route_rdr.py b/bin/route_rdr.py index 78592f5..0e5d6e5 100755 --- a/bin/route_rdr.py +++ b/bin/route_rdr.py @@ -18,6 +18,8 @@ import signal import ssl from time import sleep +import traceback +from urllib.parse import urlparse import django django.setup() @@ -174,6 +176,8 @@ def Setup(self): self.logger.error('Source and Destination can not both be a {file}') sys.exit(1) + self.AFFILIATIONS = self.config.get('AFFILIATIONS', ['XSEDE']) # Default to XSEDE only + if self.args.daemonaction == 'start': if self.src['scheme'] not in ['http', 'https'] or self.dest['scheme'] not in ['warehouse']: self.logger.error('Can only daemonize when source=[http|https] and destination=warehouse') @@ -182,6 +186,7 @@ def Setup(self): self.logger.info('Source: ' + self.src['display']) self.logger.info('Destination: ' + self.dest['display']) self.logger.info('Config: ' + self.config_file) + self.logger.info('Affiliations: ' + ', '.join(self.AFFILIATIONS)) def SaveDaemonStdOut(self, path): # Save daemon log file using timestamp only if it has anything unexpected in it @@ -207,65 +212,54 @@ def exit(self, rc): self.logger.error('Exiting with rc={}'.format(rc)) sys.exit(rc) - def Retrieve_RDR(self, url): - idx = url.find(':') - if idx <= 0: - self.logger.error('Retrieve URL is not valid') - sys.exit(1) - - (type, obj) = (url[0:idx], url[idx+1:]) - if type not in ['http', 'https']: - self.logger.error('Retrieve URL is not valid') - sys.exit(1) - - if obj[0:2] != '//': - self.logger.error('Retrieve URL is not valid') + def Retrieve_RDR_Resources(self, url): + rdr_all = [] + for AFF in self.AFFILIATIONS: + rdr_aff = self.Retrieve_RDR(url, affiliation=AFF) + if rdr_aff: + if 'resources' not in rdr_aff: + self.logger.error('RDR JSON response (affiliation={}) is missing a \'resources\' element'.format(AFF)) + else: + rdr_all.extend(rdr_aff['resources']) + return(rdr_all) + + def Retrieve_RDR(self, url, affiliation='XSEDE'): + urlp = urlparse(url) + if not urlp.scheme or not urlp.netloc or not urlp.path: + self.logger.error('RDR URL is not valid: {}'.format(url)) sys.exit(1) - - obj = obj[2:] - idx = obj.find('/') - if idx <= 0: - self.logger.error('Retrieve URL is not valid') + if urlp.scheme not in ['http', 'https']: + self.logger.error('RDR URL scheme is not valid: {}'.format(url)) sys.exit(1) - - (host, path) = (obj[0:idx], obj[idx:]) - idx = host.find(':') - if idx > 0: - port = host[idx+1:] - elif type == 'https': - port = '443' + if ':' in urlp.netloc: + (host, port) = urlp.netloc.split(':') else: - port = '80' + (host, port) = (urlp.netloc, '') + if not port: + port = '80' if urlp.scheme == 'http' else '443' # Default is HTTPS/443 headers = {'Content-type': 'application/json', - 'XA-CLIENT': 'XSEDE', + 'XA-CLIENT': affiliation, 'XA-KEY-FORMAT': 'underscore'} ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) conn = httplib.HTTPSConnection(host=host, port=port, context=ctx) - if path[0] != '/': - path = '/' + path - conn.request('GET', path, None , headers) + conn.request('GET', urlp.path, None , headers) self.logger.debug('HTTP GET {}'.format(url)) response = conn.getresponse() result = response.read() self.logger.debug('HTTP RESP {} {} (returned {}/bytes)'.format(response.status, response.reason, len(result))) try: - rdr_obj = json.loads(result) + rdr_json = json.loads(result) except ValueError as e: self.logger.error('Response not in expected JSON format ({})'.format(e)) return(None) - else: - return(rdr_obj) + return(rdr_json) def Analyze_RDR(self, rdr_obj): - if 'resources' not in rdr_obj: - self.logger.error('RDR JSON response is missing the base \'resources\' element') - self.stats['Skip'] += 1 - sys.exit(1) maxlen = {} - for p_res in rdr_obj['resources']: # Parent resources + for p_res in rdr_obj: # Parent resources if any(x not in p_res for x in ('project_affiliation', 'resource_id', 'info_resourceid')) \ - or p_res['project_affiliation'] != 'XSEDE' \ + or p_res['project_affiliation'] not in self.AFFILIATIONS \ or str(p_res['info_resourceid']).lower() == 'none' \ or p_res['info_resourceid'] == '': self.stats['Skip'] += 1 @@ -327,12 +321,6 @@ def Read_Cache(self, file): sys.exit(1) def Warehouse_RDR(self, rdr_obj): - if 'resources' not in rdr_obj: - self.stats['Skip'] += 1 - msg = 'RDR JSON response is missing a \'resources\' element' - self.logger.error(msg) - return(False, msg) - id_lookup = {'compute_resources': 'compute_resource_id', 'other_resources': 'other_resource_id', 'grid_resources': 'grid_resource_id', @@ -349,10 +337,10 @@ def Warehouse_RDR(self, rdr_obj): for item in RDRResource.objects.all(): self.cur[item.rdr_resource_id] = item - for p_res in rdr_obj['resources']: + for p_res in rdr_obj: # Require affiliation=XSEDE, a resource_id, and an information services ResourceID if any(x not in p_res for x in ('project_affiliation', 'resource_id', 'info_resourceid')) \ - or p_res['project_affiliation'] != 'XSEDE' \ + or p_res['project_affiliation'] not in self.AFFILIATIONS \ or str(p_res['info_resourceid']).lower() == 'none' \ or p_res['info_resourceid'] == '': self.stats['Skip'] += 1 @@ -517,29 +505,30 @@ def Run(self): if self.src['scheme'] == 'file': RDR = self.Read_Cache(self.src['path']) else: - RDR = self.Retrieve_RDR(self.src['uri']) - - if self.dest['scheme'] == 'file': - bytes = self.Write_Cache(self.dest['path'], RDR) - elif self.dest['scheme'] == 'analyze': - self.Analyze_RDR(RDR) - elif self.dest['scheme'] == 'warehouse': - pa_application=os.path.basename(__file__) - pa_function='Warehouse_RDR' - pa_topic = 'Resource Description Repository' - pa_about = 'xsede.org' - pa_id = '{}:{}:{}'.format(pa_application, pa_function, pa_topic) - pa = ProcessingActivity(pa_application, pa_function, pa_id , pa_topic, pa_about) - (rc, warehouse_msg) = self.Warehouse_RDR(RDR) - - self.end = datetime.now(utc) - summary_msg = 'Processed in {:.3f}/seconds: {}/updates, {}/deletes, {}/skipped'.format((self.end - self.start).total_seconds(), self.stats['Update'], self.stats['Delete'], self.stats['Skip']) - self.logger.info(summary_msg) - if self.dest['scheme'] == 'warehouse': - if rc: # No errors - pa.FinishActivity(rc, summary_msg) - else: # Something failed, use returned message - pa.FinishActivity(rc, warehouse_msg) + RDR = self.Retrieve_RDR_Resources(self.src['uri']) + + if RDR: + if self.dest['scheme'] == 'file': + bytes = self.Write_Cache(self.dest['path'], RDR) + elif self.dest['scheme'] == 'analyze': + self.Analyze_RDR(RDR) + elif self.dest['scheme'] == 'warehouse': + pa_application=os.path.basename(__file__) + pa_function='Warehouse_RDR' + pa_topic = 'Resource Description Repository' + pa_about = ','.join(self.AFFILIATIONS) + pa_id = '{}:{}:{}'.format(pa_application, pa_function, pa_topic) + pa = ProcessingActivity(pa_application, pa_function, pa_id , pa_topic, pa_about) + (rc, warehouse_msg) = self.Warehouse_RDR(RDR) + + self.end = datetime.now(utc) + summary_msg = 'Processed in {:.3f}/seconds: {}/updates, {}/deletes, {}/skipped'.format((self.end - self.start).total_seconds(), self.stats['Update'], self.stats['Delete'], self.stats['Skip']) + self.logger.info(summary_msg) + if self.dest['scheme'] == 'warehouse': + if rc: # No errors + pa.FinishActivity(rc, summary_msg) + else: # Something failed, use returned message + pa.FinishActivity(rc, warehouse_msg) if not self.args.daemonaction: break self.smart_sleep(self.start) @@ -558,4 +547,3 @@ def Run(self): traceback.print_exc(file=sys.stdout) rc = 1 router.exit(rc) - diff --git a/conf/route_rdr.conf.prod b/conf/route_rdr.conf.prod index dd125e7..225aa01 100644 --- a/conf/route_rdr.conf.prod +++ b/conf/route_rdr.conf.prod @@ -2,6 +2,7 @@ "DESTINATION": "warehouse", "RDR_INFO_URL": "https://rdr.xsede.org/xsede-api/provider/rdr/v1/resources", "RDR_LAST_URL": "https://rdr.xsede.org/xsede-api/provider/rdr/v1/resources/last_update", + "AFFILIATIONS": ["XSEDE", "TACC"], "LOG_FILE": "/soft/warehouse-apps-1.0/Manage-RDR/var/route_rdr.log", "LOG_LEVEL": "warning", "RUN_DIR": "/soft/warehouse-apps-1.0/Manage-RDR/var", diff --git a/sbin/route_rdr.sh b/sbin/route_rdr.sh index 69cf06a..c6f0ed1 100755 --- a/sbin/route_rdr.sh +++ b/sbin/route_rdr.sh @@ -69,7 +69,7 @@ do_stop () { } do_debug () { echo -n "Debugging: ${PYTHON_BIN} ${APP_BIN} $@ ${APP_OPTS}" - ${PYTHON_BIN} ${APP_BIN} $@ ${APP_OPTS} + ${PYTHON_BIN} ${APP_BIN} start $@ ${APP_OPTS} RETVAL=$? }