diff --git a/bin/aso_metrics_ora.py b/bin/aso_metrics_ora.py new file mode 100644 index 0000000..46c7a9c --- /dev/null +++ b/bin/aso_metrics_ora.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python +""" +Kibana monitor script for OracleAso +""" +from __future__ import print_function +from __future__ import division + +import os +import sys +import json +import time +import pycurl +import urllib +import urllib2 +import httplib +import logging +import datetime +import subprocess +from urlparse import urljoin +from socket import gethostname +from optparse import OptionParser + +from RESTInteractions import HTTPRequests +from ServerUtilities import encodeRequest, oracleOutputMapping +from ServerUtilities import TRANSFERDB_STATES, PUBLICATIONDB_STATES + + +def check_availability(): + """put here your availability logic, """ + return 1 + +def generate_xml(input): + from xml.etree.ElementTree import Element, SubElement, tostring + from pprint import pprint + xmllocation = './ASO_XML_Report.xml' + logger = logging.getLogger() + handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s %(message)s", + datefmt="%a, %d %b %Y %H:%M:%S %Z(%z)") + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + root = Element('serviceupdate') + root.set( "xmlns", "http://sls.cern.ch/SLS/XML/update") + child = SubElement(root, "id") + + # change this to a name which you will use in kibana queries(for example vocms031 or any other name) + # or just uncomment next line to use the hostname of the machine which is running this script + # child.text = gethostname().split('.')[0] + child.text = "oramon-testbed" + + fmt = "%Y-%m-%dT%H:%M:%S%z" + now_utc = datetime.datetime.now().strftime(fmt) + child_timestamp = SubElement(root, "timestamp") + child_timestamp.text = str(now_utc) + + child_status = SubElement(root,"status") + + # when you have a way to calculate the availability for your service + # change the function check_availability, for now it will + # always return 1(available) + if check_availability() == 1: + # This means that everything is fine + child_status.text = "available" + else: + child_status.text = "degraded" + + # now put all numeric values her + data = SubElement(root, "data") + + for key in input.keys(): + if isinstance(input[key],dict): + for skey in input[key]: + nName="%s_%s"%(key,skey) + nValue=input[key][skey] + numericval = SubElement(data, "numericvalue") + numericval.set("name",nName) + numericval.text = str(nValue) + + temp_xmllocation = xmllocation + ".temp" + while True: + try: + with open(temp_xmllocation, 'w') as f: + f.write(tostring(root)) + os.system('mv %s %s' % (temp_xmllocation, xmllocation)) + break + except Exception, e: + logger.debug(str(e)) + continue + + # push the XML to elasticSearch + maxi = 0 + while maxi < 3: + cmd = "curl -i -F file=@%s xsls.cern.ch"%xmllocation + try: + pu = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + break + except Exception, e: + logger.debug(str(e)) + maxi = maxi + 1 + continue + +if __name__ == "__main__": + server = HTTPRequests('cmsweb-testbed.cern.ch', + '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy', + '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy') + + result = server.get('/crabserver/preprod/filetransfers', + data=encodeRequest({'subresource': 'groupedTransferStatistics', 'grouping': 0})) + + results = oracleOutputMapping(result) + + + status = {'transfers':{}, 'publications':{}} + tmp 
= {'transfers':{ 'DONE':0, 'ACQUIRED':0, 'SUBMITTED':0, 'FAILED':0, 'RETRY':0 }, + 'publications':{'DONE':0, 'ACQUIRED':0, 'NEW':0, 'FAILED':0, 'RETRY':0}} + + #past = open("tmp_transfer") + #tmp = json.load(past) + + for doc in results: + if doc['aso_worker']=="asodciangot1": + if 'transfers' in tmp and TRANSFERDB_STATES[doc['transfer_state']] in tmp['transfers']: + status['transfers'][TRANSFERDB_STATES[doc['transfer_state']]] = - tmp['transfers'][TRANSFERDB_STATES[doc['transfer_state']]] + doc['nt'] + tmp['transfers'][TRANSFERDB_STATES[doc['transfer_state']]] = doc['nt'] + else: + status['transfers'][TRANSFERDB_STATES[doc['transfer_state']]] = doc['nt'] + tmp['transfers'][TRANSFERDB_STATES[doc['transfer_state']]] = doc['nt'] + + result = server.get('/crabserver/preprod/filetransfers', + data=encodeRequest({'subresource': 'groupedPublishStatistics', 'grouping': 0})) + + results = oracleOutputMapping(result) + + for doc in results: + if doc['aso_worker']=="asodciangot1": + if 'publications' in tmp and PUBLICATIONDB_STATES[doc['publication_state']] in tmp['publications']: + status['publications'][PUBLICATIONDB_STATES[doc['publication_state']]] = -tmp['publications'][PUBLICATIONDB_STATES[doc['publication_state']]] + doc['nt'] + tmp['publications'][PUBLICATIONDB_STATES[doc['publication_state']]] = doc['nt'] + else: + status['publications'][PUBLICATIONDB_STATES[doc['publication_state']]] = doc['nt'] + tmp['publications'][PUBLICATIONDB_STATES[doc['publication_state']]] = doc['nt'] + + #past.close() + while True: + try: + tmp_transfer = open("tmp_transfer","w") + tmp_transfer.write(json.dumps(tmp)) + tmp_transfer.close() + break + except Exception as ex: + print(ex) + continue + + print (status) + + generate_xml(tmp) + + sys.exit(0) + diff --git a/bin/fakeMon.py b/bin/fakeMon.py new file mode 100644 index 0000000..fb40014 --- /dev/null +++ b/bin/fakeMon.py @@ -0,0 +1,60 @@ +import threading +import Queue +import sys +import os + +def do_work(in_queue,): + while True: + item = in_queue.get() + # process + with open(item) as json_file: + json_data = json.load(json_file) + + jobid = item.split(".")[1] + + reporter = { + "LFNs": [], + "transferStatus": [], + "failure_reason": [], + "timestamp": [], + "username": "" + } + reporter["LFNs"] = json_data["LFNs"] + reporter["transferStatus"] = ['Finished' for x in range(len(reporter["LFNs"]))] + reporter["username"] = json_data["username"] + reporter["reasons"] = ['' for x in range(len(reporter["LFNs"]))] + reporter["timestamp"] = 10000 + report_j = json.dumps(reporter) + + try: + if not os.path.exists("/data/srv/asyncstageout/current/install/asyncstageout/Monitor/work/%s" %user): + os.makedirs("/data/srv/asyncstageout/current/install/asyncstageout/Monitor/work/%s" %user) + out_file = open("/data/srv/asyncstageout/current/install/asyncstageout/AsyncTransfer/dropbox/inputs/%s/Reporter.%s.json"%(user,jobid),"w") + out_file.write(report_j) + out_file.close() + os.remove('/%s/Monitor.%s.json' %(self.user,self.jobid)) + except Exception as ex: + msg="Cannot create fts job report: %s" %ex + self.logger.error(msg) + + in_queue.task_done() + +if __name__ == "__main__": + self.f + work = Queue.Queue() + results = Queue.Queue() + total = 20 + + # start for workers + for i in xrange(4): + t = threading.Thread(target=do_work, args=(work,)) + t.daemon = True + t.start() + + # produce data + for i in os.listdir("/data/srv/asyncstageout/current/install/asyncstageout/AsyncTransfer/dropbox/outputs"): + work.put(i) + + work.join() + + sys.exit() diff --git 
a/configuration/Example.py b/configuration/Example.py index 4937d78..f084ef4 100644 --- a/configuration/Example.py +++ b/configuration/Example.py @@ -82,12 +82,22 @@ config.AsyncTransfer.cache_area = cache_area config.AsyncTransfer.ftscp_design = 'ftscp' config.AsyncTransfer.max_h_in_queue = 5 +config.AsyncTransfer.isOracle = False +config.AsyncTransfer.oracleDB = 'cmsweb-testbed.cern.ch' +config.AsyncTransfer.oracleFileTrans = '/crabserver/preprod/filetransfers' +config.AsyncTransfer.asoworker = "asodciangot1" +config.AsyncTransfer.retry_time = 12 config.component_('Reporter') config.Reporter.logLevel = 'INFO' config.Reporter.log_level = logging.INFO config.Reporter.logMsgFormat = "%(asctime)s:%(levelname)s:%(module)s:%(name)s: %(message)s" config.Reporter.namespace = 'AsyncStageOut.Reporter' config.Reporter.componentDir = config.General.workDir +config.Reporter.isOracle =False +config.Reporter.oracleDB = 'cmsweb-testbed.cern.ch' +config.Reporter.oracleFileTrans = '/crabserver/preprod/filetransfers' +config.Reporter.asoworker = "asodciangot1" +config.Reporter.retry_time = 12 config.component_('DBSPublisher') config.DBSPublisher.logLevel = 'INFO' config.DBSPublisher.log_level = logging.INFO @@ -114,22 +124,6 @@ config.DBSPublisher.algoName = 'FIFOPriority' config.DBSPublisher.block_closure_timeout = 18800 config.DBSPublisher.publish_dbs_url = 'https://cmsweb.cern.ch/dbs/prod/phys03/DBSWriter' -#config.component_('Analytics') -#config.Analytics.logLevel = 'INFO' -#config.Analytics.log_level = logging.INFO -#config.Analytics.logMsgFormat = "%(asctime)s:%(levelname)s:%(module)s:%(name)s: %(message)s" -#config.Analytics.user_monitoring_db = user_monitoring_db -#config.Analytics.couch_user_monitoring_instance = userMonitoringCouchUrl -#config.Analytics.analyticsPollingInterval = 900 -#config.Analytics.componentDir = config.General.workDir -#config.Analytics.namespace = 'AsyncStageOut.Analytics' -#config.Analytics.files_database = files_database -#config.Analytics.config_database = config_database -#config.Analytics.config_couch_instance = couchUrl -#config.Analytics.couch_instance = couchUrl -#config.Analytics.config_couch_instance = couchUrl -#config.Analytics.summaries_expiration_days = 6 -#config.Analytics.amq_auth_file = '/path/to/amq/auth/file' config.component_('FilesCleaner') config.FilesCleaner.logLevel = 'INFO' config.FilesCleaner.log_level = logging.INFO diff --git a/src/python/AsyncStageOut/PublisherDaemon.py b/src/python/AsyncStageOut/PublisherDaemon.py index fbf246e..d9e4ef1 100644 --- a/src/python/AsyncStageOut/PublisherDaemon.py +++ b/src/python/AsyncStageOut/PublisherDaemon.py @@ -1,4 +1,4 @@ -#pylint: disable=C0103,W0105 +#pylint: disable=C0103,W0105,broad-except,logging-not-lazy """ Here's the algorithm @@ -17,9 +17,13 @@ from AsyncStageOut.BaseDaemon import BaseDaemon from AsyncStageOut.PublisherWorker import PublisherWorker +from RESTInteractions import HTTPRequests +from ServerUtilities import encodeRequest, oracleOutputMapping result_list = [] + current_running = [] + def publish(user, config): """ Each worker executes this function. 
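Everything the Oracle code path in this patch does against the CRAB REST interface follows one pattern: build a plain dict of parameters, encode it with encodeRequest, send it through an HTTPRequests client authenticated with the ops proxy, and flatten the reply with oracleOutputMapping. A minimal sketch of that round trip follows; the host, resource path and asoworker value are the ones used elsewhere in this patch, while the proxy path is a placeholder.

# Hedged sketch of the REST round trip used throughout this patch.
# The proxy path is a placeholder; HTTPRequests, encodeRequest and
# oracleOutputMapping come from this repository (RESTInteractions, ServerUtilities).
from RESTInteractions import HTTPRequests
from ServerUtilities import encodeRequest, oracleOutputMapping

server = HTTPRequests('cmsweb-testbed.cern.ch',      # REST host, as in the configuration above
                      '/path/to/ops/proxy',          # key: ops proxy
                      '/path/to/ops/proxy')          # cert: same proxy file

query = {'asoworker': 'asodciangot1',                # worker name from the configuration
         'subresource': 'acquiredPublication',
         'grouping': 0}

raw = server.get('/crabserver/preprod/filetransfers', data=encodeRequest(query))
rows = oracleOutputMapping(raw)                      # list of dicts, one per returned row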
@@ -27,18 +31,19 @@ def publish(user, config): logging.debug("Trying to start the worker") try: worker = PublisherWorker(user, config) - except Exception, e: + except Exception as e: logging.debug("Worker cannot be created!:" %e) return user if worker.init: - logging.debug("Starting %s" %worker) - try: - worker() - except Exception, e: - logging.debug("Worker cannot start!:" %e) - return user + logging.debug("Starting %s" %worker) + try: + worker() + except Exception as e: + logging.debug("Worker cannot start!:" %e) + return user return user + def log_result(result): """ Each worker executes this callback. @@ -46,6 +51,7 @@ def log_result(result): result_list.append(result) current_running.remove(result) + class PublisherDaemon(BaseDaemon): """ _PublisherDaemon_ @@ -58,19 +64,29 @@ def __init__(self, config): #Need a better way to test this without turning off this next line BaseDaemon.__init__(self, config, 'DBSPublisher') - server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) + server = CouchServer(dburl=self.config.couch_instance, + ckey=self.config.opsProxy, + cert=self.config.opsProxy) self.db = server.connectDatabase(self.config.files_database) self.logger.debug('Connected to CouchDB') # Set up a factory for loading plugins - self.factory = WMFactory(self.config.schedAlgoDir, namespace = self.config.schedAlgoDir) + self.factory = WMFactory(self.config.schedAlgoDir, + namespace=self.config.schedAlgoDir) self.pool = Pool(processes=self.config.publication_pool_size) - def algorithm(self, parameters = None): + self.oracleDB = HTTPRequests(self.config.oracleDB, + self.config.opsProxy, + self.config.opsProxy) + + def algorithm(self, parameters=None): """ 1. Get a list of users with files to publish from the couchdb instance 2. For each user get a suitably sized input for publish 3. Submit the publish to a subprocess """ + if self.config.isOracle: + users = self.active_users(self.oracleDB) + else: users = self.active_users(self.db) self.logger.debug('kicking off pool %s' %users) for u in users: @@ -79,7 +95,8 @@ def algorithm(self, parameters = None): self.logger.debug('processing %s' %u) current_running.append(u) self.logger.debug('processing %s' %current_running) - self.pool.apply_async(publish,(u, self.config), callback = log_result) + self.pool.apply_async(publish, (u, self.config), + callback=log_result) def active_users(self, db): """ @@ -87,34 +104,72 @@ def active_users(self, db): following view: publish?group=true&group_level=1 """ - #TODO: Remove stale=ok for now until tested - #query = {'group': True, 'group_level': 3, 'stale': 'ok'} - query = {'group': True, 'group_level': 3} - try: - users = db.loadView('DBSPublisher', 'publish', query) - except Exception, e: - self.logger.exception('A problem occured when contacting couchDB: %s' % e) - return [] - - active_users = [] - if len(users['rows']) <= self.config.publication_pool_size: - active_users = users['rows'] - def keys_map(inputDict): - """ - Map function. 
- """ - return inputDict['key'] - active_users = map(keys_map, active_users) + if self.config.isOracle: + active_users = [] + + fileDoc = {} + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'acquirePublication' + + self.logger.debug("Retrieving publications from oracleDB") + + results = '' + try: + results = db.post(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + except Exception as ex: + self.logger.error("Failed to acquire publications \ + from oracleDB: %s" %ex) + + fileDoc = dict() + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'acquiredPublication' + fileDoc['grouping'] = 0 + + self.logger.debug("Retrieving acquired puclications from oracleDB") + + try: + results = db.get(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + except Exception as ex: + self.logger.error("Failed to acquire publications \ + from oracleDB: %s" %ex) + + result = oracleOutputMapping(results) + #TODO: join query for publisher (same of submitter) + unique_users = [list(i) for i in set(tuple([x['username'], x['user_group'], x['user_role']]) for x in result + if x['transfer_state']==3)] + return unique_users else: - sorted_users = self.factory.loadObject(self.config.algoName, args = [self.config, self.logger, users['rows'], self.config.publication_pool_size], getFromCache = False, listFlag = True) - #active_users = random.sample(users['rows'], self.config.publication_pool_size) - active_users = sorted_users()[:self.config.publication_pool_size] - self.logger.info('%s active users' % len(active_users)) - self.logger.debug('Active users are: %s' % active_users) + # TODO: Remove stale=ok for now until tested + # query = {'group': True, 'group_level': 3, 'stale': 'ok'} + query = {'group': True, 'group_level': 3} + try: + users = db.loadView('DBSPublisher', 'publish', query) + except Exception as e: + self.logger.exception('A problem occured \ + when contacting couchDB: %s' % e) + return [] + + if len(users['rows']) <= self.config.publication_pool_size: + active_users = users['rows'] + active_users = [x['key'] for x in active_users] + else: + pool_size=self.config.publication_pool_size + sorted_users = self.factory.loadObject(self.config.algoName, + args=[self.config, + self.logger, + users['rows'], + pool_size], + getFromCache=False, + listFlag = True) + active_users = sorted_users()[:self.config.publication_pool_size] + self.logger.info('%s active users' % len(active_users)) + self.logger.debug('Active users are: %s' % active_users) - return active_users + return active_users - def terminate(self, parameters = None): + def terminate(self): """ Called when thread is being terminated. 
""" diff --git a/src/python/AsyncStageOut/PublisherWorker.py b/src/python/AsyncStageOut/PublisherWorker.py index 37d09e0..fc8cb6c 100644 --- a/src/python/AsyncStageOut/PublisherWorker.py +++ b/src/python/AsyncStageOut/PublisherWorker.py @@ -1,5 +1,5 @@ #!/usr/bin/env -#pylint: disable-msg=C0103 +#pylint: disable-msg=C0103,broad-except,logging-not-lazy,line-too-long ''' The Publisherworker does the following: @@ -35,17 +35,34 @@ from AsyncStageOut import getDNFromUserName from AsyncStageOut import getCommonLogFormatter +from RESTInteractions import HTTPRequests +from ServerUtilities import getHashLfn, PUBLICATIONDB_STATUSES, encodeRequest, oracleOutputMapping class PublisherWorker: - + """ + Publish data component + """ def __init__(self, user, config): """ store the user and tfc the worker """ + self.lfn_map = {} self.user = user[0] self.group = user[1] self.role = user[2] self.config = config + + # This flag is to force calling the publish method (e.g. because the workflow + # status is terminal) even if regular criteria would say not to call it (e.g. + # because there was already a publication done recently for this workflow and + # there are not enough files yet for another block publication). + self.force_publication = False + + # Check if the workflow has expired. If it has, force the publication for the + # available ready files and mark the rest of the files as 'publication failed'. + self.force_failure = False + self.publication_failure_msg = "" + logging.basicConfig(level=config.log_level) self.logger = logging.getLogger('DBSPublisher-Worker-%s' % self.user) formatter = getCommonLogFormatter(self.config) @@ -80,15 +97,23 @@ def __init__(self, user, config): if os.getenv("TEST_ASO"): self.db = None else: - server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) + server = CouchServer(dburl=self.config.couch_instance, + ckey=self.config.opsProxy, + cert=self.config.opsProxy) self.db = server.connectDatabase(self.config.files_database) if hasattr(self.config, "cache_area"): try: - defaultDelegation['myproxyAccount'] = re.compile('https?://([^/]*)/.*').findall(self.config.cache_area)[0] + getCache = re.compile('https?://([^/]*)/.*') + myproxyAccount = getCache.findall(self.config.cache_area)[0] + defaultDelegation['myproxyAccount'] = myproxyAccount self.cache_area = self.config.cache_area except IndexError: - self.logger.error('MyproxyAccount parameter cannot be retrieved from %s . Fallback to user cache_area ' % (self.config.cache_area)) - query = {'key':self.user} + self.logger.error('MyproxyAccount parameter \ + cannot be retrieved from %s . 
\ + Fallback to user cache_area ' + % self.config.cache_area) + + query = {'key': self.user} try: self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows'] except Exception as ex: @@ -100,7 +125,7 @@ def __init__(self, user, config): self.cache_area = self.user_cache_area[0]['value'][0]+self.user_cache_area[0]['value'][1] defaultDelegation['myproxyAccount'] = re.compile('https?://([^/]*)/.*').findall(self.cache_area)[0] except IndexError: - self.logger.error('MyproxyAccount parameter cannot be retrieved from %s' % (self.cache_area)) + self.logger.error('MyproxyAccount parameter cannot be retrieved from %s' % self.cache_area) if getattr(self.config, 'serviceCert', None): defaultDelegation['server_cert'] = self.config.serviceCert if getattr(self.config, 'serviceKey', None): @@ -109,11 +134,17 @@ def __init__(self, user, config): try: if not os.getenv("TEST_ASO"): defaultDelegation['userDN'] = self.userDN + if self.group is None: + defaultDelegation['group'] = '' + else: defaultDelegation['group'] = self.group + if self.role is None: + defaultDelegation['role'] = '' + else: defaultDelegation['role'] = self.role valid, proxy = getProxy(defaultDelegation, self.logger) except Exception as ex: - msg = "Error getting the user proxy" + msg = "Error getting the user proxy" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(msg) @@ -124,7 +155,7 @@ def __init__(self, user, config): # This will be moved soon self.logger.error('Did not get valid proxy. Setting proxy to ops proxy') self.userProxy = self.config.opsProxy - #self.cache_area = self.config.cache_area + # self.cache_area = self.config.cache_area self.phedexApi = PhEDEx(responseType='json') self.max_files_per_block = max(1, self.config.max_files_per_block) self.block_publication_timeout = self.config.block_closure_timeout @@ -150,6 +181,9 @@ def __init__(self, user, config): msg += str(traceback.format_exc()) self.logger.debug(msg) + self.oracleDB = HTTPRequests(self.config.oracleDB, + self.config.opsProxy, + self.config.opsProxy) def __call__(self): """ @@ -161,53 +195,94 @@ def __call__(self): ## publish (i.e. the file has been transferred, but not yet published -nor its ## publication has failed-). We call them "active" files. active_user_workflows = [] - query = {'group': True, 'startkey': [self.user, self.group, self.role], 'endkey': [self.user, self.group, self.role, {}]} - try: - active_user_workflows = self.db.loadView('DBSPublisher', 'publish', query)['rows'] - except Exception as ex: - self.logger.error('A problem occured when contacting couchDB to get the list of active WFs: %s' % ex) - self.logger.info('Publications for %s will be retried next time' % self.user) - return - self.logger.debug('active user wfs: %s' % active_user_workflows) - self.logger.info('number of active user wfs: %s' % len(active_user_workflows)) - now = time.time() - self.lfn_map = {} - ## Loop over the user workflows. 
- for user_wf in active_user_workflows: + unique_user_workflows = [] + now = int(time.time()) - time.timezone + if self.config.isOracle: + fileDoc = dict() + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'acquiredPublication' + fileDoc['grouping'] = 1 + fileDoc['username'] = self.user + try: + results = self.oracleDB.get(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + toPub_docs = oracleOutputMapping(results) + # self.logger.info('toPub_docs for %s' % toPub_docs) + active_user_workflows = [[x['username'], + x['user_group'], + x['user_role'], + x['taskname'] + ] + for x in toPub_docs if x['transfer_state']==3 and x['publication_state'] not in [2,3,5] + ] + + unique_user_workflows = [{'key':list(i)} for i in set(tuple(x) for x in active_user_workflows)] + except Exception as ex: + self.logger.error("Failed to get acquired publications \ + from oracleDB: %s" % ex) + return + else: + query = {'group': True, 'startkey': [self.user, self.group, self.role], 'endkey': [self.user, self.group, self.role, {}]} + try: + unique_user_workflows = self.db.loadView('DBSPublisher', 'publish', query)['rows'] + except Exception as ex: + self.logger.error('A problem occured when contacting couchDB \ + to get the list of active WFs: %s' % ex) + self.logger.info('Publications for %s will be retried next time' % self.user) + return + # self.logger.debug('active user wfs: %s' % active_user_workflows) + self.logger.info('number of active user wfs: %s' % len(active_user_workflows)) + # Loop over the user workflows + if self.config.isOracle: + active_ = [{'key': [x['username'], + x['user_group'], + x['user_role'], + x['taskname']], + 'value': [x['destination'], + x['source_lfn'], + x['destination_lfn'], + x['input_dataset'], + x['dbs_url'], + x['last_update'] + ]} + for x in toPub_docs if x['transfer_state']==3 and x['publication_state'] not in [2,3,5]] + # self.logger.debug("active_user_workflows: %s" %active_user_workflows) + for user_wf in unique_user_workflows: workflow = str(user_wf['key'][3]) wfnamemsg = "%s: " % (workflow) - ## This flag is to force calling the publish method (e.g. because the workflow - ## status is terminal) even if regular criteria would say not to call it (e.g. - ## because there was already a publication done recently for this workflow and - ## there are not enough files yet for another block publication). - self.force_publication = False - ## Get the list of active files in the workflow. - active_files = [] - query = {'reduce': False, 'key': user_wf['key']}#'stale': 'ok'} - try: - active_files = self.db.loadView('DBSPublisher', 'publish', query)['rows'] - except Exception as e: - msg = "A problem occured to retrieve the list of active files for %s: %s" % (self.user, e) - self.logger.error(wfnamemsg+msg) - msg = "Publications will be retried next time." - self.logger.info(wfnamemsg+msg) - continue - msg = "Number of active files: %s." % (len(active_files)) - self.logger.info(wfnamemsg+msg) - ## If there are no files to publish, continue with the next workflow. - if not active_files: - msg = "Continuing with next workflow/user in the loop." + # Get the list of active files in the workflow. 
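The worker reshapes the Oracle rows into the same {'key': ..., 'value': ...} records the Couch view used to return, so the rest of __call__ can keep indexing value[0]..value[5] unchanged. A self-contained sketch with one invented row standing in for the acquiredPublication result:

# Hedged sketch of the reshaping done above; the sample row is invented and the
# state codes (transfer_state 3, publication_state not in 2/3/5) follow the filter used here.
toPub_docs = [{'username': 'alice', 'user_group': '', 'user_role': '',
               'taskname': '170101_120000:alice_crab_test',
               'destination': 'T2_XX_Site',
               'source_lfn': '/store/temp/user/alice/file_1.root',
               'destination_lfn': '/store/user/alice/file_1.root',
               'input_dataset': '/Primary/Processed/TIER',
               'dbs_url': 'https://cmsweb.cern.ch/dbs/prod/global/DBSReader',
               'last_update': 1483272000,
               'transfer_state': 3, 'publication_state': 1}]

active_ = [{'key': [x['username'], x['user_group'], x['user_role'], x['taskname']],
            'value': [x['destination'], x['source_lfn'], x['destination_lfn'],
                      x['input_dataset'], x['dbs_url'], x['last_update']]}
           for x in toPub_docs
           if x['transfer_state'] == 3 and x['publication_state'] not in [2, 3, 5]]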
+ if self.config.isOracle: + active_files = [x for x in active_ + if x['key'] == user_wf['key']] + else: + query = {'reduce': False, 'key': user_wf['key']}#'stale': 'ok'} + try: + active_files = self.db.loadView('DBSPublisher', 'publish', query)['rows'] + except Exception as e: + msg = "A problem occured to retrieve \ + the list of active files for %s: %s" % (self.user, e) + self.logger.error(wfnamemsg+msg) + msg = "Publications will be retried next time." + self.logger.info(wfnamemsg+msg) + continue + msg = "Number of active files: %s." % (len(active_files)) self.logger.info(wfnamemsg+msg) - continue - ## Get the job endtime, destination site, input dataset and input DBS URL for - ## the active files in the workflow. Put the destination LFNs in a list of ready - ## files grouped by output dataset. + # If there are no files to publish, continue with the next workflow. + if not active_files: + msg = "Continuing with next workflow/user in the loop." + self.logger.info(wfnamemsg+msg) + continue + # Get the job endtime, destination site, input dataset and input DBS URL for + # the active files in the workflow. Put the destination LFNs in a list of ready + # files grouped by output dataset. lfn_ready = {} wf_jobs_endtime = [] pnn, input_dataset, input_dbs_url = "", "", "" for active_file in active_files: job_end_time = active_file['value'][5] - if job_end_time: + if job_end_time and self.config.isOracle: + wf_jobs_endtime.append(int(job_end_time) - time.timezone) + elif job_end_time: wf_jobs_endtime.append(int(time.mktime(time.strptime(str(job_end_time), '%Y-%m-%d %H:%M:%S'))) - time.timezone) source_lfn = active_file['value'][1] dest_lfn = active_file['value'][2] @@ -216,9 +291,6 @@ def __call__(self): pnn = str(active_file['value'][0]) input_dataset = str(active_file['value'][3]) input_dbs_url = str(active_file['value'][4]) - ## Group the destination LFNs by output dataset. We don't know the dataset names - ## yet, but we can use the fact that there is a one-to-one correspondence - ## between the original (without the jobid) filenames and the dataset names. filename = os.path.basename(dest_lfn) left_piece, jobid_fileext = filename.rsplit('_', 1) if '.' in jobid_fileext: @@ -227,19 +299,16 @@ def __call__(self): else: orig_filename = left_piece lfn_ready.setdefault(orig_filename, []).append(dest_lfn) - msg = "There are %s ready files in %s active files." % (sum(map(len, lfn_ready.values())), user_wf['value']) - self.logger.info(wfnamemsg+msg) - msg = "List of jobs end time (len = %s): %s" % (len(wf_jobs_endtime), wf_jobs_endtime) - self.logger.debug(wfnamemsg+msg) - ## Check if the workflow has expired. If it has, force the publication for the - ## available ready files and mark the rest of the files as 'publication failed'. - self.force_failure = False - self.publication_failure_msg = "" - if wf_jobs_endtime: - wf_jobs_endtime.sort() + # msg = "There are %s ready files in %s active files." % (sum(map(len, lfn_ready.values())), user_wf['value']) + # self.logger.info(wfnamemsg+msg) + try: + msg = "List of jobs end time (len = %s): %s" % (len(wf_jobs_endtime), wf_jobs_endtime) + self.logger.debug(wfnamemsg+msg) + if wf_jobs_endtime: + wf_jobs_endtime.sort() msg = "Oldest job end time: %s. Now: %s." 
% (wf_jobs_endtime[0], now) self.logger.debug(wfnamemsg+msg) - workflow_duration = (now - wf_jobs_endtime[0]) + workflow_duration = (now - int(wf_jobs_endtime[0])) workflow_expiration_time = self.config.workflow_expiration_time * 24*60*60 if workflow_duration > workflow_expiration_time: self.force_publication = True @@ -252,30 +321,32 @@ def __call__(self): msg = self.publication_failure_msg msg += " Will force the publication if possible or fail it otherwise." self.logger.info(wfnamemsg+msg) - ## List with the number of ready files per dataset. - lens_lfn_ready = map(lambda x: len(x), lfn_ready.values()) + except Exception: + self.logger.exception() + # List with the number of ready files per dataset. + lens_lfn_ready = map(len, lfn_ready.values()) msg = "Number of ready files per dataset: %s." % (lens_lfn_ready) self.logger.info(wfnamemsg+msg) - ## List with booleans that tell if there are more than max_files_per_block to - ## publish per dataset. - enough_lfn_ready = map(lambda x: x >= self.max_files_per_block, lens_lfn_ready) - ## Auxiliary flag. + # List with booleans that tell if there are more than max_files_per_block to + # publish per dataset. + enough_lfn_ready = [(x >= self.max_files_per_block) for x in lens_lfn_ready] + # Auxiliary flag. enough_lfn_ready_in_all_datasets = not False in enough_lfn_ready - ## If for any of the datasets there are less than max_files_per_block to publish, - ## check for other conditions to decide whether to publish that dataset or not. + # If for any of the datasets there are less than max_files_per_block to publish, + # check for other conditions to decide whether to publish that dataset or not. if enough_lfn_ready_in_all_datasets: - ## TODO: Check how often we are on this situation. I suspect it is not so often, - ## in which case I would remove the 'if enough_lfn_ready_in_all_datasets' and - ## always retrieve the workflow status as seems to me it is cleaner and makes - ## the code easier to understand. (Comment from Andres Tanasijczuk) - msg = "All datasets have more than %s ready files." % (self.max_files_per_block) + # TODO: Check how often we are on this situation. I suspect it is not so often, + # in which case I would remove the 'if enough_lfn_ready_in_all_datasets' and + # always retrieve the workflow status as seems to me it is cleaner and makes + # the code easier to understand. (Comment from Andres Tanasijczuk) + msg = "All datasets have more than %s ready files." % (self.max_files_per_block) msg += " No need to retrieve task status nor last publication time." self.logger.info(wfnamemsg+msg) else: msg = "At least one dataset has less than %s ready files." % (self.max_files_per_block) self.logger.info(wfnamemsg+msg) - ## Retrieve the workflow status. If the status can not be retrieved, continue - ## with the next workflow. + # Retrieve the workflow status. If the status can not be retrieved, continue + # with the next workflow. workflow_status = '' url = '/'.join(self.cache_area.split('/')[:-1]) + '/workflow' msg = "Retrieving status from %s" % (url) @@ -284,13 +355,19 @@ def __call__(self): data = {'workflow': workflow} header = {"Content-Type ":"application/json"} try: - _, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug + _, res_ = self.connection.request(url, + data, + header, + doseq=True, + ckey=self.userProxy, + cert=self.userProxy + )# , verbose=True) # for debug except Exception as ex: - msg = "Error retrieving status from cache. 
Fall back to user cache area" + msg = "Error retrieving status from cache. Fall back to user cache area" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) - query = {'key':self.user} + query = {'key': self.user} try: self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows'] except Exception as ex: @@ -301,9 +378,15 @@ def __call__(self): self.cache_area = self.user_cache_area[0]['value'][0]+self.user_cache_area[0]['value'][1]+'/filemetadata' try: - _, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug + _, res_ = self.connection.request(url, + data, + header, + doseq=True, + ckey=self.userProxy, + cert=self.userProxy + )#, verbose=True)# for debug except Exception as ex: - msg = "Error retrieving status from user cache area." + msg = "Error retrieving status from user cache area." msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) @@ -314,32 +397,32 @@ def __call__(self): buf.close() res = json.loads(res_) workflow_status = res['result'][0]['status'] - msg = "Task status is %s." % (workflow_status) + msg = "Task status is %s." % workflow_status self.logger.info(wfnamemsg+msg) except ValueError: msg = "Workflow removed from WM." self.logger.error(wfnamemsg+msg) workflow_status = 'REMOVED' except Exception as ex: - msg = "Error loading task status!" + msg = "Error loading task status!" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) - ## If the workflow status is terminal, go ahead and publish all the ready files - ## in the workflow. + # If the workflow status is terminal, go ahead and publish all the ready files + # in the workflow. if workflow_status in ['COMPLETED', 'FAILED', 'KILLED', 'REMOVED']: self.force_publication = True msg = "Considering task status as terminal. Will force publication." self.logger.info(wfnamemsg+msg) - ## Otherwise... + # Otherwise... else: msg = "Task status is not considered terminal." self.logger.info(wfnamemsg+msg) msg = "Getting last publication time." self.logger.info(wfnamemsg+msg) - ## Get when was the last time a publication was done for this workflow (this - ## should be more or less independent of the output dataset in case there are - ## more than one). + # Get when was the last time a publication was done for this workflow (this + # should be more or less independent of the output dataset in case there are + # more than one). query = {'reduce': True, 'key': user_wf['key']} try: last_publication_time = self.db.loadView('DBSPublisher', 'last_publication', query)['rows'] @@ -349,18 +432,18 @@ def __call__(self): else: msg = "Last publication time: %s." % (last_publication_time) self.logger.debug(wfnamemsg+msg) - ## If this is the first time a publication would be done for this workflow, go - ## ahead and publish. + # If this is the first time a publication would be done for this workflow, go + # ahead and publish. if not last_publication_time: self.force_publication = True msg = "There was no previous publication. Will force publication." self.logger.info(wfnamemsg+msg) - ## Otherwise... + # Otherwise... else: msg = "Last published block: %s" % (last_publication_time[0]['value']['max']) self.logger.debug(wfnamemsg+msg) - ## If the last publication was long time ago (> our block publication timeout), - ## go ahead and publish. + # If the last publication was long time ago (> our block publication timeout), + # go ahead and publish. 
time_since_last_publication = now - last_publication_time[0]['value']['max'] hours = int(time_since_last_publication/60/60) minutes = int((time_since_last_publication - hours*60*60)/60) @@ -375,8 +458,8 @@ def __call__(self): msg += " (less than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes) msg += " Not enough to force publication." self.logger.info(wfnamemsg+msg) - ## Call the publish method with the lists of ready files to publish for this - ## workflow grouped by datasets. + # Call the publish method with the lists of ready files to publish for this + # workflow grouped by datasets. result = self.publish(workflow, input_dataset, input_dbs_url, pnn, lfn_ready) for dataset in result.keys(): published_files = result[dataset].get('published', []) @@ -388,93 +471,157 @@ def __call__(self): force_failure = result[dataset].get('force_failure', False) self.mark_failed(workflow, failed_files, failure_reason, force_failure) - self.logger.info("Publications for user %s (group: %s, role: %s) completed." % (self.user, self.group, self.role)) - + self.logger.info("Publications for user %s (group: %s, role: %s) completed." % (self.user, + self.group, + self.role)) - def mark_good(self, workflow, files=[]): + def mark_good(self, workflow, files): """ Mark the list of files as tranferred """ - wfnamemsg = "%s: " % (workflow) + wfnamemsg = "%s: " % workflow last_update = int(time.time()) - for lfn in files: - data = {} - source_lfn = self.lfn_map[lfn] - docId = getHashLfn(source_lfn) - msg = "Marking file %s as published." % (lfn) - msg += " Document id: %s (source LFN: %s)." % (docId, source_lfn) - self.logger.info(wfnamemsg+msg) - data['publication_state'] = 'published' - data['last_update'] = last_update + if self.config.isOracle: + for lfn in files: + data = {} + source_lfn = self.lfn_map[lfn] + docId = getHashLfn(source_lfn) + msg = "Marking file %s as published." % lfn + msg += " Document id: %s (source LFN: %s)." % (docId, source_lfn) + self.logger.info(wfnamemsg+msg) + data['asoworker'] = self.config.asoworker + data['subresource'] = 'updatePublication' + data['list_of_ids'] = docId + data['list_of_publication_state'] = 'DONE' + try: + result = self.oracleDB.post(self.config.oracleFileTrans, + data=encodeRequest(data)) + self.logger.debug("updated: %s " % docId) + except Exception as ex: + self.logger.error("Error during status update: %s" %ex) + + else: + for lfn in files: + data = {} + source_lfn = self.lfn_map[lfn] + docId = getHashLfn(source_lfn) + msg = "Marking file %s as published." % lfn + msg += " Document id: %s (source LFN: %s)." % (docId, source_lfn) + self.logger.info(wfnamemsg+msg) + data['publication_state'] = 'published' + data['last_update'] = last_update + try: + updateUri = "/" + \ + self.db.name + \ + "/_design/DBSPublisher/_update/updateFile/" + \ + getHashLfn(source_lfn) + updateUri += "?" + urllib.urlencode(data) + self.logger.info(wfnamemsg+"URI: %s" % updateUri) + self.db.makeRequest(uri=updateUri, type="PUT", decode=False) + except Exception as ex: + msg = "Error updating document in Couch." + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(wfnamemsg+msg) try: - updateUri = "/" + self.db.name + "/_design/DBSPublisher/_update/updateFile/" + getHashLfn(source_lfn) - updateUri += "?" + urllib.urlencode(data) - self.logger.info(wfnamemsg+"URI: %s" % updateUri) - self.db.makeRequest(uri = updateUri, type = "PUT", decode = False) + self.db.commit() except Exception as ex: - msg = "Error updating document in Couch." 
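In the Oracle branch of mark_good() the whole update reduces to one POST per document against the filetransfers resource. A minimal sketch of that call; mark_published and the default endpoint are illustrative names, while the payload keys and the DONE state are the ones used above.

# Hedged sketch of the 'mark as published' update performed in mark_good().
# oracleDB is an HTTPRequests instance; docId is what getHashLfn(source_lfn) returns.
from ServerUtilities import encodeRequest

def mark_published(oracleDB, asoworker, docId,
                   endpoint='/crabserver/preprod/filetransfers'):
    """Flag one publication document as DONE on the Oracle-backed REST server."""
    data = {'asoworker': asoworker,
            'subresource': 'updatePublication',
            'list_of_ids': docId,
            'list_of_publication_state': 'DONE'}
    return oracleDB.post(endpoint, data=encodeRequest(data))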
+ msg = "Error committing documents in Couch." msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) - try: - self.db.commit() - except Exception as ex: - msg = "Error committing documents in Couch." - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(wfnamemsg+msg) - - def mark_failed(self, workflow, files=[], failure_reason="", force_failure=False): + def mark_failed(self, workflow, files, failure_reason="", force_failure=False): """ Something failed for these files so increment the retry count """ - wfnamemsg = "%s: " % (workflow) + wfnamemsg = "%s: " % workflow now = str(datetime.datetime.now()) last_update = int(time.time()) - for lfn in files: - data = {} - source_lfn = self.lfn_map[lfn] - docId = getHashLfn(source_lfn) - msg = "Marking file %s as failed." % (lfn) - msg += " Document id: %s (source LFN: %s)." % (docId, source_lfn) - self.logger.info(wfnamemsg+msg) - # Load document to get the retry_count - try: - document = self.db.document(docId) - except Exception as ex: - msg = "Error loading document from Couch." - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(wfnamemsg+msg) - continue - # Prepare data to update the document in couch - if len(document['publication_retry_count']) + 1 > self.max_retry or force_failure: - data['publication_state'] = 'publication_failed' - else: - data['publication_state'] = 'publishing' - data['last_update'] = last_update - data['retry'] = now - data['publication_failure_reason'] = failure_reason - # Update the document in couch + if self.config.isOracle: + h = 0 + for lfn in files: + h += 1 + self.logger.debug("Marking failed %s" % h) + source_lfn = self.lfn_map[lfn] + docId = getHashLfn(source_lfn) + self.logger.debug("Marking failed %s" % docId) + try: + docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers', + 'fileusertransfers'), + data=encodeRequest({'subresource': 'getById', 'id': docId})) + except Exception: + self.logger.exception("Error updating failed docs.") + continue + document = oracleOutputMapping(docbyId, None)[0] + self.logger.debug("Document: %s" % document) + + fileDoc = dict() + fileDoc['asoworker'] = 'asodciangot1' + fileDoc['subresource'] = 'updatePublication' + fileDoc['list_of_ids'] = docId + + if force_failure or document['publish_retry_count'] > self.max_retry: + fileDoc['list_of_publication_state'] = 'FAILED' + else: + fileDoc['list_of_publication_state'] = 'RETRY' + # TODO: implement retry + fileDoc['list_of_retry_value'] = 1 + fileDoc['list_of_failure_reason'] = failure_reason + + try: + result = self.oracleDB.post(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + self.logger.debug("updated: %s " % docId) + except Exception as ex: + msg = "Error updating document" + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(msg) + continue + else: + for lfn in files: + data = {} + source_lfn = self.lfn_map[lfn] + docId = getHashLfn(source_lfn) + msg = "Marking file %s as failed." % (lfn) + msg += " Document id: %s (source LFN: %s)." % (docId, source_lfn) + self.logger.info(wfnamemsg+msg) + # Load document to get the retry_count + try: + document = self.db.document(docId) + except Exception as ex: + msg = "Error loading document from Couch." 
+ msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(wfnamemsg+msg) + continue + # Prepare data to update the document in couch + if len(document['publication_retry_count']) + 1 > self.max_retry or force_failure: + data['publication_state'] = 'publication_failed' + else: + data['publication_state'] = 'publishing' + data['last_update'] = last_update + data['retry'] = now + data['publication_failure_reason'] = failure_reason + # Update the document in couch + try: + updateUri = "/" + self.db.name + "/_design/DBSPublisher/_update/updateFile/" + docId + updateUri += "?" + urllib.urlencode(data) + self.logger.info(wfnamemsg+"URI: %s" % updateUri) + self.db.makeRequest(uri=updateUri, type="PUT", decode=False) + except Exception as ex: + msg = "Error updating document in Couch." + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(wfnamemsg+msg) try: - updateUri = "/" + self.db.name + "/_design/DBSPublisher/_update/updateFile/" + docId - updateUri += "?" + urllib.urlencode(data) - self.logger.info(wfnamemsg+"URI: %s" % updateUri) - self.db.makeRequest(uri = updateUri, type = "PUT", decode = False) + self.db.commit() except Exception as ex: - msg = "Error updating document in Couch." + msg = "Error committing documents in Couch." msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) - try: - self.db.commit() - except Exception as ex: - msg = "Error committing documents in Couch." - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(wfnamemsg+msg) - def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready): """Perform the data publication of the workflow result. @@ -486,11 +633,12 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready): :return: the publication status or result""" wfnamemsg = "%s: " % (workflow) retdict = {} - ## Don't publish anything if there are not enough ready files to make a block in - ## any of the datasets and publication was not forced. This is a first filtering - ## so to not retrieve the filemetadata unnecesarily. - if not False in map(lambda x: len(x) < self.max_files_per_block, lfn_ready.values()) and not self.force_publication: - msg = "Skipping publication as there are not enough ready files in any of the datasets (and publication was not forced)." + # Don't publish anything if there are not enough ready files to make a block in + # any of the datasets and publication was not forced. This is a first filtering + # so to not retrieve the filemetadata unnecesarily. + if False not in [len(x) < self.max_files_per_block for x in lfn_ready.values()] and not self.force_publication: + msg = "Skipping publication as there are not enough ready files" \ + "in any of the datasets (and publication was not forced)." self.logger.info(wfnamemsg+msg) return retdict ## Get the filemetada for this workflow. @@ -504,11 +652,16 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready): except (tarfile.ReadError, RuntimeError): msg = "Error retrieving publication description files." self.logger.error(wfnamemsg+msg) - retdict = {'unknown_datasets': {'failed': lfn_ready_list, 'failure_reason': msg, 'force_failure': False, 'published': []}} + retdict = {'unknown_datasets': {'failed': lfn_ready_list, + 'failure_reason': msg, + 'force_failure': False, + 'published': [] + } + } return retdict msg = "Number of publication description files: %s" % (len(publDescFiles_list)) self.logger.info(wfnamemsg+msg) - ## Group the filemetadata according to the output dataset. 
+ # Group the filemetadata according to the output dataset. msg = "Grouping publication description files according to output dataset." self.logger.info(wfnamemsg+msg) publDescFiles = {} @@ -522,34 +675,34 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready): self.logger.error(wfnamemsg+msg) publDescFiles.setdefault(dataset, []).append(publDescFile) msg = "Publication description files: %s" % (publDescFiles) - self.logger.debug(wfnamemsg+msg) - ## Discard ready files for which there is no filemetadata. + # self.logger.debug(wfnamemsg+msg) + # Discard ready files for which there is no filemetadata. msg = "Discarding ready files for which there is no publication description file available (and vice versa)." self.logger.info(wfnamemsg+msg) toPublish = self.clean(lfn_ready_list, publDescFiles) - msg = "Number of publication description files to publish: %s" % (sum(map(lambda x: len(x), toPublish.values()))) + msg = "Number of publication description files to publish: %s" % (sum(map(len, toPublish.values()))) self.logger.info(wfnamemsg+msg) msg = "Publication description files to publish: %s" % (toPublish) - self.logger.debug(wfnamemsg+msg) - ## If there is nothing to publish, return. + # self.logger.debug(wfnamemsg+msg) + # If there is nothing to publish, return. if not toPublish: if self.force_failure: msg = "Publication description files not found! Will force publication failure." self.logger.error(wfnamemsg+msg) if self.publication_failure_msg: - msg += " %s" % (self.publication_failure_msg) + msg += " %s" % self.publication_failure_msg retdict = {'unknown_datasets': {'failed': lfn_ready_list, 'failure_reason': msg, 'force_failure': True, 'published': []}} return retdict - ## Don't publish datasets for which there are not enough ready files to make a - ## block, unless publication was forced. + # Don't publish datasets for which there are not enough ready files to make a + # block, unless publication was forced. for dataset in toPublish.keys(): files = toPublish[dataset] if len(files) < self.max_files_per_block and not self.force_publication: - msg = "There are only %s (less than %s) files to publish in dataset %s." % (len(files), self.max_files_per_block, dataset) + msg = "There are only %s (less than %s) files to publish in dataset %s." % (len(files), self.max_files_per_block, dataset) msg += " Will skip publication in this dataset (publication was not forced)." self.logger.info(wfnamemsg+msg) toPublish.pop(dataset) - ## Finally... publish if there is something left to publish. + # Finally... publish if there is something left to publish. if not toPublish: msg = "Nothing to publish." 
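The loop above drops every dataset that has fewer than max_files_per_block ready files unless publication is being forced, which is what decides whether a block is cut now or waits for more files. A stand-alone illustration with invented numbers:

# Stand-alone illustration of the block-size gating applied above.
# Dataset names and counts are invented; max_files_per_block comes from the configuration.
max_files_per_block = 100
force_publication = False
toPublish = {'/Primary/alice-BigOne-v1/USER': ['lfn_%d' % i for i in range(120)],
             '/Primary/alice-SmallOne-v1/USER': ['lfn_%d' % i for i in range(7)]}

for dataset in list(toPublish):      # iterate over a copy of the keys, we pop inside the loop
    if len(toPublish[dataset]) < max_files_per_block and not force_publication:
        toPublish.pop(dataset)       # too few files and publication not forced: skip for now

print(sorted(toPublish))             # -> ['/Primary/alice-BigOne-v1/USER']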
self.logger.info(wfnamemsg+msg) @@ -563,18 +716,21 @@ def publish(self, workflow, inputDataset, sourceURL, pnn, lfn_ready): msg = "DBS publication results: %s" % (dbsResults) self.logger.debug(wfnamemsg+msg) for dataset in toPublish.keys(): - retdict.update({dataset: {'failed': failed.get(dataset, []), 'failure_reason': failure_reason.get(dataset, ""), 'published': published.get(dataset, [])}}) + retdict.update({dataset: {'failed': failed.get(dataset, []), + 'failure_reason': failure_reason.get(dataset, ""), + 'published': published.get(dataset, []) + } + } + ) return retdict - def getPublDescFiles(self, workflow): """ Download and read the files describing what needs to be published """ - wfnamemsg = "%s: " % (workflow) + wfnamemsg = "%s: " % workflow buf = cStringIO.StringIO() - res = [] # TODO: input sanitization header = {"Content-Type ": "application/json"} data = {'taskname': workflow, 'filetype': 'EDM'} @@ -584,7 +740,7 @@ def getPublDescFiles(self, workflow): try: _, res_ = self.connection.request(url, data, header, doseq=True, ckey=self.userProxy, cert=self.userProxy)#, verbose=True)# for debug except Exception as ex: - msg = "Error retrieving data." + msg = "Error retrieving data." msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) @@ -595,14 +751,13 @@ def getPublDescFiles(self, workflow): buf.close() res = json.loads(res_) except Exception as ex: - msg = "Error loading results. Trying next time!" + msg = "Error loading results. Trying next time!" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) return {} return res['result'] - def clean(self, lfn_ready_list, publDescFiles): """ Discard ready files that have no filematadata (and vice versa). @@ -615,8 +770,10 @@ def clean(self, lfn_ready_list, publDescFiles): publDescFiles_filtered.setdefault(dataset, []).append(outfile_metadata) return publDescFiles_filtered - def format_file_3(self, file): + """ + format file for DBS + """ nf = {'logical_file_name': file['lfn'], 'file_type': 'EDM', 'check_sum': unicode(file['cksum']), @@ -634,7 +791,6 @@ def format_file_3(self, file): nf['md5'] = file['md5'] return nf - def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, dataset, blocks = None): """ Submit one migration request for each block that needs to be migrated. @@ -644,11 +800,11 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas if blocks: blocksToMigrate = set(blocks) else: - ## This is for the case to migrate the whole dataset, which we don't do - ## at this point Feb/2015 (we always pass blocks). - ## Make a set with the blocks that need to be migrated. - blocksInDestDBS = set([block['block_name'] for block in destReadApi.listBlocks(dataset = dataset)]) - blocksInSourceDBS = set([block['block_name'] for block in sourceApi.listBlocks(dataset = dataset)]) + # This is for the case to migrate the whole dataset, which we don't do + # at this point Feb/2015 (we always pass blocks). + # Make a set with the blocks that need to be migrated. + blocksInDestDBS = set([block['block_name'] for block in destReadApi.listBlocks(dataset=dataset)]) + blocksInSourceDBS = set([block['block_name'] for block in sourceApi.listBlocks(dataset=dataset)]) blocksToMigrate = blocksInSourceDBS - blocksInDestDBS msg = "Dataset %s in destination DBS with %d blocks; %d blocks in source DBS." 
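When no explicit block list is passed, the blocks to migrate are the set difference between the source and destination DBS listings built just above. A small self-contained example with invented block names:

# Stand-alone illustration of the set difference computed above.
# Block names are invented; the real ones come from listBlocks() on each DBS instance.
blocksInSourceDBS = set(['/A/B/C#block1', '/A/B/C#block2', '/A/B/C#block3'])
blocksInDestDBS = set(['/A/B/C#block1'])

blocksToMigrate = blocksInSourceDBS - blocksInDestDBS
print(sorted(blocksToMigrate))   # -> ['/A/B/C#block2', '/A/B/C#block3']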
msg = msg % (dataset, len(blocksInDestDBS), len(blocksInSourceDBS)) @@ -668,13 +824,13 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas numFailedSubmissions = 0 migrationIdsInProgress = [] for block in list(blocksToMigrate): - ## Submit migration request for this block. + # Submit migration request for this block. (reqid, atDestination, alreadyQueued) = self.requestBlockMigration(workflow, migrateApi, sourceApi, block) - ## If the block is already in the destination DBS instance, we don't need - ## to monitor its migration status. If the migration request failed to be - ## submitted, we retry it next time. Otherwise, save the migration request - ## id in the list of migrations in progress. - if reqid == None: + # If the block is already in the destination DBS instance, we don't need + # to monitor its migration status. If the migration request failed to be + # submitted, we retry it next time. Otherwise, save the migration request + # id in the list of migrations in progress. + if reqid is None: blocksToMigrate.remove(block) if atDestination: numBlocksAtDestination += 1 @@ -685,14 +841,14 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas else: migrationIdsInProgress.append(reqid) if numBlocksAtDestination > 0: - msg = "%d blocks already in destination DBS." % (numBlocksAtDestination) + msg = "%d blocks already in destination DBS." % numBlocksAtDestination self.logger.info(wfnamemsg+msg) if numFailedSubmissions > 0: - msg = "%d block migration requests failed to be submitted." % (numFailedSubmissions) + msg = "%d block migration requests failed to be submitted." % numFailedSubmissions msg += " Will retry them later." self.logger.info(wfnamemsg+msg) if numQueuedUnkwonIds > 0: - msg = "%d block migration requests were already queued," % (numQueuedUnkwonIds) + msg = "%d block migration requests were already queued," % numQueuedUnkwonIds msg += " but could not retrieve their request id." self.logger.info(wfnamemsg+msg) numMigrationsInProgress = len(migrationIdsInProgress) @@ -700,50 +856,50 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas msg = "No migrations in progress." self.logger.info(wfnamemsg+msg) else: - msg = "%d block migration requests successfully submitted." % (numMigrationsInProgress) + msg = "%d block migration requests successfully submitted." % numMigrationsInProgress self.logger.info(wfnamemsg+msg) - msg = "List of migration requests ids: %s" % (migrationIdsInProgress) + msg = "List of migration requests ids: %s" % migrationIdsInProgress self.logger.info(wfnamemsg+msg) - ## Wait for up to 300 seconds, then return to the main loop. Note that we - ## don't fail or cancel any migration request, but just retry it next time. - ## Migration states: - ## 0 = PENDING - ## 1 = IN PROGRESS - ## 2 = SUCCESS - ## 3 = FAILED (failed migrations are retried up to 3 times automatically) - ## 9 = Terminally FAILED - ## In the case of failure, we expect the publisher daemon to try again in - ## the future. + # Wait for up to 300 seconds, then return to the main loop. Note that we + # don't fail or cancel any migration request, but just retry it next time. + # Migration states: + # 0 = PENDING + # 1 = IN PROGRESS + # 2 = SUCCESS + # 3 = FAILED (failed migrations are retried up to 3 times automatically) + # 9 = Terminally FAILED + # In the case of failure, we expect the publisher daemon to try again in + # the future. 
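The monitoring loop that follows polls statusMigration() up to numTimes times, waitTime seconds apart, treating state 2 as success and state 9 as a terminal failure. One caveat in the log message built just below: "%d seconds." % waitTime * numTimes formats with waitTime and then repeats the resulting string numTimes times, because % binds tighter than *; the intended total needs "%d seconds." % (waitTime * numTimes). A hedged sketch of the polling logic, assuming the same statusMigration() signature used here:

# Hedged sketch of the migration polling loop implemented below.
# statusMigration(migration_rqst_id=...) and the state codes (2 = SUCCESS,
# 9 = terminally FAILED) are taken from the surrounding code.
import time

def wait_for_migrations(migrateApi, migration_ids, wait_time=30, num_times=10):
    """Poll DBS migration requests; return (succeeded, failed, still_in_progress)."""
    in_progress = list(migration_ids)
    succeeded, failed = [], []
    for _ in range(num_times):
        time.sleep(wait_time)
        for reqid in list(in_progress):
            status = migrateApi.statusMigration(migration_rqst_id=reqid)
            state = status[0].get('migration_status')
            if state == 2:             # SUCCESS
                in_progress.remove(reqid)
                succeeded.append(reqid)
            elif state == 9:           # terminally FAILED
                in_progress.remove(reqid)
                failed.append(reqid)
        if not in_progress:            # nothing left to wait for
            break
    return succeeded, failed, in_progress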
numFailedMigrations = 0 numSuccessfulMigrations = 0 waitTime = 30 numTimes = 10 - msg = "Will monitor their status for up to %d seconds." % (waitTime * numTimes) + msg = "Will monitor their status for up to %d seconds." % waitTime * numTimes self.logger.info(wfnamemsg+msg) for _ in range(numTimes): - msg = "%d block migrations in progress." % (numMigrationsInProgress) - msg += " Will check migrations status in %d seconds." % (waitTime) + msg = "%d block migrations in progress." % numMigrationsInProgress + msg += " Will check migrations status in %d seconds." % waitTime self.logger.info(wfnamemsg+msg) time.sleep(waitTime) - ## Check the migration status of each block migration request. - ## If a block migration has succeeded or terminally failes, remove the - ## migration request id from the list of migration requests in progress. + # Check the migration status of each block migration request. + # If a block migration has succeeded or terminally failes, remove the + # migration request id from the list of migration requests in progress. for reqid in list(migrationIdsInProgress): try: - status = migrateApi.statusMigration(migration_rqst_id = reqid) + status = migrateApi.statusMigration(migration_rqst_id=reqid) state = status[0].get('migration_status') retry = status[0].get('retry_count') except Exception as ex: - msg = "Could not get status for migration id %d:\n%s" % (reqid, ex.msg) + msg = "Could not get status for migration id %d:\n%s" % reqid, ex self.logger.error(wfnamemsg+msg) else: if state == 2: - msg = "Migration id %d succeeded." % (reqid) + msg = "Migration id %d succeeded." % reqid self.logger.info(wfnamemsg+msg) migrationIdsInProgress.remove(reqid) numSuccessfulMigrations += 1 if state == 9: - msg = "Migration id %d terminally failed." % (reqid) + msg = "Migration id %d terminally failed." % reqid self.logger.info(wfnamemsg+msg) msg = "Full status for migration id %d:\n%s" % (reqid, str(status)) self.logger.debug(wfnamemsg+msg) @@ -761,58 +917,57 @@ def migrateByBlockDBS3(self, workflow, migrateApi, destReadApi, sourceApi, datas migrationIdsInProgress.remove(reqid) numFailedMigrations += 1 numMigrationsInProgress = len(migrationIdsInProgress) - ## Stop waiting if there are no more migrations in progress. + # Stop waiting if there are no more migrations in progress. if numMigrationsInProgress == 0: break - ## If after the 300 seconds there are still some migrations in progress, return - ## with status 1. + # If after the 300 seconds there are still some migrations in progress, return + # with status 1. if numMigrationsInProgress > 0: - msg = "Migration of %s is taking too long - will delay the publication." % (dataset) + msg = "Migration of %s is taking too long - will delay the publication." % dataset self.logger.info(wfnamemsg+msg) - return (1, "Migration of %s is taking too long." % (dataset)) - msg = "Migration of %s has finished." % (dataset) + return 1, "Migration of %s is taking too long." % (dataset) + msg = "Migration of %s has finished." % dataset self.logger.info(wfnamemsg+msg) - msg = "Migration status summary (from %d input blocks to migrate):" % (numBlocksToMigrate) - msg += " at destination = %d," % (numBlocksAtDestination) - msg += " succeeded = %d," % (numSuccessfulMigrations) - msg += " failed = %d," % (numFailedMigrations) - msg += " submission failed = %d," % (numFailedSubmissions) - msg += " queued with unknown id = %d." 
% (numQueuedUnkwonIds) + msg = "Migration status summary (from %d input blocks to migrate):" % numBlocksToMigrate + msg += " at destination = %d," % numBlocksAtDestination + msg += " succeeded = %d," % numSuccessfulMigrations + msg += " failed = %d," % numFailedMigrations + msg += " submission failed = %d," % numFailedSubmissions + msg += " queued with unknown id = %d." % numQueuedUnkwonIds self.logger.info(wfnamemsg+msg) - ## If there were failed migrations, return with status 2. + # If there were failed migrations, return with status 2. if numFailedMigrations > 0 or numFailedSubmissions > 0: msg = "Some blocks failed to be migrated." self.logger.info(wfnamemsg+msg) - return (2, "Migration of %s failed." % (dataset)) - ## If there were no failed migrations, but we could not retrieve the request id - ## from some already queued requests, return with status 3. + return 2, "Migration of %s failed." % (dataset) + # If there were no failed migrations, but we could not retrieve the request id + # from some already queued requests, return with status 3. if numQueuedUnkwonIds > 0: msg = "Some block migrations were already queued, but failed to retrieve their request id." self.logger.info(wfnamemsg+msg) - return (3, "Migration of %s in unknown status." % (dataset)) + return 3, "Migration of %s in unknown status." % dataset if (numBlocksAtDestination + numSuccessfulMigrations) != numBlocksToMigrate: - msg = "Something unexpected has happened." + msg = "Something unexpected has happened." msg += " The numbers in the migration summary are not consistent." msg += " Make sure there is no bug in the code." self.logger.info(wfnamemsg+msg) - return (4, "Migration of %s in some inconsistent status." % (dataset)) + return 4, "Migration of %s in some inconsistent status." % dataset msg = "Migration completed." self.logger.info(wfnamemsg+msg) - migratedDataset = destReadApi.listDatasets(dataset = dataset, detail = True, dataset_access_type = '*') + migratedDataset = destReadApi.listDatasets(dataset=dataset, detail=True, dataset_access_type='*') if not migratedDataset or migratedDataset[0].get('dataset', None) != dataset: - return (4, "Migration of %s in some inconsistent status." % (dataset)) - return (0, "") - + return 4, "Migration of %s in some inconsistent status." % dataset + return 0, "" def requestBlockMigration(self, workflow, migrateApi, sourceApi, block): """ Submit migration request for one block, checking the request output. """ - wfnamemsg = "%s: " % (workflow) + wfnamemsg = "%s: " % workflow atDestination = False alreadyQueued = False reqid = None - msg = "Submiting migration request for block %s ..." % (block) + msg = "Submiting migration request for block %s ..." % block self.logger.info(wfnamemsg+msg) sourceURL = sourceApi.url data = {'migration_url': sourceURL, 'migration_input': block} @@ -824,34 +979,36 @@ def requestBlockMigration(self, workflow, migrateApi, sourceApi, block): self.logger.info(wfnamemsg+msg) atDestination = True else: - msg = "Request to migrate %s failed." % (block) - msg += "\nRequest detail: %s" % (data) - msg += "\nDBS3 exception: %s" % (he.msg) + msg = "Request to migrate %s failed." 
% block + msg += "\nRequest detail: %s" % data + msg += "\nDBS3 exception: %s" % he.msg self.logger.error(wfnamemsg+msg) if not atDestination: - msg = "Result of migration request: %s" % (str(result)) + msg = "Result of migration request: %s" % str(result) self.logger.debug(wfnamemsg+msg) reqid = result.get('migration_details', {}).get('migration_request_id') report = result.get('migration_report') - if reqid == None: - msg = "Migration request failed to submit." - msg += "\nMigration request results: %s" % (str(result)) + if reqid is None: + msg = "Migration request failed to submit." + msg += "\nMigration request results: %s" % str(result) self.logger.error(wfnamemsg+msg) if "REQUEST ALREADY QUEUED" in report: - ## Request could be queued in another thread, then there would be - ## no id here, so look by block and use the id of the queued request. + # Request could be queued in another thread, then there would be + # no id here, so look by block and use the id of the queued request. alreadyQueued = True try: - status = migrateApi.statusMigration(block_name = block) + status = migrateApi.statusMigration(block_name=block) reqid = status[0].get('migration_request_id') except Exception: msg = "Could not get status for already queued migration of block %s." % (block) self.logger.error(wfnamemsg+msg) - return (reqid, atDestination, alreadyQueued) - + return reqid, atDestination, alreadyQueued def createBulkBlock(self, output_config, processing_era_config, primds_config, \ dataset_config, acquisition_era_config, block_config, files): + """ + manage blocks + """ file_conf_list = [] file_parent_list = [] for file in files: @@ -859,10 +1016,10 @@ def createBulkBlock(self, output_config, processing_era_config, primds_config, \ file_conf_list.append(file_conf) file_conf['lfn'] = file['logical_file_name'] for parent_lfn in file.get('file_parent_list', []): - file_parent_list.append({'logical_file_name': file['logical_file_name'], \ + file_parent_list.append({'logical_file_name': file['logical_file_name'], 'parent_logical_file_name': parent_lfn['file_parent_lfn']}) del file['file_parent_list'] - blockDump = { \ + blockDump = { 'dataset_conf_list': [output_config], 'file_conf_list': file_conf_list, 'files': files, @@ -871,7 +1028,7 @@ def createBulkBlock(self, output_config, processing_era_config, primds_config, \ 'dataset': dataset_config, 'acquisition_era': acquisition_era_config, 'block': block_config, - 'file_parent_list': file_parent_list, + 'file_parent_list': file_parent_list } blockDump['block']['file_count'] = len(files) blockDump['block']['block_size'] = sum([int(file[u'file_size']) for file in files]) @@ -939,24 +1096,24 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn): acquisition_era_name = "CRAB" processing_era_config = {'processing_version': 1, 'description': 'CRAB3_processing_era'} - ## Loop over the datasets to publish. + # Loop over the datasets to publish. msg = "Starting iteration through datasets/files for publication." self.logger.debug(wfnamemsg+msg) for dataset, files in toPublish.iteritems(): - ## Make sure to add the dataset name as a key in all the dictionaries that will - ## be returned. + # Make sure to add the dataset name as a key in all the dictionaries that will + # be returned. 
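# Illustrative sketch (not from the patch): the "REQUEST ALREADY QUEUED" fallback in
# requestBlockMigration above, isolated into a standalone helper. The function name is
# invented; it relies only on the fields the code reads from the submission result
# ('migration_details'/'migration_request_id', 'migration_report') and on
# statusMigration(block_name=...).
def resolve_migration_request_id(migrate_api, block, result, logger):
    """Return (reqid, already_queued) for the result dict of a block migration submission."""
    reqid = result.get('migration_details', {}).get('migration_request_id')
    already_queued = False
    if reqid is None:
        report = result.get('migration_report') or ''
        if "REQUEST ALREADY QUEUED" in report:
            # Another thread queued the same block first, so this submission carries no id:
            # look the queued request up by block name instead.
            already_queued = True
            try:
                status = migrate_api.statusMigration(block_name=block)
                reqid = status[0].get('migration_request_id')
            except Exception:
                logger.error("Could not get status for already queued migration of block %s.", block)
    return reqid, already_queued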
results[dataset] = {'files': 0, 'blocks': 0, 'existingFiles': 0} published[dataset] = [] failed[dataset] = [] publish_in_next_iteration[dataset] = [] failure_reason[dataset] = "" - ## If there are no files to publish for this dataset, continue with the next - ## dataset. + # If there are no files to publish for this dataset, continue with the next + # dataset. if not files: continue appName = 'cmsRun' - appVer = files[0]["swversion"] + appVer = files[0]["swversion"] pset_hash = files[0]['publishname'].split("-")[-1] gtag = str(files[0]['globaltag']) if gtag == "None": @@ -973,30 +1130,30 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn): msg = "Successfully inserted primary dataset %s." % (primName) self.logger.debug(wfnamemsg+msg) - ## Find all (valid) files already published in this dataset. + # Find all (valid) files already published in this dataset. try: - existingDBSFiles = destReadApi.listFiles(dataset = dataset, detail = True) + existingDBSFiles = destReadApi.listFiles(dataset=dataset, detail=True) existingFiles = [f['logical_file_name'] for f in existingDBSFiles] existingFilesValid = [f['logical_file_name'] for f in existingDBSFiles if f['is_file_valid']] - msg = "Dataset %s already contains %d files" % (dataset, len(existingFiles)) + msg = "Dataset %s already contains %d files" % (dataset, len(existingFiles)) msg += " (%d valid, %d invalid)." % (len(existingFilesValid), len(existingFiles) - len(existingFilesValid)) self.logger.info(wfnamemsg+msg) results[dataset]['existingFiles'] = len(existingFiles) except Exception as ex: - msg = "Error when listing files in DBS: %s" % (str(ex)) + msg = "Error when listing files in DBS: %s" % (str(ex)) msg += "\n%s" % (str(traceback.format_exc())) self.logger.error(wfnamemsg+msg) continue - ## Is there anything to do? + # Is there anything to do? workToDo = False for file in files: if file['lfn'] not in existingFilesValid: workToDo = True break - ## If there is no work to do (because all the files that were requested - ## to be published are already published and in valid state), put the - ## files in the list of published files and continue with the next dataset. + # If there is no work to do (because all the files that were requested + # to be published are already published and in valid state), put the + # files in the list of published files and continue with the next dataset. if not workToDo: msg = "Nothing uploaded, %s has these files already or not enough files." % (dataset) self.logger.info(wfnamemsg+msg) @@ -1026,96 +1183,102 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn): self.logger.info(wfnamemsg+msg) del dataset_config['acquisition_era_name'] - ## List of all files that must (and can) be published. + # List of all files that must (and can) be published. dbsFiles = [] - ## Set of all the parent files from all the files requested to be published. + # Set of all the parent files from all the files requested to be published. parentFiles = set() - ## Set of parent files for which the migration to the destination DBS instance - ## should be skipped (because they were not found in DBS). + # Set of parent files for which the migration to the destination DBS instance + # should be skipped (because they were not found in DBS). parentsToSkip = set() - ## Set of parent files to migrate from the source DBS instance - ## to the destination DBS instance. + # Set of parent files to migrate from the source DBS instance + # to the destination DBS instance. 
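# Illustrative sketch (not from the patch): the "is there anything left to publish?"
# check above reduces to comparing the requested LFNs against the files that are
# already present *and valid* in the destination dataset. The helper name is invented;
# only listFiles(dataset=..., detail=True) and the 'is_file_valid' flag are assumed,
# exactly as in the code above.
def lfns_still_to_publish(dest_read_api, dataset, files):
    """Return the subset of `files` whose 'lfn' is not yet valid in `dataset`."""
    existing = dest_read_api.listFiles(dataset=dataset, detail=True)
    valid_lfns = set(f['logical_file_name'] for f in existing if f['is_file_valid'])
    return [f for f in files if f['lfn'] not in valid_lfns]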
localParentBlocks = set() - ## Set of parent files to migrate from the global DBS instance - ## to the destination DBS instance. + # Set of parent files to migrate from the global DBS instance + # to the destination DBS instance. globalParentBlocks = set() - ## Loop over all files to publish. + # Loop over all files to publish. for file in files: - ## Check if this file was already published and if it is valid. + # Check if this file was already published and if it is valid. if file['lfn'] not in existingFilesValid: - ## We have a file to publish. - ## Get the parent files and for each parent file do the following: - ## 1) Add it to the list of parent files. - ## 2) Find the block to which it belongs and insert that block name in - ## (one of) the set of blocks to be migrated to the destination DBS. + # We have a file to publish. + # Get the parent files and for each parent file do the following: + # 1) Add it to the list of parent files. + # 2) Find the block to which it belongs and insert that block name in + # (one of) the set of blocks to be migrated to the destination DBS. for parentFile in list(file['parents']): if parentFile not in parentFiles: parentFiles.add(parentFile) - ## Is this parent file already in the destination DBS instance? - ## (If yes, then we don't have to migrate this block.) + # Is this parent file already in the destination DBS instance? + # (If yes, then we don't have to migrate this block.) blocksDict = destReadApi.listBlocks(logical_file_name=parentFile) if not blocksDict: - ## No, this parent file is not in the destination DBS instance. - ## Maybe it is in the same DBS instance as the input dataset? + # No, this parent file is not in the destination DBS instance. + # Maybe it is in the same DBS instance as the input dataset? blocksDict = sourceApi.listBlocks(logical_file_name=parentFile) if blocksDict: - ## Yes, this parent file is in the same DBS instance as the input dataset. - ## Add the corresponding block to the set of blocks from the source DBS - ## instance that have to be migrated to the destination DBS. + # Yes, this parent file is in the same DBS instance as the input dataset. + # Add the corresponding block to the set of blocks from the source DBS + # instance that have to be migrated to the destination DBS. localParentBlocks.add(blocksDict[0]['block_name']) else: - ## No, this parent file is not in the same DBS instance as input dataset. - ## Maybe it is in global DBS instance? + # No, this parent file is not in the same DBS instance as input dataset. + # Maybe it is in global DBS instance? blocksDict = globalApi.listBlocks(logical_file_name=parentFile) if blocksDict: - ## Yes, this parent file is in global DBS instance. - ## Add the corresponding block to the set of blocks from global DBS - ## instance that have to be migrated to the destination DBS. + # Yes, this parent file is in global DBS instance. + # Add the corresponding block to the set of blocks from global DBS + # instance that have to be migrated to the destination DBS. globalParentBlocks.add(blocksDict[0]['block_name']) - ## If this parent file is not in the destination DBS instance, is not - ## the source DBS instance, and is not in global DBS instance, then it - ## means it is not known to DBS and therefore we can not migrate it. - ## Put it in the set of parent files for which migration should be skipped. 
+ # If this parent file is not in the destination DBS instance, is not + # the source DBS instance, and is not in global DBS instance, then it + # means it is not known to DBS and therefore we can not migrate it. + # Put it in the set of parent files for which migration should be skipped. if not blocksDict: parentsToSkip.add(parentFile) - ## If this parent file should not be migrated because it is not known to DBS, - ## we remove it from the list of parents in the file-to-publish info dictionary - ## (so that when publishing, this "parent" file will not appear as a parent). + # If this parent file should not be migrated because it is not known to DBS, + # we remove it from the list of parents in the file-to-publish info dictionary + # (so that when publishing, this "parent" file will not appear as a parent). if parentFile in parentsToSkip: msg = "Skipping parent file %s, as it doesn't seem to be known to DBS." % (parentFile) self.logger.info(wfnamemsg+msg) if parentFile in file['parents']: file['parents'].remove(parentFile) - ## Add this file to the list of files to be published. + # Add this file to the list of files to be published. dbsFiles.append(self.format_file_3(file)) published[dataset].append(file['lfn']) - ## Print a message with the number of files to publish. + # Print a message with the number of files to publish. msg = "Found %d files not already present in DBS which will be published." % (len(dbsFiles)) self.logger.info(wfnamemsg+msg) - ## If there are no files to publish, continue with the next dataset. + # If there are no files to publish, continue with the next dataset. if len(dbsFiles) == 0: msg = "Nothing to do for this dataset." self.logger.info(wfnamemsg+msg) continue - ## Migrate parent blocks before publishing. - ## First migrate the parent blocks that are in the same DBS instance - ## as the input dataset. + # Migrate parent blocks before publishing. + # First migrate the parent blocks that are in the same DBS instance + # as the input dataset. if localParentBlocks: msg = "List of parent blocks that need to be migrated from %s:\n%s" % (sourceApi.url, localParentBlocks) self.logger.info(wfnamemsg+msg) - statusCode, failureMsg = self.migrateByBlockDBS3(workflow, migrateApi, destReadApi, sourceApi, inputDataset, localParentBlocks) + statusCode, failureMsg = self.migrateByBlockDBS3(workflow, + migrateApi, + destReadApi, + sourceApi, + inputDataset, + localParentBlocks + ) if statusCode: failureMsg += " Not publishing any files." self.logger.info(wfnamemsg+failureMsg) failed[dataset].extend([f['logical_file_name'] for f in dbsFiles]) failure_reason[dataset] = failureMsg - published[dataset] = filter(lambda x: x not in failed[dataset], published[dataset]) + published[dataset] = [x for x in published[dataset] if x not in failed[dataset]] continue - ## Then migrate the parent blocks that are in the global DBS instance. + # Then migrate the parent blocks that are in the global DBS instance. 
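# Illustrative sketch (not from the patch): the parent-file loop above classifies each
# parent into one of four buckets. This invented helper shows the same decision tree,
# using only listBlocks(logical_file_name=...) as the code does.
def classify_parent(parent_lfn, dest_read_api, source_api, global_api):
    """Return (verdict, block_name); block_name is None unless a migration is needed."""
    if dest_read_api.listBlocks(logical_file_name=parent_lfn):
        return 'at_destination', None        # nothing to migrate
    blocks = source_api.listBlocks(logical_file_name=parent_lfn)
    if blocks:
        return 'migrate_from_source', blocks[0]['block_name']
    blocks = global_api.listBlocks(logical_file_name=parent_lfn)
    if blocks:
        return 'migrate_from_global', blocks[0]['block_name']
    return 'skip', None                      # unknown to DBS: drop it from the parentage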
if globalParentBlocks: msg = "List of parent blocks that need to be migrated from %s:\n%s" % (globalApi.url, globalParentBlocks) self.logger.info(wfnamemsg+msg) @@ -1125,13 +1288,13 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn): self.logger.info(wfnamemsg+failureMsg) failed[dataset].extend([f['logical_file_name'] for f in dbsFiles]) failure_reason[dataset] = failureMsg - published[dataset] = filter(lambda x: x not in failed[dataset], published[dataset]) + published[dataset] = [x for x in published[dataset] if x not in failed[dataset]] continue - ## Publish the files in blocks. The blocks must have exactly max_files_per_block - ## files, unless there are less than max_files_per_block files to publish to - ## begin with. If there are more than max_files_per_block files to publish, - ## publish as many blocks as possible and leave the tail of files for the next - ## PublisherWorker call, unless forced to published. + # Publish the files in blocks. The blocks must have exactly max_files_per_block + # files, unless there are less than max_files_per_block files to publish to + # begin with. If there are more than max_files_per_block files to publish, + # publish as many blocks as possible and leave the tail of files for the next + # PublisherWorker call, unless forced to published. block_count = 0 count = 0 while True: @@ -1139,16 +1302,18 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn): files_to_publish = dbsFiles[count:count+self.max_files_per_block] try: block_config = {'block_name': block_name, 'origin_site_name': pnn, 'open_for_writing': 0} - msg = "Inserting files %s into block %s." % ([f['logical_file_name'] for f in files_to_publish], block_name) + msg = "Inserting files %s into block %s." % ([f['logical_file_name'] + for f in files_to_publish], block_name) self.logger.debug(wfnamemsg+msg) - blockDump = self.createBulkBlock(output_config, processing_era_config, primds_config, dataset_config, \ + blockDump = self.createBulkBlock(output_config, processing_era_config, + primds_config, dataset_config, acquisition_era_config, block_config, files_to_publish) - #self.logger.debug(wfnamemsg+"Block to insert: %s\n" % pprint.pformat(blockDump)) + # self.logger.debug(wfnamemsg+"Block to insert: %s\n" % pprint.pformat(blockDump)) destApi.insertBulkBlock(blockDump) block_count += 1 except Exception as ex: failed[dataset].extend([f['logical_file_name'] for f in files_to_publish]) - msg = "Error when publishing (%s) " % ", ".join(failed[dataset]) + msg = "Error when publishing (%s) " % ", ".join(failed[dataset]) msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(wfnamemsg+msg) @@ -1158,15 +1323,17 @@ def publishInDBS3(self, workflow, sourceURL, inputDataset, toPublish, pnn): if len(files_to_publish_next) < self.max_files_per_block: publish_in_next_iteration[dataset].extend([f['logical_file_name'] for f in files_to_publish_next]) break - published[dataset] = filter(lambda x: x not in failed[dataset] + publish_in_next_iteration[dataset], published[dataset]) - ## Fill number of files/blocks published for this dataset. + published[dataset] = [x for x in published[dataset] + if x not in failed[dataset] + publish_in_next_iteration[dataset]] + # Fill number of files/blocks published for this dataset. results[dataset]['files'] = len(dbsFiles) - len(failed[dataset]) - len(publish_in_next_iteration[dataset]) results[dataset]['blocks'] = block_count - ## Print a publication status summary for this dataset. 
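# Illustrative sketch (not from the patch): the publication loop above writes blocks of
# exactly max_files_per_block files and leaves any smaller tail for a later
# PublisherWorker pass, unless there were fewer files than one full block to begin with.
# The helper name is invented; it only mirrors that slicing policy.
def split_into_blocks(dbs_files, max_files_per_block):
    """Return (blocks_to_publish_now, leftover_for_next_iteration)."""
    if len(dbs_files) <= max_files_per_block:
        # Fewer files than one full block: publish them all in a single block.
        return [dbs_files], []
    n_full = (len(dbs_files) // max_files_per_block) * max_files_per_block
    blocks = [dbs_files[i:i + max_files_per_block]
              for i in range(0, n_full, max_files_per_block)]
    return blocks, dbs_files[n_full:]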
- msg = "End of publication status for dataset %s:" % (dataset) + # Print a publication status summary for this dataset. + msg = "End of publication status for dataset %s:" % (dataset) msg += " failed (%s) %s" % (len(failed[dataset]), failed[dataset]) msg += ", published (%s) %s" % (len(published[dataset]), published[dataset]) - msg += ", publish_in_next_iteration (%s) %s" % (len(publish_in_next_iteration[dataset]), publish_in_next_iteration[dataset]) + msg += ", publish_in_next_iteration (%s) %s" % (len(publish_in_next_iteration[dataset]), + publish_in_next_iteration[dataset]) msg += ", results %s" % (results[dataset]) self.logger.info(wfnamemsg+msg) return failed, failure_reason, published, results diff --git a/src/python/AsyncStageOut/ReporterDaemon.py b/src/python/AsyncStageOut/ReporterDaemon.py index 97a2102..3bffe06 100644 --- a/src/python/AsyncStageOut/ReporterDaemon.py +++ b/src/python/AsyncStageOut/ReporterDaemon.py @@ -19,6 +19,7 @@ result_list = [] current_running = [] + def reporter(user, config): """ Each worker executes this function. @@ -37,9 +38,10 @@ def reporter(user, config): logging.debug("Reporter Worker cannot start!:" %e) return user else: - logging.debug("Worker cannot be initialized!") + logging.debug("Worker cannot be initialized!") return user + def log_result(result): """ Each worker executes this callback. @@ -47,6 +49,7 @@ def log_result(result): result_list.append(result) current_running.remove(result) + class ReporterDaemon(BaseDaemon): """ _TransferDaemon_ @@ -56,7 +59,7 @@ def __init__(self, config): """ Initialise class members """ - #Need a better way to test this without turning off this next line + # Need a better way to test this without turning off this next line BaseDaemon.__init__(self, config, 'AsyncTransfer') self.pool = Pool(processes=self.config.pool_size) @@ -72,8 +75,6 @@ def __init__(self, config): else: self.logger.error('Unknown error in mkdir' % e.errno) raise - result_list = [] - current_running = [] # Over riding setup() is optional, and not needed here def algorithm(self, parameters = None): @@ -83,7 +84,8 @@ def algorithm(self, parameters = None): """ users = [] for user_dir in os.listdir(self.dropbox_dir): - if os.path.isdir(os.path.join(self.dropbox_dir, user_dir)) and os.listdir(os.path.join(self.dropbox_dir, user_dir)): + if os.path.isdir(os.path.join(self.dropbox_dir, user_dir)) \ + and os.listdir(os.path.join(self.dropbox_dir, user_dir)): users.append(user_dir) self.logger.info('Active users %s' % len(users)) diff --git a/src/python/AsyncStageOut/ReporterWorker.py b/src/python/AsyncStageOut/ReporterWorker.py index a258a72..8394b99 100644 --- a/src/python/AsyncStageOut/ReporterWorker.py +++ b/src/python/AsyncStageOut/ReporterWorker.py @@ -19,15 +19,23 @@ import logging import datetime import traceback +import subprocess from WMCore.WMFactory import WMFactory from WMCore.Credential.Proxy import Proxy from WMCore.Database.CMSCouch import CouchServer +from WMCore.Services.PhEDEx.PhEDEx import PhEDEx +from WMCore.Storage.TrivialFileCatalog import readTFC from AsyncStageOut import getHashLfn from AsyncStageOut import getDNFromUserName from AsyncStageOut import getCommonLogFormatter +from RESTInteractions import HTTPRequests +from ServerUtilities import getHashLfn, generateTaskName,\ + PUBLICATIONDB_STATUSES, encodeRequest, oracleOutputMapping + + def getProxy(userdn, group, role, defaultDelegation, logger): """ _getProxy_ @@ -39,15 +47,45 @@ def getProxy(userdn, group, role, defaultDelegation, logger): config['group'] = group 
config['role'] = role proxy = Proxy(defaultDelegation) - proxyPath = proxy.getProxyFilename( True ) - timeleft = proxy.getTimeLeft( proxyPath ) + proxyPath = proxy.getProxyFilename(True) + timeleft = proxy.getTimeLeft(proxyPath) if timeleft is not None and timeleft > 3600: - return (True, proxyPath) + return True, proxyPath proxyPath = proxy.logonRenewMyProxy() - timeleft = proxy.getTimeLeft( proxyPath ) + timeleft = proxy.getTimeLeft(proxyPath) if timeleft is not None and timeleft > 0: - return (True, proxyPath) - return (False, None) + return True, proxyPath + return False, None + + +def execute_command( command, logger, timeout ): + """ + _execute_command_ + Funtion to manage commands. + """ + + stdout, stderr, rc = None, None, 99999 + proc = subprocess.Popen( + command, shell=True, cwd=os.environ['PWD'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + ) + t_beginning = time.time() + while True: + if proc.poll() is not None: + break + seconds_passed = time.time() - t_beginning + if timeout and seconds_passed > timeout: + proc.terminate() + logger.error('Timeout in %s execution.' % command ) + return rc, stdout, stderr + + time.sleep(0.1) + stdout, stderr = proc.communicate() + rc = proc.returncode + logger.debug('Executing : \n command : %s\n output : %s\n error: %s\n retcode : %s' % (command, stdout, stderr, rc)) + return rc, stdout, stderr class ReporterWorker: @@ -58,8 +96,10 @@ def __init__(self, user, config): """ self.user = user self.config = config + self.kibana_file = open(self.config.kibana_dir+"/"+self.user+"/"+"ended.json", 'a') self.dropbox_dir = '%s/dropbox/inputs' % self.config.componentDir logging.basicConfig(level=config.log_level) + self.site_tfc_map = {} self.logger = logging.getLogger('AsyncTransfer-Reporter-%s' % self.user) formatter = getCommonLogFormatter(self.config) for handler in logging.getLogger().handlers: @@ -74,7 +114,7 @@ def __init__(self, user, config): self.logger.debug("Trying to get DN") try: self.userDN = getDNFromUserName(self.user, self.logger) - except Exception, ex: + except Exception as ex: msg = "Error retrieving the user DN" msg += str(ex) msg += str(traceback.format_exc()) @@ -86,14 +126,13 @@ def __init__(self, user, config): return defaultDelegation = { 'logger': self.logger, - 'credServerPath' : \ - self.config.credentialDir, + 'credServerPath': self.config.credentialDir, # It will be moved to be getfrom couchDB 'myProxySvr': 'myproxy.cern.ch', 'min_time_left' : getattr(self.config, 'minTimeLeft', 36000), - 'serverDN' : self.config.serverDN, - 'uisource' : self.uiSetupScript, - 'cleanEnvironment' : getattr(self.config, 'cleanEnvironment', False) + 'serverDN': self.config.serverDN, + 'uisource': self.uiSetupScript, + 'cleanEnvironment': getattr(self.config, 'cleanEnvironment', False) } if hasattr(self.config, "cache_area"): try: @@ -108,11 +147,8 @@ def __init__(self, user, config): self.valid = False try: - self.valid, proxy = getProxy(self.userDN, "", "", defaultDelegation, self.logger) - - except Exception, ex: - + except Exception as ex: msg = "Error getting the user proxy" msg += str(ex) msg += str(traceback.format_exc()) @@ -126,25 +162,30 @@ def __init__(self, user, config): self.logger.error('Did not get valid proxy. 
Setting proxy to ops proxy') self.userProxy = config.opsProxy - try: - # Set up a factory for loading plugins - self.factory = WMFactory(self.config.pluginDir, namespace = self.config.pluginDir) - self.commandTimeout = 1200 - self.max_retry = config.max_retry - # Proxy management in Couch - os.environ['X509_USER_PROXY'] = self.userProxy + if self.config.isOracle: + try: + self.oracleDB = HTTPRequests(self.config.oracleDB, + config.opsProxy, + config.opsProxy) + except Exception: + self.logger.exception() + raise + else: server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) self.db = server.connectDatabase(self.config.files_database) - config_server = CouchServer(dburl=self.config.config_couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) - self.config_db = config_server.connectDatabase(self.config.config_database) - except Exception, ex: - msg = "Error contacting couchDB. " - msg += str("|"+ex+"|") - msg += str(traceback.format_exc()) - self.logger.error(msg) - self.init = False - return + # Set up a factory for loading plugins + self.factory = WMFactory(self.config.pluginDir, namespace = self.config.pluginDir) + self.commandTimeout = 1200 + self.max_retry = config.max_retry + # Proxy management in Couch + os.environ['X509_USER_PROXY'] = self.userProxy + try: + self.phedex = PhEDEx(responseType='xml', + dict={'key':self.config.opsProxy, + 'cert':self.config.opsProxy}) + except Exception as e: + self.logger.exception('PhEDEx exception: %s' % e) def __call__(self): """ @@ -160,24 +201,21 @@ def __call__(self): remove_good = True remove_failed = True failed_lfns = [] - updated_failed_lfns = [] failure_reason = [] good_lfns = [] - updated_good_lfns = [] self.logger.info("Updating %s" % input_file) - json_data = {} if os.path.basename(input_file).startswith('Reporter'): try: json_data = json.loads(open(input_file).read()) - except ValueError, e: + except ValueError as e: self.logger.error("Error loading %s" % e) self.logger.debug('Removing %s' % input_file) - os.unlink( input_file ) + os.unlink(input_file) continue - except Exception, e: + except Exception as e: self.logger.error("Error loading %s" % e) self.logger.debug('Removing %s' % input_file) - os.unlink( input_file ) + os.unlink(input_file) continue if json_data: self.logger.debug('Inputs: %s %s %s' % (json_data['LFNs'], json_data['transferStatus'], json_data['failure_reason'])) @@ -194,6 +232,12 @@ def __call__(self): failure_reason.append(json_data['failure_reason'][i]) self.logger.debug('Marking failed %s %s' %(failed_lfns, failure_reason)) updated_failed_lfns = self.mark_failed(failed_lfns, failure_reason) + + try: + self.kibana_file.write(str(time.time())+"\tFailed:\t"+str(len(updated_failed_lfns))+"\n") + except Exception as ex: + self.logger.error(ex) + if len(updated_failed_lfns) != len(failed_lfns): remove_failed = False @@ -206,6 +250,12 @@ def __call__(self): good_lfns.append(json_data['LFNs'][i]) self.logger.info('Marking good %s' %(good_lfns)) updated_good_lfns = self.mark_good(good_lfns) + + try: + self.kibana_file.write(str(time.time())+"\tFailed:\t"+str(len(updated_good_lfns))+"\n") + except Exception as ex: + self.logger.error(ex) + if len(updated_good_lfns) != len(good_lfns): remove_good = False @@ -234,64 +284,158 @@ def files_for_update(self): files_to_update.append(os.path.join(self.dropbox_dir, self.user, user_file)) return files_to_update - def mark_good(self, files=[]): + def mark_good(self, files): """ Mark the list of files as tranferred 
""" updated_lfn = [] - for lfn in files: + good_ids = [] + for it, lfn in enumerate(files): hash_lfn = getHashLfn(lfn) self.logger.info("Marking good %s" % hash_lfn) self.logger.debug("Marking good %s" % lfn) - try: - document = self.db.document(hash_lfn) - except Exception, ex: - msg = "Error loading document from couch" - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(msg) - continue - self.logger.info("Doc %s Loaded" % hash_lfn) - if document['state'] != 'killed' and document['state'] != 'done' and document['state'] != 'failed': - outputLfn = document['lfn'].replace('store/temp', 'store', 1) + if not self.config.isOracle: try: - now = str(datetime.datetime.now()) - last_update = time.time() - data = {} - data['end_time'] = now - data['state'] = 'done' - data['lfn'] = outputLfn - data['last_update'] = last_update - updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + getHashLfn(lfn) - updateUri += "?" + urllib.urlencode(data) - self.db.makeRequest(uri = updateUri, type = "PUT", decode = False) - updated_lfn.append(lfn) - self.logger.debug("Marked good %s" % lfn) - except Exception, ex: - msg = "Error updating document in couch" + document = self.db.document(hash_lfn) + except Exception as ex: + msg = "Error loading document from couch" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(msg) continue + self.logger.info("Doc %s Loaded" % hash_lfn) + try: + now = str(datetime.datetime.now()) + last_update = time.time() + if self.config.isOracle: + docId = getHashLfn(lfn) + good_ids.append(docId) + updated_lfn.append(lfn) + else: + if document['state'] != 'killed' and document['state'] != 'done' and document['state'] != 'failed': + outputLfn = document['lfn'].replace('store/temp', 'store', 1) + data = dict() + data['end_time'] = now + data['state'] = 'done' + data['lfn'] = outputLfn + data['last_update'] = last_update + updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + getHashLfn(lfn) + updateUri += "?" 
+ urllib.urlencode(data) + self.db.makeRequest(uri = updateUri, type = "PUT", decode = False) + updated_lfn.append(lfn) + self.logger.debug("Marked good %s" % lfn) + else: + updated_lfn.append(lfn) + try: + self.db.commit() + except Exception as ex: + msg = "Error commiting documents in couch" + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(msg) + continue + except Exception as ex: + msg = "Error updating document" + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(msg) + continue + if self.config.isOracle: try: - self.db.commit() - except Exception, ex: - msg = "Error commiting documents in couch" - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(msg) - continue - else: updated_lfn.append(lfn) - self.logger.debug("transferred file updated") + data = dict() + data['asoworker'] = self.config.asoworker + data['subresource'] = 'updateTransfers' + data['list_of_ids'] = good_ids + data['list_of_transfer_state'] = ["DONE" for x in good_ids] + result = self.oracleDB.post(self.config.oracleFileTrans, + data=encodeRequest(data)) + self.logger.debug("Marked good %s" % good_ids) + except Exception: + self.logger.exeption('Error updating document') + + self.logger.info("Transferred file %s updated, removing now source file" %docId) + try: + docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','fileusertransfers'), + data=encodeRequest({'subresource': 'getById', 'id': docId})) + document = oracleOutputMapping(docbyId, None)[0] + except Exception: + msg = "Error getting file from source" + self.logger.exception(msg) + raise + if document["source"] not in self.site_tfc_map: + self.logger.debug("site not found... gathering info from phedex") + self.site_tfc_map[document["source"]] = self.get_tfc_rules(document["source"]) + pfn = self.apply_tfc_to_lfn( '%s:%s' %(document["source"], lfn)) + self.logger.debug("File has to be removed now from source site: %s" %pfn) + self.remove_files(self.userProxy, pfn) + self.logger.debug("Transferred file removed from source") return updated_lfn - def mark_failed(self, files=[], failures_reasons = [], force_fail = False ): + def remove_files(self, userProxy, pfn): + + command = 'env -i X509_USER_PROXY=%s gfal-rm -v -t 180 %s' % \ + (userProxy, pfn) + logging.debug("Running remove command %s" % command) + try: + rc, stdout, stderr = execute_command(command, self.logger, 3600) + except Exception as ex: + self.logger.error(ex) + raise + if rc: + logging.info("Deletion command failed with output %s and error %s" %(stdout, stderr)) + else: + logging.info("File Deleted.") + return + + def get_tfc_rules(self, site): + """ + Get the TFC regexp for a given site. + """ + self.phedex.getNodeTFC(site) + try: + tfc_file = self.phedex.cacheFileName('tfc', inputdata={'node': site}) + except Exception: + self.logger.exception('A problem occured when getting the TFC regexp: %s') + return None + return readTFC(tfc_file) + + def apply_tfc_to_lfn(self, file): + """ + Take a CMS_NAME:lfn string and make a pfn. + Update pfn_to_lfn_mapping dictionary. 
+ """ + try: + site, lfn = tuple(file.split(':')) + except Exception: + self.logger.exception('It does not seem to be an lfn %s' %file.split(':')) + return None + if site in self.site_tfc_map: + pfn = self.site_tfc_map[site].matchLFN('srmv2', lfn) + # TODO: improve fix for wrong tfc on sites + try: + if pfn.find("\\") != -1: pfn = pfn.replace("\\","") + if pfn.split(':')[0] != 'srm' and pfn.split(':')[0] != 'gsiftp' : + self.logger.error('Broken tfc for file %s at site %s' % (lfn, site)) + return None + except IndexError: + self.logger.error('Broken tfc for file %s at site %s' % (lfn, site)) + return None + except AttributeError: + self.logger.error('Broken tfc for file %s at site %s' % (lfn, site)) + return None + return pfn + else: + self.logger.error('Wrong site %s!' % site) + return None + + def mark_failed(self, files=[], failures_reasons=[], force_fail=False): """ Something failed for these files so increment the retry count """ updated_lfn = [] for lfn in files: data = {} + self.logger.debug("Document: %s" % lfn) if not isinstance(lfn, dict): if 'temp' not in lfn: temp_lfn = lfn.replace('store', 'store/temp', 1) @@ -304,58 +448,90 @@ def mark_failed(self, files=[], failures_reasons = [], force_fail = False ): temp_lfn = lfn['value'] docId = getHashLfn(temp_lfn) # Load document to get the retry_count - try: - document = self.db.document( docId ) - except Exception, ex: - msg = "Error loading document from couch" - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(msg) - continue - if document['state'] != 'killed' and document['state'] != 'done' and document['state'] != 'failed': - now = str(datetime.datetime.now()) - last_update = time.time() - # Prepare data to update the document in couch - if force_fail or len(document['retry_count']) + 1 > self.max_retry: - data['state'] = 'failed' - data['end_time'] = now - else: - data['state'] = 'retry' - fatal_error = self.determine_fatal_error(failures_reasons[files.index(lfn)]) - if fatal_error: - data['state'] = 'failed' - data['end_time'] = now + if self.config.isOracle: + try: + self.logger.debug("Document: %s" %docId) + docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers', + 'fileusertransfers'), + data=encodeRequest({'subresource': 'getById', 'id': docId})) + document = oracleOutputMapping(docbyId)[0] + data = dict() + data['asoworker'] = self.config.asoworker + data['subresource'] = 'updateTransfers' + data['list_of_ids'] = docId - self.logger.debug("Failure list: %s" % failures_reasons) - self.logger.debug("Files: %s" % files) - self.logger.debug("LFN %s" % lfn) + if force_fail or document['transfer_retry_count'] + 1 > self.max_retry: + data['list_of_transfer_state'] = 'FAILED' + data['list_of_retry_value'] = 0 + else: + data['list_of_transfer_state'] = 'RETRY' + fatal_error = self.determine_fatal_error(failures_reasons[files.index(lfn)]) + if fatal_error: + data['list_of_transfer_state'] = 'FAILED' + data['list_of_failure_reason'] = failures_reasons[files.index(lfn)] + data['list_of_retry_value'] = 0 - data['failure_reason'] = failures_reasons[files.index(lfn)] - data['last_update'] = last_update - data['retry'] = now - # Update the document in couch - self.logger.debug("Marking failed %s" % docId) - try: - updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + docId - updateUri += "?" 
+ urllib.urlencode(data) - self.db.makeRequest(uri = updateUri, type = "PUT", decode = False) - updated_lfn.append(docId) - self.logger.debug("Marked failed %s" % docId) - except Exception, ex: - msg = "Error in updating document in couch" - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(msg) + self.logger.debug("update: %s" % data) + result = self.oracleDB.post(self.config.oracleFileTrans, + data=encodeRequest(data)) + updated_lfn.append(lfn) + self.logger.debug("Marked failed %s" % lfn) + except Exception as ex: + self.logger.error("Error updating document status: %s" %ex) continue + else: try: - self.db.commit() - except Exception, ex: - msg = "Error commiting documents in couch" + document = self.db.document( docId ) + except Exception as ex: + msg = "Error loading document from couch" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(msg) continue - else: updated_lfn.append(docId) + if document['state'] != 'killed' and document['state'] != 'done' and document['state'] != 'failed': + now = str(datetime.datetime.now()) + last_update = time.time() + # Prepare data to update the document in couch + if force_fail or len(document['retry_count']) + 1 > self.max_retry: + data['state'] = 'failed' + data['end_time'] = now + else: + data['state'] = 'retry' + fatal_error = self.determine_fatal_error(failures_reasons[files.index(lfn)]) + if fatal_error: + data['state'] = 'failed' + data['end_time'] = now + + self.logger.debug("Failure list: %s" % failures_reasons) + self.logger.debug("Files: %s" % files) + self.logger.debug("LFN %s" % lfn) + + data['failure_reason'] = failures_reasons[files.index(lfn)] + data['last_update'] = last_update + data['retry'] = now + # Update the document in couch + self.logger.debug("Marking failed %s" % docId) + try: + updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + docId + updateUri += "?" + urllib.urlencode(data) + self.db.makeRequest(uri = updateUri, type = "PUT", decode = False) + updated_lfn.append(docId) + self.logger.debug("Marked failed %s" % docId) + except Exception as ex: + msg = "Error in updating document in couch" + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(msg) + continue + try: + self.db.commit() + except Exception as ex: + msg = "Error commiting documents in couch" + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(msg) + continue + else: updated_lfn.append(docId) self.logger.debug("failed file updated") return updated_lfn diff --git a/src/python/AsyncStageOut/RetryManagerDaemon.py b/src/python/AsyncStageOut/RetryManagerDaemon.py index fbc9f42..c8fba6d 100644 --- a/src/python/AsyncStageOut/RetryManagerDaemon.py +++ b/src/python/AsyncStageOut/RetryManagerDaemon.py @@ -1,14 +1,12 @@ #!/usr/bin/env python -#pylint: disable-msg=W0613, W6501 +#pylint: disable-msg=W0613,invalid-name,logging-not-lazy,broad-except """ __RetryManagerPoller__ This component does the actualy retry logic. It allows to have different algorithms. """ -__all__ = [] -import os import time import urllib import logging @@ -20,19 +18,26 @@ from WMCore.Database.CMSCouch import CouchServer from AsyncStageOut.BaseDaemon import BaseDaemon +from RESTInteractions import HTTPRequests +from ServerUtilities import encodeRequest + +__all__ = [] -def convertdatetime(t): + +def convertdatetime(time_to_convert): """ Convert dates into useable format. 
""" - return int(time.mktime(t.timetuple())) + return int(time.mktime(time_to_convert.timetuple())) + def timestamp(): """ generate a timestamp """ - t = datetime.datetime.now() - return convertdatetime(t) + time_now = datetime.datetime.now() + return convertdatetime(time_now) + class RetryManagerException(WMException): """ @@ -41,6 +46,7 @@ class RetryManagerException(WMException): It's totally awesome, except it's not. """ + class RetryManagerDaemon(BaseDaemon): """ _RetryManagerPoller_ @@ -54,20 +60,29 @@ def __init__(self, config): """ BaseDaemon.__init__(self, config, 'RetryManager') + if self.config.isOracle: + self.oracleDB = HTTPRequests(self.config.oracleDB, + self.config.opsProxy, + self.config.opsProxy) + else: + try: + server = CouchServer(dburl=self.config.couch_instance, + ckey=self.config.opsProxy, + cert=self.config.opsProxy) + self.db = server.connectDatabase(self.config.files_database) + except Exception as e: + self.logger.exception('A problem occured when connecting to couchDB: %s' % e) + raise + self.logger.debug('Connected to files DB') + + # Set up a factory for loading plugins + self.factory = WMFactory(self.config.retryAlgoDir, namespace=self.config.retryAlgoDir) try: - server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) - self.db = server.connectDatabase(self.config.files_database) - except Exception, e: - self.logger.exception('A problem occured when connecting to couchDB: %s' % e) - raise - self.logger.debug('Connected to files DB') - - # Set up a factory for loading plugins - self.factory = WMFactory(self.config.retryAlgoDir, namespace = self.config.retryAlgoDir) - try: - self.plugin = self.factory.loadObject(self.config.algoName, self.config, getFromCache = False, listFlag = True) - except Exception, ex: - msg = "Error loading plugin %s on path %s\n" % (self.config.algoName, self.config.retryAlgoDir) + self.plugin = self.factory.loadObject(self.config.algoName, self.config, + getFromCache=False, listFlag=True) + except Exception as ex: + msg = "Error loading plugin %s on path %s\n" % (self.config.algoName, + self.config.retryAlgoDir) msg += str(ex) self.logger.error(msg) raise RetryManagerException(msg) @@ -81,14 +96,26 @@ def terminate(self, params): logging.debug("Terminating. doing one more pass before we die") self.algorithm(params) - - def algorithm(self, parameters = None): + def algorithm(self, parameters=None): """ Performs the doRetries method, loading the appropriate plugin for each job and handling it. 
""" logging.debug("Running retryManager algorithm") - self.doRetries() + if self.config.isOracle: + fileDoc = dict() + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'retryTransfers' + fileDoc['time_to'] = self.cooloffTime + self.logger.debug('fileDoc: %s' % fileDoc) + try: + results = self.oracleDB.post(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + except Exception: + self.logger.exception("Failed to get retry transfers in oracleDB: %s") + logging.info("Retried files in cooloff: %s" % str(results)) + else: + self.doRetries() def processRetries(self, files): """ @@ -101,7 +128,7 @@ def processRetries(self, files): return propList = [] - fileList = self.loadFilesFromList(recList = files) + fileList = self.loadFilesFromList(recList=files) logging.debug("Files in cooloff %s" % fileList) # Now we should have the files propList = self.selectFilesToRetry(fileList) @@ -112,22 +139,22 @@ def processRetries(self, files): self.logger.debug("Trying to resubmit %s" % file['id']) try: document = self.db.document(file['id']) - except Exception, ex: + except Exception as ex: msg = "Error loading document from couch" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(msg) continue if document['state'] != 'killed': - data = {} + data = dict() data['state'] = 'new' data['last_update'] = time.time() data['retry'] = now updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + file['id'] updateUri += "?" + urllib.urlencode(data) try: - self.db.makeRequest(uri = updateUri, type = "PUT", decode = False) - except Exception, ex: + self.db.makeRequest(uri=updateUri, type="PUT", decode=False) + except Exception as ex: msg = "Error updating document in couch" msg += str(ex) msg += str(traceback.format_exc()) @@ -166,10 +193,10 @@ def selectFilesToRetry(self, fileList): for file in fileList: logging.debug("Current file %s" %file) try: - if self.plugin.isReady(file = file, cooloffTime = self.cooloffTime): + if self.plugin.isReady(file=file, cooloffTime=self.cooloffTime): result.append(file) - except Exception, ex: - msg = "Exception while checking for cooloff timeout for file %s\n" % file + except Exception as ex: + msg = "Exception while checking for cooloff timeout for file %s\n" % file msg += str(ex) logging.error(msg) logging.debug("File: %s\n" % file) @@ -186,8 +213,9 @@ def doRetries(self): query = {'stale': 'ok'} try: files = self.db.loadView('AsyncTransfer', 'getFilesToRetry', query)['rows'] - except Exception, e: - self.logger.exception('A problem occured when contacting couchDB to retrieve LFNs: %s' % e) + except Exception as e: + self.logger.exception('A problem occured when contacting \ + couchDB to retrieve LFNs: %s' % e) return logging.info("Found %s files in cooloff" % len(files)) self.processRetries(files) diff --git a/src/python/AsyncStageOut/TransferDaemon.py b/src/python/AsyncStageOut/TransferDaemon.py index ad9cf31..c8e3c86 100644 --- a/src/python/AsyncStageOut/TransferDaemon.py +++ b/src/python/AsyncStageOut/TransferDaemon.py @@ -1,5 +1,4 @@ -#pylint: disable=C0103,W0105 - +#pylint: disable=C0103,W0105,W0703,W1201,W0141 """ Here's the algorithm @@ -8,14 +7,16 @@ 3. get active sites and build up a dictionary of TFC's 4. create a multiprocessing Pool of size N 5. spawn a process per user that - a. makes the ftscp copyjob - b. submits ftscp - c. deletes successfully transferred files + a. makes rest copyjob + b. 
submits to FTS """ import os import logging from multiprocessing import Pool +from RESTInteractions import HTTPRequests +from ServerUtilities import encodeRequest, oracleOutputMapping + from WMCore.WMFactory import WMFactory from WMCore.Database.CMSCouch import CouchServer from WMCore.Services.PhEDEx.PhEDEx import PhEDEx @@ -28,6 +29,7 @@ result_list = [] current_running = [] + def ftscp(user, tfc_map, config): """ Each worker executes this function. @@ -50,6 +52,7 @@ def ftscp(user, tfc_map, config): logging.debug("Worker cannot be initialized!") return user + def log_result(result): """ Each worker executes this callback. @@ -57,6 +60,7 @@ def log_result(result): result_list.append(result) current_running.remove(result) + class TransferDaemon(BaseDaemon): """ _TransferDaemon_ @@ -64,12 +68,19 @@ class TransferDaemon(BaseDaemon): """ def __init__(self, config): """ - Initialise class members + Initialise class members: + 1. check and create dropbox dir + 2. define oracle and couch (config and file instance) server connection + 3. PhEDEx connection + 4. Setup wmcore factory """ - #Need a better way to test this without turning off this next line + + self.doc_acq = '' + # Need a better way to test this without turning off this next line BaseDaemon.__init__(self, config, 'AsyncTransfer') self.dropbox_dir = '%s/dropbox/outputs' % self.config.componentDir + if not os.path.isdir(self.dropbox_dir): try: os.makedirs(self.dropbox_dir) @@ -86,88 +97,160 @@ def __init__(self, config): self.logger.exception('Unknown error in mkdir' % e.errno) raise - server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) - self.db = server.connectDatabase(self.config.files_database) config_server = CouchServer(dburl=self.config.config_couch_instance) self.config_db = config_server.connectDatabase(self.config.config_database) + if self.config.isOracle: + self.oracleDB = HTTPRequests(self.config.oracleDB, + self.config.opsProxy, + self.config.opsProxy) + else: + server = CouchServer(dburl=self.config.couch_instance, + ckey=self.config.opsProxy, + cert=self.config.opsProxy) + self.db = server.connectDatabase(self.config.files_database) self.logger.debug('Connected to CouchDB') self.pool = Pool(processes=self.config.pool_size) try: - self.phedex = PhEDEx(responseType='xml', dict = {'key': self.config.opsProxy, 'cert': self.config.opsProxy}) + self.phedex = PhEDEx(responseType='xml', + dict={'key':self.config.opsProxy, + 'cert':self.config.opsProxy}) except Exception as e: self.logger.exception('PhEDEx exception: %s' % e) # Set up a factory for loading plugins - self.factory = WMFactory(self.config.schedAlgoDir, namespace = self.config.schedAlgoDir) - - result_list = [] - current_running = [] + self.factory = WMFactory(self.config.schedAlgoDir, + namespace=self.config.schedAlgoDir) # Over riding setup() is optional, and not needed here - def algorithm(self, parameters = None): + def algorithm(self, parameters=None): """ - 1. Get a list of users with files to transfer from the couchdb instance - 2. For each user get a suitably sized input for ftscp (call to a list) - 3. Submit the ftscp to a subprocess + 1 Get transfer config from couchdb config instance + 2. Get a list of users with files to transfer from the db instance + (oracle or couch, by config flag) + 3. For each user get a suitably sized input for submission (call to a list) + 4. 
Submit to a subprocess """ - query = {'stale':'ok'} - try: - params = self.config_db.loadView('asynctransfer_config', 'GetTransferConfig', query) - self.config.max_files_per_transfer = params['rows'][0]['key'][1] - self.config.algoName = params['rows'][0]['key'][2] - except IndexError: - self.logger.exception('Config data could not be retrieved from the config database. Fallback to the config file') - except Exception as e: - self.logger.exception('A problem occured when contacting couchDB: %s' % e) - users = self.active_users(self.db) + if self.config.isOracle: + sites, users = self.oracleSiteUser(self.oracleDB) + else: + users = self.active_users(self.db) - sites = self.active_sites() - self.logger.info('%s active sites' % len(sites)) - self.logger.debug('Active sites are: %s' % sites) + sites = self.active_sites() + self.logger.info('%s active sites' % len(sites)) + self.logger.debug('Active sites are: %s' % sites) site_tfc_map = {} for site in sites: - # TODO: Remove this check once the ASO request will be validated before the upload. if site and str(site) != 'None' and str(site) != 'unknown': site_tfc_map[site] = self.get_tfc_rules(site) + self.logger.debug('tfc site: %s %s' % (site, self.get_tfc_rules(site))) self.logger.debug('kicking off pool') for u in users: - self.logger.debug('current_running %s' %current_running) + self.logger.debug('current_running %s' % current_running) if u not in current_running: - self.logger.debug('processing %s' %u) + self.logger.debug('processing %s' % u) current_running.append(u) - self.logger.debug('processing %s' %current_running) - self.pool.apply_async(ftscp,(u, site_tfc_map, self.config), callback = log_result) + self.logger.debug('processing %s' % current_running) + self.pool.apply_async(ftscp, (u, site_tfc_map, self.config), + callback=log_result) + + def oracleSiteUser(self, db): + """ + 1. Acquire transfers from DB + 2. 
Get acquired users and destination sites + """ + fileDoc = dict() + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'acquireTransfers' + + self.logger.debug("Retrieving transfers from oracleDB") + + try: + result = db.post(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + except Exception as ex: + self.logger.error("Failed to acquire transfers \ + from oracleDB: %s" %ex) + pass + + self.doc_acq = str(result) + + fileDoc = dict() + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'acquiredTransfers' + fileDoc['grouping'] = 0 + + self.logger.debug("Retrieving users from oracleDB") + + try: + results = db.get(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + except Exception: + self.logger.exception("Failed to get acquired transfers \ + from oracleDB.") + results = None + pass + + documents = oracleOutputMapping(results) + + for doc in documents: + if doc['user_role'] is None: + doc['user_role'] = "" + if doc['user_group'] is None: + doc['user_group'] = "" + + unique_users = [] + try: + unique_users = [list(i) for i in set(tuple([x['username'], + x['user_group'], + x['user_role']]) for x in documents)] + except Exception as ex: + self.logger.error("Failed to map active users: %s" %ex) + + if len(unique_users) <= self.config.pool_size: + active_users = unique_users + else: + active_users = unique_users[:self.config.pool_size] + + self.logger.info('%s active users' % len(active_users)) + self.logger.debug('Active users are: %s' % active_users) + + active_sites_dest = [x['destination'] for x in documents] + active_sites = active_sites_dest + [x['source'] for x in documents] + + self.logger.debug('Active sites are: %s' % list(set(active_sites))) + return list(set(active_sites)), active_users def active_users(self, db): """ - Query a view for users with files to transfer. Get this from the - following view: - ftscp?group=true&group_level=1 + Query a view for users with files to transfer. + get this from the following view: + ftscp?group=true&group_level=1 """ - #TODO: Remove stale=ok for now until tested - #query = {'group': True, 'group_level': 3, 'stale': 'ok'} query = {'group': True, 'group_level': 3} try: users = db.loadView(self.config.ftscp_design, 'ftscp_all', query) except Exception as e: - self.logger.exception('A problem occured when contacting couchDB: %s' % e) + self.logger.exception('A problem occured when\ + contacting couchDB: %s' % e) return [] - active_users = [ x['key'] for x in users['rows'] ] - - self.logger.info('Requested %s active users' % len(active_users)) - self.logger.debug('Requested active users are: %s' % active_users) - - if not len(active_users) <= self.config.pool_size: - active_users = active_users[:self.config.pool_size] - - self.logger.info('Selecting %s active users' % len(active_users)) - self.logger.debug('Selected active users are: %s' % active_users) - + if len(users['rows']) <= self.config.pool_size: + active_users = [x['key'] for x in users['rows']] + else: + sorted_users = self.factory.loadObject(self.config.algoName, + args=[self.config, + self.logger, + users['rows'], + self.config.pool_size], + getFromCache=False, + listFlag=True) + active_users = sorted_users()[:self.config.pool_size] + self.logger.info('%s active users' % len(active_users)) + self.logger.debug('Active users are: %s' % active_users) return active_users - def active_sites(self): + def active_sites(self): """ Get a list of all sites involved in transfers. 
""" @@ -175,7 +258,8 @@ def active_sites(self): try: sites = self.db.loadView('AsyncTransfer', 'sites', query) except Exception as e: - self.logger.exception('A problem occured when contacting couchDB: %s' % e) + self.logger.exception('A problem occured \ + when contacting couchDB: %s' % e) return [] def keys_map(inputDict): @@ -196,12 +280,13 @@ def get_tfc_rules(self, site): except Exception as e: self.logger.exception('PhEDEx exception: %s' % e) try: - tfc_file = self.phedex.cacheFileName('tfc', inputdata={'node': site}) + tfc_file = self.phedex.cacheFileName('tfc', + inputdata={'node': site}) except Exception as e: self.logger.exception('PhEDEx cache exception: %s' % e) return readTFC(tfc_file) - def terminate(self, parameters = None): + def terminate(self, parameters=None): """ Called when thread is being terminated. """ diff --git a/src/python/AsyncStageOut/TransferWorker.py b/src/python/AsyncStageOut/TransferWorker.py index 28637fe..4cd4718 100644 --- a/src/python/AsyncStageOut/TransferWorker.py +++ b/src/python/AsyncStageOut/TransferWorker.py @@ -1,12 +1,13 @@ -#!/usr/bin/env +#!/usr/bin/ent: disable=C0103,W0105,broad-except,logging-not-lazy,bad-builtin ''' The TransferWorker does the following: -a. make the ftscp copyjob -b. submit ftscp and watch -c. delete successfully transferred files from the database +a. create REST FTS jobs to be submitted +b. submit FTS jobs and create dashboard report +c. update state in the db (couch or oracle, by aso config flag) +d. create json in dropbox folder for FTS monitor -There should be one worker per user transfer. +There should be one worker per user. ''' import os import re @@ -18,6 +19,7 @@ import StringIO import traceback import subprocess +import itertools from WMCore.WMFactory import WMFactory from WMCore.Database.CMSCouch import CouchServer @@ -29,6 +31,11 @@ from AsyncStageOut import getDNFromUserName from AsyncStageOut import getCommonLogFormatter +from RESTInteractions import HTTPRequests +from ServerUtilities import generateTaskName,\ + PUBLICATIONDB_STATUSES, encodeRequest, oracleOutputMapping + + def execute_command(command, logger, timeout): """ _execute_command_ @@ -40,7 +47,6 @@ def execute_command(command, logger, timeout): stderr=subprocess.PIPE, stdin=subprocess.PIPE,) t_beginning = time.time() - seconds_passed = 0 while True: if proc.poll() is not None: break @@ -52,14 +58,18 @@ def execute_command(command, logger, timeout): time.sleep(0.1) stdout, stderr = proc.communicate() rc = proc.returncode - logger.debug('Executing : \n command : %s\n output : %s\n error: %s\n retcode : %s' % (command, stdout, stderr, rc)) + logger.debug('Executing : \n command : %s\n output : %s\n \ + error: %s\n retcode : %s' % (command, stdout, stderr, rc)) return stdout, rc -class TransferWorker: +class TransferWorker: + """ + Submit user transfers to FTS + """ def __init__(self, user, tfc_map, config): """ - store the user and tfc the worker + store the user transfer info and retrieve user proxy. 
""" self.user = user[0] self.group = user[1] @@ -106,40 +116,44 @@ def __init__(self, user, tfc_map, config): # Set up a factory for loading plugins self.factory = WMFactory(self.config.pluginDir, namespace=self.config.pluginDir) self.commandTimeout = 1200 - server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) - self.db = server.connectDatabase(self.config.files_database) - config_server = CouchServer(dburl=self.config.config_couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) - self.config_db = config_server.connectDatabase(self.config.config_database) + if self.config.isOracle: + self.oracleDB = HTTPRequests(self.config.oracleDB, + self.config.opsProxy, + self.config.opsProxy) + else: + server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy) + self.db = server.connectDatabase(self.config.files_database) self.fts_server_for_transfer = getFTServer("T1_UK_RAL", 'getRunningFTSserver', self.config_db, self.logger) - self.cache_area="" + self.cache_area = "" if hasattr(self.config, "cache_area"): self.cache_area = self.config.cache_area - query = {'key':self.user} - try: - self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows'] - self.cache_area = "https://"+self.user_cache_area[0]['value'][0]+self.user_cache_area[0]['value'][1]+"/filemetadata" - except Exception as ex: - msg = "Error getting user cache_area." - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(msg) - pass + + if not self.config.isOracle: + query = {'key': self.user} + + try: + self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows'] + self.cache_area = "https://" + self.user_cache_area[0]['value'][0] + \ + self.user_cache_area[0]['value'][1] + "/filemetadata" + except Exception: + self.logger.exception("Error getting user cache_area.") + pass try: defaultDelegation['myproxyAccount'] = re.compile('https?://([^/]*)/.*').findall(self.cache_area)[0] except IndexError: - self.logger.error('MyproxyAccount parameter cannot be retrieved from %s . ' % (self.config.cache_area)) - + self.logger.error('MyproxyAccount parameter cannot be retrieved from %s . ' % self.config.cache_area) if getattr(self.config, 'serviceCert', None): defaultDelegation['server_cert'] = self.config.serviceCert if getattr(self.config, 'serviceKey', None): defaultDelegation['server_key'] = self.config.serviceKey self.valid_proxy = False - self.user_proxy = None + self.user_proxy = self.config.opsProxy try: defaultDelegation['userDN'] = self.userDN defaultDelegation['group'] = self.group defaultDelegation['role'] = self.role + self.logger.debug('delegation: %s' % defaultDelegation ) self.valid_proxy, self.user_proxy = getProxy(defaultDelegation, self.logger) except Exception as ex: msg = "Error getting the user proxy" @@ -149,9 +163,9 @@ def __init__(self, user, tfc_map, config): def __call__(self): """ - a. makes the ftscp copyjob - b. submits ftscp - c. deletes successfully transferred files from the DB + a. makes the RESTFTS job + b. submits FTS + c. 
update status and create dropbox json """ stdout, stderr, rc = None, None, 99999 fts_url_delegation = self.fts_server_for_transfer.replace('8446', '8443') @@ -163,7 +177,7 @@ def __call__(self): stdout, rc = execute_command(command, self.logger, self.commandTimeout) if not rc or not self.valid_proxy: jobs, jobs_lfn, jobs_pfn, jobs_report = self.files_for_transfer() - self.logger.debug("Processing files for %s " %self.user_proxy) + self.logger.debug("Processing files for %s " % self.user_proxy) if jobs: self.command(jobs, jobs_lfn, jobs_pfn, jobs_report) else: @@ -175,44 +189,99 @@ def source_destinations_by_user(self): """ Get all the destinations for a user """ - query = {'group': True, - 'startkey':[self.user, self.group, self.role], 'endkey':[self.user, self.group, self.role, {}, {}]} - #'stale': 'ok'} - try: - sites = self.db.loadView(self.config.ftscp_design, 'ftscp_all', query) - except: - return [] - def keys_map(dict): - return dict['key'][4], dict['key'][3] - return map(keys_map, sites['rows']) + if self.config.isOracle: + fileDoc = dict() + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'acquiredTransfers' + fileDoc['grouping'] = 1 + fileDoc['username'] = self.user + result = [] + try: + results = self.oracleDB.get(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + result = oracleOutputMapping(results) + res = [[x['source'], x['destination']] for x in result] + res.sort() + res = list(k for k, _ in itertools.groupby(res)) + except Exception as ex: + self.logger.error("Failed to get acquired transfers \ + from oracleDB: %s" %ex) + return res, result + else: + query = {'group': True, + 'startkey':[self.user, self.group, self.role], 'endkey':[self.user, self.group, self.role, {}, {}]} + try: + sites = self.db.loadView(self.config.ftscp_design, 'ftscp_all', query) + except: + return [] + return [[x[4], x[3]] for x in sites['rows']] def files_for_transfer(self): """ Process a queue of work per transfer source:destination for a user. Return one - ftscp copyjob per source:destination. + job per source:destination. """ - source_dests = self.source_destinations_by_user() + if self.config.isOracle: + source_dests, docs = self.source_destinations_by_user() + else: + source_dests = self.source_destinations_by_user() jobs = {} jobs_lfn = {} jobs_pfn = {} jobs_report = {} - failed_files = [] self.logger.info('%s has %s links to transfer on: %s' % (self.user, len(source_dests), str(source_dests))) try: for (source, destination) in source_dests: - # We could push applying the TFC into the list function, not sure if - # this would be faster, but might use up less memory. Probably more - # complicated, though. 
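# --- Illustrative aside (not part of the patch) --------------------------------
# A minimal, runnable sketch of the link de-duplication that the Oracle branch of
# source_destinations_by_user() above performs with sort + itertools.groupby.
# The sample documents below are invented for illustration; the real ones come
# from oracleOutputMapping() applied to the 'acquiredTransfers' result.
import itertools

docs = [{'source': 'T2_IT_Pisa', 'destination': 'T2_CH_CERN'},
        {'source': 'T2_IT_Pisa', 'destination': 'T2_CH_CERN'},
        {'source': 'T2_DE_DESY', 'destination': 'T2_CH_CERN'}]

links = [[d['source'], d['destination']] for d in docs]
links.sort()
# groupby only collapses adjacent duplicates, hence the sort above
links = [k for k, _ in itertools.groupby(links)]
print(links)  # [['T2_DE_DESY', 'T2_CH_CERN'], ['T2_IT_Pisa', 'T2_CH_CERN']]
# --------------------------------------------------------------------------------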
- query = {'reduce':False, - 'limit': self.config.max_files_per_transfer, - 'key':[self.user, self.group, self.role, destination, source], - 'stale': 'ok'} - try: - active_files = self.db.loadView(self.config.ftscp_design, 'ftscp_all', query)['rows'] - except: - continue - self.logger.debug('%s has %s files to transfer from %s to %s' % (self.user, len(active_files), - source, destination)) + self.logger.info('dest1: %s source: %s' % (docs[0]['destination'],source)) + if self.config.isOracle: + if self.group == '': + group = None + else: + group = self.group + if self.role == '': + role = None + else: + role = self.role + active_docs = [x for x in docs + if x['destination'] == destination + and x['source'] == source + and x['username'] == self.user + and x['user_group'] == group + and x['user_role'] == role + ] + # self.logger.info('%s' % active_docs) + + def map_active(inputdoc): + """ + map active_users + """ + outDict = dict() + outDict['key'] = [inputdoc['username'], + inputdoc['user_group'], + inputdoc['user_role'], + inputdoc['destination'], + inputdoc['source'], + inputdoc['id']] + outDict['value'] = [inputdoc['source_lfn'], inputdoc['destination_lfn']] + return outDict + active_files = [map_active(x) for x in active_docs] + self.logger.debug('%s has %s files to transfer \ + from %s to %s' % (self.user, + len(active_files), + source, + destination)) + else: + query = {'reduce': False, + 'limit': self.config.max_files_per_transfer, + 'key': [self.user, self.group, + self.role, destination, source], + 'stale': 'ok'} + try: + active_files = self.db.loadView(self.config.ftscp_design, 'ftscp_all', query)['rows'] + except: + continue + self.logger.debug('%s has %s files to transfer from %s to %s' % (self.user, len(active_files), + source, destination)) new_job = [] lfn_list = [] pfn_list = [] @@ -223,10 +292,14 @@ def tfc_map(item): self.logger.debug('Preparing PFNs...') source_pfn = self.apply_tfc_to_lfn('%s:%s' % (source, item['value'][0])) destination_pfn = self.apply_tfc_to_lfn('%s:%s' % (destination, item['value'][1])) - self.logger.debug('PFNs prepared...') + self.logger.debug('PFNs prepared... %s %s' %(destination_pfn,source_pfn)) if source_pfn and destination_pfn and self.valid_proxy: - acquired_file, dashboard_report = self.mark_acquired([item]) - self.logger.debug('Files have been marked acquired') + try: + acquired_file, dashboard_report = self.mark_acquired([item]) + self.logger.debug('Files have been marked acquired') + except Exception as ex: + self.logger.error("%s" % ex) + raise if acquired_file: self.logger.debug('Starting FTS Job creation...') # Prepare Monitor metadata @@ -240,7 +313,7 @@ def tfc_map(item): pass else: self.mark_failed([item]) - self.logger.debug('Preparing job...') + #self.logger.debug('Preparing job... 
%s' % (active_files)) map(tfc_map, active_files) self.logger.debug('Job prepared...') if new_job: @@ -252,7 +325,7 @@ def tfc_map(item): self.logger.debug('ftscp input created for %s (%s jobs)' % (self.user, len(jobs.keys()))) return jobs, jobs_lfn, jobs_pfn, jobs_report - except: + except Exception: self.logger.exception("fail") return jobs, jobs_lfn, jobs_pfn, jobs_report @@ -296,34 +369,33 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report): Submit the copyjob to the appropriate FTS server Parse the output of the FTS transfer and return complete and failed files for recording """ - # Output: {"userProxyPath":"/path/to/proxy","LFNs":["lfn1","lfn2","lfn3"],"PFNs":["pfn1","pfn2","pfn3"],"FTSJobid":'id-of-fts-job', "username": 'username'} - #Loop through all the jobs for the links we have + # Output: {"userProxyPath":"/path/to/proxy","LFNs":["lfn1","lfn2","lfn3"],"PFNs":["pfn1","pfn2","pfn3"], + # "FTSJobid":'id-of-fts-job', "username": 'username'} + # Loop through all the jobs for the links we have failure_reasons = [] for link, copyjob in jobs.items(): submission_error = False - status_error = False fts_job = {} # Validate copyjob file before doing anything self.logger.debug("Valid %s" % self.validate_copyjob(copyjob)) if not self.validate_copyjob(copyjob): continue rest_copyjob = { - "params":{ - "bring_online": None, - "verify_checksum": False, - "copy_pin_lifetime": -1, - "max_time_in_queue": self.config.max_h_in_queue, - "job_metadata":{"issuer": "ASO"}, - "spacetoken": None, - "source_spacetoken": None, - "fail_nearline": False, - "overwrite": True, - "gridftp": None - }, - "files":[] + "params":{ + "bring_online": None, + "verify_checksum": False, + "copy_pin_lifetime": -1, + "max_time_in_queue": self.config.max_h_in_queue, + "job_metadata": {"issuer": "ASO"}, + "spacetoken": None, + "source_spacetoken": None, + "fail_nearline": False, + "overwrite": True, + "gridftp": None + }, + "files": [] } - pairs = [] for SrcDest in copyjob: tempDict = {"sources": [], "metadata": None, "destinations": []} @@ -331,23 +403,23 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report): tempDict["destinations"].append(SrcDest.split(" ")[1]) rest_copyjob["files"].append(tempDict) - self.logger.debug("Subbmitting this REST copyjob %s" % rest_copyjob) url = self.fts_server_for_transfer + '/jobs' self.logger.debug("Running FTS submission command") self.logger.debug("FTS server: %s" % self.fts_server_for_transfer) self.logger.debug("link: %s -> %s" % link) - heade = {"Content-Type ":"application/json"} + heade = {"Content-Type ": "application/json"} buf = StringIO.StringIO() try: - connection = RequestHandler(config={'timeout': 300, 'connecttimeout' : 300}) + connection = RequestHandler(config={'timeout': 300, 'connecttimeout': 300}) except Exception as ex: msg = str(ex) msg += str(traceback.format_exc()) self.logger.debug(msg) try: - response, datares = connection.request(url, rest_copyjob, heade, verb='POST', doseq=True, ckey=self.user_proxy, \ - cert=self.user_proxy, capath='/etc/grid-security/certificates', \ + response, datares = connection.request(url, rest_copyjob, heade, verb='POST', + doseq=True, ckey=self.user_proxy, + cert=self.user_proxy, capath='/etc/grid-security/certificates', cainfo=self.user_proxy, verbose=True) self.logger.debug("Submission done") self.logger.debug('Submission header status: %s' % response.status) @@ -381,8 +453,9 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report): self.logger.debug("Submitting to %s" % file_url) file_buf = StringIO.StringIO() try: - 
response, files_ = connection.request(file_url, {}, heade, doseq=True, ckey=self.user_proxy, \ - cert=self.user_proxy, capath='/etc/grid-security/certificates', \ + response, files_ = connection.request(file_url, {}, heade, doseq=True, ckey=self.user_proxy, + cert=self.user_proxy, + capath='/etc/grid-security/certificates', cainfo=self.user_proxy, verbose=True) files_res = json.loads(files_) except Exception as ex: @@ -407,7 +480,7 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report): self.logger.debug("Submission failed") self.logger.info("Mark failed %s files" % len(jobs_lfn[link])) self.logger.debug("Mark failed %s files" % jobs_lfn[link]) - failed_files = self.mark_failed(jobs_lfn[link], force_fail=False, submission_error=True, failure_reasons=failure_reasons) + failed_files = self.mark_failed(jobs_lfn[link], force_fail=False, submission_error=True) self.logger.info("Marked failed %s" % len(failed_files)) continue fts_job['userProxyPath'] = self.user_proxy @@ -424,18 +497,21 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report): self.logger.debug("%s ready." % fts_job) # Prepare Dashboard report for lfn in fts_job['LFNs']: - lfn_report = {} + lfn_report = dict() lfn_report['FTSJobid'] = fts_job['FTSJobid'] index = fts_job['LFNs'].index(lfn) lfn_report['PFN'] = fts_job['PFNs'][index] lfn_report['FTSFileid'] = fts_job['files_id'][index] lfn_report['Workflow'] = jobs_report[link][index][2] lfn_report['JobVersion'] = jobs_report[link][index][1] - job_id = '%d_https://glidein.cern.ch/%d/%s_%s' % (int(jobs_report[link][index][0]), int(jobs_report[link][index][0]), lfn_report['Workflow'].replace("_", ":"), lfn_report['JobVersion']) + job_id = '%d_https://glidein.cern.ch/%d/%s_%s' % (int(jobs_report[link][index][0]), + int(jobs_report[link][index][0]), + lfn_report['Workflow'].replace("_", ":"), + lfn_report['JobVersion']) lfn_report['JobId'] = job_id lfn_report['URL'] = self.fts_server_for_transfer self.logger.debug("Creating json file %s in %s for FTS3 Dashboard" % (lfn_report, self.dropbox_dir)) - dash_job_file = open('/tmp/DashboardReport/Dashboard.%s.json' % getHashLfn(lfn_report['PFN']), 'w') + dash_job_file = open('/tmp/Dashboard.%s.json' % getHashLfn(lfn_report['PFN']), 'w') jsondata = json.dumps(lfn_report) dash_job_file.write(jsondata) dash_job_file.close() @@ -456,42 +532,76 @@ def mark_acquired(self, files=[]): """ lfn_in_transfer = [] dash_rep = () - for lfn in files: - if lfn['value'][0].find('temp') == 7: - docId = getHashLfn(lfn['value'][0]) - self.logger.debug("Marking acquired %s" % docId) - # Load document to get the retry_count - try: - document = self.db.document(docId) - except Exception as ex: - msg = "Error loading document from couch" - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(msg) - continue - if (document['state'] == 'new' or document['state'] == 'retry'): - data = {} - data['state'] = 'acquired' - data['last_update'] = time.time() - updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + docId - updateUri += "?" 
+ urllib.urlencode(data) + if self.config.isOracle: + toUpdate = list() + for lfn in files: + if lfn['value'][0].find('temp') == 7: + self.logger.debug("Marking acquired %s" % lfn) + docId = lfn['key'][5] + self.logger.debug("Marking acquired %s" % docId) + toUpdate.append(docId) try: - self.db.makeRequest(uri=updateUri, type="PUT", decode=False) + docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','fileusertransfers'), + data=encodeRequest({'subresource': 'getById', 'id': docId})) + document = oracleOutputMapping(docbyId, None)[0] except Exception as ex: - msg = "Error updating document in couch" + self.logger.error("Error during dashboard report update: %s" %ex) + + lfn_in_transfer.append(lfn) + dash_rep = (document['jobid'], document['job_retry_count'], document['taskname']) + + try: + fileDoc = dict() + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'updateTransfers' + fileDoc['list_of_ids'] = toUpdate + fileDoc['list_of_transfer_state'] = ["SUBMITTED" for x in toUpdate] + + result = self.oracleDB.post(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) + except Exception as ex: + self.logger.error("Error during status update: %s" %ex) + + self.logger.debug("Marked acquired %s of %s" % (docId, lfn)) + # TODO: no need of mark good right? the postjob should updated the status in case of direct stageout I think + return lfn_in_transfer, dash_rep + else: + for lfn in files: + if lfn['value'][0].find('temp') == 7: + docId = getHashLfn(lfn['value'][0]) + self.logger.debug("Marking acquired %s" % docId) + # Load document to get the retry_count + try: + document = self.db.document(docId) + except Exception as ex: + msg = "Error loading document from couch" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(msg) continue - self.logger.debug("Marked acquired %s of %s" % (docId, lfn)) - lfn_in_transfer.append(lfn) - dash_rep = (document['jobid'], document['job_retry_count'], document['workflow']) + if document['state'] == 'new' or document['state'] == 'retry': + data = dict() + data['state'] = 'acquired' + data['last_update'] = time.time() + updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + docId + updateUri += "?" 
+ urllib.urlencode(data) + try: + self.db.makeRequest(uri=updateUri, type="PUT", decode=False) + except Exception as ex: + msg = "Error updating document in couch" + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(msg) + continue + self.logger.debug("Marked acquired %s of %s" % (docId, lfn)) + lfn_in_transfer.append(lfn) + dash_rep = (document['jobid'], document['job_retry_count'], document['workflow']) + else: + continue else: - continue - else: - good_lfn = lfn['value'][0].replace('store', 'store/temp', 1) - self.mark_good([good_lfn]) - return lfn_in_transfer, dash_rep + good_lfn = lfn['value'][0].replace('store', 'store/temp', 1) + self.mark_good([good_lfn]) + return lfn_in_transfer, dash_rep def mark_good(self, files=[]): """ @@ -511,7 +621,7 @@ def mark_good(self, files=[]): try: now = str(datetime.datetime.now()) last_update = time.time() - data = {} + data = dict() data['end_time'] = now data['state'] = 'done' data['lfn'] = outputLfn @@ -535,7 +645,7 @@ def mark_good(self, files=[]): continue self.logger.debug("transferred file updated") - def mark_failed(self, files=[], force_fail=False, submission_error=False, failure_reasons=[]): + def mark_failed(self, files=[], force_fail=False, submission_error=False): """ Something failed for these files so increment the retry count """ @@ -553,60 +663,104 @@ def mark_failed(self, files=[], force_fail=False, submission_error=False, failur else: temp_lfn = lfn['value'][0] - docId = getHashLfn(temp_lfn) + # Load document and get the retry_count + if self.config.isOracle: + docId = getHashLfn(temp_lfn) + self.logger.debug("Marking failed %s" % docId) + try: + docbyId = self.oracleDB.get(self.config.oracleFileTrans, + data=encodeRequest({'subresource': 'getById', 'id': docId})) + except Exception as ex: + self.logger.error("Error updating failed docs: %s" %ex) + continue + document = oracleOutputMapping(docbyId, None)[0] + self.logger.debug("Document: %s" % document) - # Load document to get the retry_count - try: - document = self.db.document(docId) - except Exception as ex: - msg = "Error loading document from couch" - msg += str(ex) - msg += str(traceback.format_exc()) - self.logger.error(msg) - continue - if document['state'] != 'killed' and document['state'] != 'done' and document['state'] != 'failed': - now = str(datetime.datetime.now()) - last_update = time.time() - # Prepare data to update the document in couch - if force_fail or len(document['retry_count']) + 1 > self.max_retry: - data['state'] = 'failed' + fileDoc = dict() + fileDoc['asoworker'] = self.config.asoworker + fileDoc['subresource'] = 'updateTransfers' + fileDoc['list_of_ids'] = docId + + if force_fail or document['transfer_retry_count'] + 1 > self.max_retry: + fileDoc['list_of_transfer_state'] = 'FAILED' + fileDoc['list_of_retry_value'] = 1 else: - data['state'] = 'retry' + fileDoc['list_of_transfer_state'] = 'RETRY' if submission_error: - data['failure_reason'] = "Job could not be submitted to FTS: temporary problem of FTS" + fileDoc['list_of_failure_reason'] = "Job could not be submitted to FTS: temporary problem of FTS" + fileDoc['list_of_retry_value'] = 1 elif not self.valid_proxy: - data['failure_reason'] = "Job could not be submitted to FTS: user's proxy expired" + fileDoc['list_of_failure_reason'] = "Job could not be submitted to FTS: user's proxy expired" + fileDoc['list_of_retry_value'] = 1 else: - data['failure_reason'] = "Site config problem." 
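# --- Illustrative aside (not part of the patch) --------------------------------
# Sketch of the batched status update that the Oracle branches of mark_acquired /
# mark_failed perform: a single POST to the filetransfers resource carrying
# parallel lists of ids and states. Host, proxy path, asoworker name, endpoint
# and the ids below are placeholders, not values taken from this patch; the field
# names follow the code above.
from RESTInteractions import HTTPRequests
from ServerUtilities import encodeRequest

oracleDB = HTTPRequests('cmsweb-testbed.cern.ch',
                        '/path/to/opsproxy',   # placeholder proxy, used as key and cert
                        '/path/to/opsproxy')

toUpdate = ['docid1', 'docid2']                # placeholder ids of acquired documents
fileDoc = {'asoworker': 'asoworker1',          # placeholder asoworker name
           'subresource': 'updateTransfers',
           'list_of_ids': toUpdate,
           'list_of_transfer_state': ['SUBMITTED' for _ in toUpdate]}

result = oracleDB.post('/crabserver/preprod/filetransfers',
                       data=encodeRequest(fileDoc))
print(result)
# --------------------------------------------------------------------------------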
- data['last_update'] = last_update - data['retry'] = now + fileDoc['list_of_failure_reason'] = "Site config problem." + fileDoc['list_of_retry_value'] = 1 - # Update the document in couch - self.logger.debug("Marking failed %s" % docId) + self.logger.debug("update: %s" % fileDoc) try: - updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + docId - updateUri += "?" + urllib.urlencode(data) - self.db.makeRequest(uri=updateUri, type="PUT", decode=False) updated_lfn.append(docId) - self.logger.debug("Marked failed %s" % docId) + result = self.oracleDB.post(self.config.oracleFileTrans, + data=encodeRequest(fileDoc)) except Exception as ex: - msg = "Error in updating document in couch" + msg = "Error updating document" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(msg) continue + + else: + docId = getHashLfn(temp_lfn) try: - self.db.commit() + document = self.db.document(docId) except Exception as ex: - msg = "Error commiting documents in couch" + msg = "Error loading document from couch" msg += str(ex) msg += str(traceback.format_exc()) self.logger.error(msg) continue - self.logger.debug("failed file updated") - return updated_lfn + if document['state'] != 'killed' and document['state'] != 'done' and document['state'] != 'failed': + now = str(datetime.datetime.now()) + last_update = time.time() + # Prepare data to update the document in couch + if force_fail or len(document['retry_count']) + 1 > self.max_retry: + data['state'] = 'failed' + else: + data['state'] = 'retry' + if submission_error: + data['failure_reason'] = "Job could not be submitted to FTS: temporary problem of FTS" + elif not self.valid_proxy: + data['failure_reason'] = "Job could not be submitted to FTS: user's proxy expired" + else: + data['failure_reason'] = "Site config problem." + data['last_update'] = last_update + data['retry'] = now + + # Update the document in couch + self.logger.debug("Marking failed %s" % docId) + try: + updateUri = "/" + self.db.name + "/_design/AsyncTransfer/_update/updateJobs/" + docId + updateUri += "?" 
+ urllib.urlencode(data) + self.db.makeRequest(uri=updateUri, type="PUT", decode=False) + updated_lfn.append(docId) + self.logger.debug("Marked failed %s" % docId) + except Exception as ex: + msg = "Error in updating document in couch" + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(msg) + continue + try: + self.db.commit() + except Exception as ex: + msg = "Error commiting documents in couch" + msg += str(ex) + msg += str(traceback.format_exc()) + self.logger.error(msg) + continue + self.logger.debug("failed file updated") + return updated_lfn - def mark_incomplete(self, files=[]): + def mark_incomplete(self): """ Mark the list of files as acquired """ diff --git a/src/python/test.py b/src/python/test.py new file mode 100644 index 0000000..3ebe3eb --- /dev/null +++ b/src/python/test.py @@ -0,0 +1,65 @@ +""" +asoOracle test API +""" +#pylint: disable=C0103,W0105 +#!/usr/bin/env python + +# -*- coding: utf-8 -*- +from __future__ import print_function +from __future__ import division + + +from RESTInteractions import HTTPRequests +from ServerUtilities import encodeRequest, oracleOutputMapping + +server = HTTPRequests('cmsweb-testbed.cern.ch', + '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy', + '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy') + +""" +fileDoc = {} +fileDoc['asoworker'] = 'asodciangot1' +fileDoc['subresource'] = 'acquireTransfers' + +result = server.post('/crabserver/dev/filetransfers', + data=encodeRequest(fileDoc)) + + +print(result) + +fileDoc = {} +fileDoc['asoworker'] = 'asodciangot1' +fileDoc['subresource'] = 'acquiredTransfers' +fileDoc['grouping'] = 0 + +result = server.get('/crabserver/dev/filetransfers', + data=encodeRequest(fileDoc)) + +#print(oracleOutputMapping(result)) + +ids = [str(x['id']) for x in oracleOutputMapping(result)] + +fileDoc = {} +fileDoc['subresource'] = 'groupedTransferStatistics' +fileDoc['grouping'] = 0 + +result = server.get('/crabserver/dev/filetransfers', + data=encodeRequest(fileDoc)) +""" +#print (oracleOutputMapping(result)) +fileDoc = {} +fileDoc['asoworker'] = 'asodciangot1' +fileDoc['subresource'] = 'updateTransfers' +fileDoc['list_of_ids'] = '64856469f4602d45c26a23bc6d3b94c3d5f47ba5143ddf84f8b3c1e4' +fileDoc['list_of_transfer_state'] = 4 +fileDoc['retry_value'] = 1 +fileDoc['fail_reason'] = 'fail_reason' +#print(encodeRequest(fileDoc)) + +fileDoc = {'list_of_publication_state': 'FAILED', 'subresource': 'updatePublication', 'list_of_ids': '9327a427210deb30d5407500e8380ad6f8950999bc0facb51c00f343', 'asoworker': 'asodciangot1', 'list_of_failure_reason': 'Publication description files not found! Will force publication failure. Workflow 160804_141528:jmsilva_crab_HG1608a-rc1-MinBias_PrivateMC_EventBased-L-T_O-T_P-T_IL-F expired since 9h:40m:4s!', 'list_of_retry_value': 1} + +result = server.post('/crabserver/preprod/filetransfers', + data=encodeRequest(fileDoc)) +print(result) + +
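# --- Illustrative aside (not part of the patch) --------------------------------
# test.py above builds an 'updateTransfers' document but never posts it; this is a
# sketch of what that request could look like, following the field names used by
# TransferWorker.mark_failed in the Oracle branch (string state 'FAILED'), whereas
# the test script itself uses the numeric code 4. The document id is a placeholder
# and the endpoint choice (preprod vs dev) is only an assumption.
from RESTInteractions import HTTPRequests
from ServerUtilities import encodeRequest

server = HTTPRequests('cmsweb-testbed.cern.ch',
                      '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy',
                      '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy')

fileDoc = {'asoworker': 'asodciangot1',
           'subresource': 'updateTransfers',
           'list_of_ids': 'someDocumentHash',  # placeholder id
           'list_of_transfer_state': 'FAILED',
           'list_of_failure_reason': 'Job could not be submitted to FTS: temporary problem of FTS',
           'list_of_retry_value': 1}

result = server.post('/crabserver/preprod/filetransfers',
                     data=encodeRequest(fileDoc))
print(result)
# --------------------------------------------------------------------------------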