Skip to content

Commit

Permalink
Implement the ability to load multiple affiliations, use library to p…
Browse files Browse the repository at this point in the history
…arse url, bug fix(s)
  • Loading branch information
jpnavarro committed Mar 31, 2022
1 parent 55777be commit 2ad18a9
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 72 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
tag-1.3-20220331 JP
- Implement the ability to load multiple affiliations, use a library to parse the URL, bug fixes

tag-1.2-20210907 JP
- Switch to .objects.update_or_create for Drupal 3.0
130 changes: 59 additions & 71 deletions bin/route_rdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import signal
import ssl
from time import sleep
import traceback
from urllib.parse import urlparse

import django
django.setup()
Expand Down Expand Up @@ -174,6 +176,8 @@ def Setup(self):
self.logger.error('Source and Destination can not both be a {file}')
sys.exit(1)

self.AFFILIATIONS = self.config.get('AFFILIATIONS', ['XSEDE']) # Default to XSEDE only

if self.args.daemonaction == 'start':
if self.src['scheme'] not in ['http', 'https'] or self.dest['scheme'] not in ['warehouse']:
self.logger.error('Can only daemonize when source=[http|https] and destination=warehouse')
Expand All @@ -182,6 +186,7 @@ def Setup(self):
self.logger.info('Source: ' + self.src['display'])
self.logger.info('Destination: ' + self.dest['display'])
self.logger.info('Config: ' + self.config_file)
self.logger.info('Affiliations: ' + ', '.join(self.AFFILIATIONS))

def SaveDaemonStdOut(self, path):
# Save daemon log file using timestamp only if it has anything unexpected in it
Expand All @@ -207,65 +212,54 @@ def exit(self, rc):
self.logger.error('Exiting with rc={}'.format(rc))
sys.exit(rc)

def Retrieve_RDR(self, url):
idx = url.find(':')
if idx <= 0:
self.logger.error('Retrieve URL is not valid')
sys.exit(1)

(type, obj) = (url[0:idx], url[idx+1:])
if type not in ['http', 'https']:
self.logger.error('Retrieve URL is not valid')
sys.exit(1)

if obj[0:2] != '//':
self.logger.error('Retrieve URL is not valid')
def Retrieve_RDR_Resources(self, url):
    """Retrieve RDR resources for every configured affiliation.

    Calls self.Retrieve_RDR(url, affiliation=...) once per entry in
    self.AFFILIATIONS and concatenates the 'resources' lists of the
    responses, in affiliation order.

    A response that is empty/None contributes nothing. A response that is
    not a JSON object, lacks a 'resources' element, or whose 'resources'
    element is not a list is logged as an error and also contributes
    nothing, instead of raising TypeError.

    Args:
        url: RDR URL handed unchanged to self.Retrieve_RDR.

    Returns:
        list: the combined resource entries (possibly empty).
    """
    rdr_all = []
    for aff in self.AFFILIATIONS:
        rdr_aff = self.Retrieve_RDR(url, affiliation=aff)
        if not rdr_aff:
            # Nothing usable came back for this affiliation
            continue
        if not isinstance(rdr_aff, dict) or 'resources' not in rdr_aff:
            self.logger.error('RDR JSON response (affiliation={}) is missing a \'resources\' element'.format(aff))
            continue
        resources = rdr_aff['resources']
        if not isinstance(resources, list):
            # e.g. "resources": null -- list.extend() would raise TypeError
            self.logger.error('RDR JSON response (affiliation={}) \'resources\' element is not a list'.format(aff))
            continue
        rdr_all.extend(resources)
    return rdr_all

def Retrieve_RDR(self, url, affiliation='XSEDE'):
    """Fetch the RDR document for one affiliation and return the decoded JSON.

    Args:
        url: http:// or https:// URL that has a host and a path.
        affiliation: value sent in the 'XA-CLIENT' request header.

    Returns:
        The decoded JSON response, or None (after logging an error) when
        the response body is not valid JSON.

    Exits the process with status 1 (after logging an error) when url is
    not a usable http/https URL. Connection errors propagate to the caller.
    """
    urlp = urlparse(url)
    if not urlp.scheme or not urlp.netloc or not urlp.path:
        self.logger.error('RDR URL is not valid: {}'.format(url))
        sys.exit(1)

    if urlp.scheme not in ('http', 'https'):
        self.logger.error('RDR URL scheme is not valid: {}'.format(url))
        sys.exit(1)

    # Let urlparse split host and port: a manual netloc.split(':') breaks
    # on user:password@host and on bracketed IPv6 literals.
    try:
        port = urlp.port  # raises ValueError for a malformed port
    except ValueError:
        self.logger.error('RDR URL is not valid: {}'.format(url))
        sys.exit(1)
    host = urlp.hostname
    if not host:
        self.logger.error('RDR URL is not valid: {}'.format(url))
        sys.exit(1)
    if port is None:
        port = 80 if urlp.scheme == 'http' else 443

    headers = {'Content-type': 'application/json',
               'XA-CLIENT': affiliation,
               'XA-KEY-FORMAT': 'underscore'}

    # NOTE(review): httplib is expected to come from the file's own imports,
    # which are outside this view -- confirm it also exposes HTTPConnection.
    if urlp.scheme == 'https':
        # SERVER_AUTH is the purpose for client-side sockets that verify a
        # server; CLIENT_AUTH builds a context meant for server-side sockets.
        ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        conn = httplib.HTTPSConnection(host=host, port=port, context=ctx)
    else:
        conn = httplib.HTTPConnection(host=host, port=port)

    # Keep any query string that was part of the configured URL
    target = urlp.path
    if urlp.query:
        target += '?' + urlp.query

    try:
        conn.request('GET', target, None, headers)
        self.logger.debug('HTTP GET {}'.format(url))
        response = conn.getresponse()
        result = response.read()
    finally:
        conn.close()
    self.logger.debug('HTTP RESP {} {} (returned {}/bytes)'.format(response.status, response.reason, len(result)))

    try:
        rdr_json = json.loads(result)
    except ValueError as e:
        self.logger.error('Response not in expected JSON format ({})'.format(e))
        return None
    return rdr_json

def Analyze_RDR(self, rdr_obj):
if 'resources' not in rdr_obj:
self.logger.error('RDR JSON response is missing the base \'resources\' element')
self.stats['Skip'] += 1
sys.exit(1)
maxlen = {}
for p_res in rdr_obj['resources']: # Parent resources
for p_res in rdr_obj: # Parent resources
if any(x not in p_res for x in ('project_affiliation', 'resource_id', 'info_resourceid')) \
or p_res['project_affiliation'] != 'XSEDE' \
or p_res['project_affiliation'] not in self.AFFILIATIONS \
or str(p_res['info_resourceid']).lower() == 'none' \
or p_res['info_resourceid'] == '':
self.stats['Skip'] += 1
Expand Down Expand Up @@ -327,12 +321,6 @@ def Read_Cache(self, file):
sys.exit(1)

def Warehouse_RDR(self, rdr_obj):
if 'resources' not in rdr_obj:
self.stats['Skip'] += 1
msg = 'RDR JSON response is missing a \'resources\' element'
self.logger.error(msg)
return(False, msg)

id_lookup = {'compute_resources': 'compute_resource_id',
'other_resources': 'other_resource_id',
'grid_resources': 'grid_resource_id',
Expand All @@ -349,10 +337,10 @@ def Warehouse_RDR(self, rdr_obj):
for item in RDRResource.objects.all():
self.cur[item.rdr_resource_id] = item

for p_res in rdr_obj['resources']:
for p_res in rdr_obj:
# Require an affiliation listed in self.AFFILIATIONS, a resource_id, and an information services ResourceID
if any(x not in p_res for x in ('project_affiliation', 'resource_id', 'info_resourceid')) \
or p_res['project_affiliation'] != 'XSEDE' \
or p_res['project_affiliation'] not in self.AFFILIATIONS \
or str(p_res['info_resourceid']).lower() == 'none' \
or p_res['info_resourceid'] == '':
self.stats['Skip'] += 1
Expand Down Expand Up @@ -517,29 +505,30 @@ def Run(self):
if self.src['scheme'] == 'file':
RDR = self.Read_Cache(self.src['path'])
else:
RDR = self.Retrieve_RDR(self.src['uri'])

if self.dest['scheme'] == 'file':
bytes = self.Write_Cache(self.dest['path'], RDR)
elif self.dest['scheme'] == 'analyze':
self.Analyze_RDR(RDR)
elif self.dest['scheme'] == 'warehouse':
pa_application=os.path.basename(__file__)
pa_function='Warehouse_RDR'
pa_topic = 'Resource Description Repository'
pa_about = 'xsede.org'
pa_id = '{}:{}:{}'.format(pa_application, pa_function, pa_topic)
pa = ProcessingActivity(pa_application, pa_function, pa_id , pa_topic, pa_about)
(rc, warehouse_msg) = self.Warehouse_RDR(RDR)

self.end = datetime.now(utc)
summary_msg = 'Processed in {:.3f}/seconds: {}/updates, {}/deletes, {}/skipped'.format((self.end - self.start).total_seconds(), self.stats['Update'], self.stats['Delete'], self.stats['Skip'])
self.logger.info(summary_msg)
if self.dest['scheme'] == 'warehouse':
if rc: # No errors
pa.FinishActivity(rc, summary_msg)
else: # Something failed, use returned message
pa.FinishActivity(rc, warehouse_msg)
RDR = self.Retrieve_RDR_Resources(self.src['uri'])

if RDR:
if self.dest['scheme'] == 'file':
bytes = self.Write_Cache(self.dest['path'], RDR)
elif self.dest['scheme'] == 'analyze':
self.Analyze_RDR(RDR)
elif self.dest['scheme'] == 'warehouse':
pa_application=os.path.basename(__file__)
pa_function='Warehouse_RDR'
pa_topic = 'Resource Description Repository'
pa_about = ','.join(self.AFFILIATIONS)
pa_id = '{}:{}:{}'.format(pa_application, pa_function, pa_topic)
pa = ProcessingActivity(pa_application, pa_function, pa_id , pa_topic, pa_about)
(rc, warehouse_msg) = self.Warehouse_RDR(RDR)

self.end = datetime.now(utc)
summary_msg = 'Processed in {:.3f}/seconds: {}/updates, {}/deletes, {}/skipped'.format((self.end - self.start).total_seconds(), self.stats['Update'], self.stats['Delete'], self.stats['Skip'])
self.logger.info(summary_msg)
if self.dest['scheme'] == 'warehouse':
if rc: # No errors
pa.FinishActivity(rc, summary_msg)
else: # Something failed, use returned message
pa.FinishActivity(rc, warehouse_msg)
if not self.args.daemonaction:
break
self.smart_sleep(self.start)
Expand All @@ -558,4 +547,3 @@ def Run(self):
traceback.print_exc(file=sys.stdout)
rc = 1
router.exit(rc)

1 change: 1 addition & 0 deletions conf/route_rdr.conf.prod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"DESTINATION": "warehouse",
"RDR_INFO_URL": "https://rdr.xsede.org/xsede-api/provider/rdr/v1/resources",
"RDR_LAST_URL": "https://rdr.xsede.org/xsede-api/provider/rdr/v1/resources/last_update",
"AFFILIATIONS": ["XSEDE", "TACC"],
"LOG_FILE": "/soft/warehouse-apps-1.0/Manage-RDR/var/route_rdr.log",
"LOG_LEVEL": "warning",
"RUN_DIR": "/soft/warehouse-apps-1.0/Manage-RDR/var",
Expand Down
2 changes: 1 addition & 1 deletion sbin/route_rdr.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ do_stop () {
}
# Echo, then run, the application with the "start" action followed by any
# arguments passed to this function and ${APP_OPTS}; the command's exit
# status is left in RETVAL.
do_debug () {
    # $* joins the arguments into the single echoed string
    echo -n "Debugging: ${PYTHON_BIN} ${APP_BIN} start $* ${APP_OPTS}"
    # "$@" keeps each argument intact even if it contains whitespace;
    # ${APP_OPTS} stays unquoted so it still splits into separate options
    ${PYTHON_BIN} ${APP_BIN} start "$@" ${APP_OPTS}
    RETVAL=$?
}

Expand Down

0 comments on commit 2ad18a9

Please sign in to comment.