diff --git a/src/python/AsyncStageOut/PublisherDaemon.py b/src/python/AsyncStageOut/PublisherDaemon.py
index d9e4ef1..fe6f4c6 100644
--- a/src/python/AsyncStageOut/PublisherDaemon.py
+++ b/src/python/AsyncStageOut/PublisherDaemon.py
@@ -67,16 +67,18 @@ def __init__(self, config):
         server = CouchServer(dburl=self.config.couch_instance,
                              ckey=self.config.opsProxy,
                              cert=self.config.opsProxy)
-        self.db = server.connectDatabase(self.config.files_database)
         self.logger.debug('Connected to CouchDB')
         # Set up a factory for loading plugins
         self.factory = WMFactory(self.config.schedAlgoDir,
                                  namespace=self.config.schedAlgoDir)
         self.pool = Pool(processes=self.config.publication_pool_size)
-        self.oracleDB = HTTPRequests(self.config.oracleDB,
-                                     self.config.opsProxy,
-                                     self.config.opsProxy)
+        if self.config.isOracle:
+            self.oracleDB = HTTPRequests(self.config.oracleDB,
+                                         self.config.opsProxy,
+                                         self.config.opsProxy)
+        else:
+            self.db = server.connectDatabase(self.config.files_database)
 
     def algorithm(self, parameters=None):
         """
@@ -84,10 +86,10 @@ def algorithm(self, parameters=None):
         2. For each user get a suitably sized input for publish
         3. Submit the publish to a subprocess
         """
-        if self.config.isOracle:
-            users = self.active_users(self.oracleDB)
-        else:
-            users = self.active_users(self.db)
+        if self.config.isOracle:
+            users = self.active_users(self.oracleDB)
+        else:
+            users = self.active_users(self.db)
         self.logger.debug('kicking off pool %s' %users)
         for u in users:
             self.logger.debug('current_running %s' %current_running)
@@ -131,14 +133,15 @@ def active_users(self, db):
             try:
                 results = db.get(self.config.oracleFileTrans,
                                  data=encodeRequest(fileDoc))
+                result = oracleOutputMapping(results)
             except Exception as ex:
                 self.logger.error("Failed to acquire publications \
                                   from oracleDB: %s" %ex)
-            result = oracleOutputMapping(results)
+            self.logger.debug("%s acquired publications retrieved" % len(result))
             #TODO: join query for publisher (same of submitter)
             unique_users = [list(i) for i in set(tuple([x['username'], x['user_group'], x['user_role']]) for x in result
-                                                 if x['transfer_state']==3)]
+                                                 if x['transfer_state'] == 3)]
             return unique_users
         else:
             # TODO: Remove stale=ok for now until tested
diff --git a/src/python/AsyncStageOut/PublisherWorker.py b/src/python/AsyncStageOut/PublisherWorker.py
index fc8cb6c..f4f442b 100644
--- a/src/python/AsyncStageOut/PublisherWorker.py
+++ b/src/python/AsyncStageOut/PublisherWorker.py
@@ -36,7 +36,7 @@ from AsyncStageOut import getCommonLogFormatter
 
 from RESTInteractions import HTTPRequests
-from ServerUtilities import getHashLfn, PUBLICATIONDB_STATUSES, encodeRequest, oracleOutputMapping
+from ServerUtilities import getColumn, getHashLfn, PUBLICATIONDB_STATUSES, encodeRequest, oracleOutputMapping
 
 
 class PublisherWorker:
     """
@@ -94,13 +94,7 @@ def __init__(self, user, config):
                          'uisource': self.uiSetupScript
                         }
         # If we're just testing publication, we skip the DB connection.
-        if os.getenv("TEST_ASO"):
-            self.db = None
-        else:
-            server = CouchServer(dburl=self.config.couch_instance,
-                                 ckey=self.config.opsProxy,
-                                 cert=self.config.opsProxy)
-            self.db = server.connectDatabase(self.config.files_database)
+
         if hasattr(self.config, "cache_area"):
             try:
                 getCache = re.compile('https?://([^/]*)/.*')
@@ -156,6 +150,20 @@ def __init__(self, user, config):
                 self.logger.error('Did not get valid proxy. Setting proxy to ops proxy')
                 self.userProxy = self.config.opsProxy
         # self.cache_area = self.config.cache_area
+        if os.getenv("TEST_ASO"):
+            self.db = None
+        elif not self.config.isOracle:
+            server = CouchServer(dburl=self.config.couch_instance,
+                                 ckey=self.config.opsProxy,
+                                 cert=self.config.opsProxy)
+            self.db = server.connectDatabase(self.config.files_database)
+        else:
+            self.oracleDB = HTTPRequests(self.config.oracleDB,
+                                         self.config.opsProxy,
+                                         self.config.opsProxy)
+            self.oracleDB_user = HTTPRequests(self.config.oracleDB,
+                                              self.userProxy,
+                                              self.userProxy)
         self.phedexApi = PhEDEx(responseType='json')
         self.max_files_per_block = max(1, self.config.max_files_per_block)
         self.block_publication_timeout = self.config.block_closure_timeout
@@ -181,10 +189,6 @@ def __init__(self, user, config):
             msg += str(traceback.format_exc())
             self.logger.debug(msg)
 
-        self.oracleDB = HTTPRequests(self.config.oracleDB,
-                                     self.config.opsProxy,
-                                     self.config.opsProxy)
-
     def __call__(self):
         """
         1- check the nubmer of files in wf to publish if it is < max_files_per_block
@@ -247,6 +251,7 @@ def __call__(self):
                                          ]} for x in toPub_docs if x['transfer_state']==3 and x['publication_state'] not in [2,3,5]]
         # self.logger.debug("active_user_workflows: %s" %active_user_workflows)
+        workflow = ''
         for user_wf in unique_user_workflows:
             workflow = str(user_wf['key'][3])
             wfnamemsg = "%s: " % (workflow)
@@ -363,33 +368,36 @@ def __call__(self):
                                                   cert=self.userProxy
                                                   )# , verbose=True) # for debug
             except Exception as ex:
-                msg = "Error retrieving status from cache. Fall back to user cache area"
-                msg += str(ex)
-                msg += str(traceback.format_exc())
-                self.logger.error(wfnamemsg+msg)
-                query = {'key': self.user}
-                try:
-                    self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows']
-                except Exception as ex:
-                    msg = "Error getting user cache_area"
+                if self.config.isOracle:
+                    self.logger.exception('Error retrieving status from cache.')
+                else:
+                    msg = "Error retrieving status from cache. Fall back to user cache area"
                     msg += str(ex)
                     msg += str(traceback.format_exc())
-                    self.logger.error(msg)
+                    self.logger.error(wfnamemsg+msg)
+                    query = {'key': self.user}
+                    try:
+                        self.user_cache_area = self.db.loadView('DBSPublisher', 'cache_area', query)['rows']
+                    except Exception as ex:
+                        msg = "Error getting user cache_area"
+                        msg += str(ex)
+                        msg += str(traceback.format_exc())
+                        self.logger.error(msg)
 
-                self.cache_area = self.user_cache_area[0]['value'][0]+self.user_cache_area[0]['value'][1]+'/filemetadata'
-                try:
-                    _, res_ = self.connection.request(url,
-                                                      data,
-                                                      header,
-                                                      doseq=True,
-                                                      ckey=self.userProxy,
-                                                      cert=self.userProxy
-                                                      )#, verbose=True)# for debug
-                except Exception as ex:
-                    msg = "Error retrieving status from user cache area."
-                    msg += str(ex)
-                    msg += str(traceback.format_exc())
-                    self.logger.error(wfnamemsg+msg)
+                    self.cache_area = self.user_cache_area[0]['value'][0]+self.user_cache_area[0]['value'][1]+'/filemetadata'
+                    try:
+                        _, res_ = self.connection.request(url,
+                                                          data,
+                                                          header,
+                                                          doseq=True,
+                                                          ckey=self.userProxy,
+                                                          cert=self.userProxy
+                                                          )#, verbose=True)# for debug
+                    except Exception as ex:
+                        msg = "Error retrieving status from user cache area."
+                        msg += str(ex)
+                        msg += str(traceback.format_exc())
+                        self.logger.error(wfnamemsg+msg)
 
             msg = "Status retrieved from cache. Loading task status."
             self.logger.info(wfnamemsg+msg)
@@ -423,41 +431,58 @@ def __call__(self):
                     # Get when was the last time a publication was done for this workflow (this
                     # should be more or less independent of the output dataset in case there are
                     # more than one).
-                    query = {'reduce': True, 'key': user_wf['key']}
-                    try:
-                        last_publication_time = self.db.loadView('DBSPublisher', 'last_publication', query)['rows']
-                    except Exception as ex:
-                        msg = "Cannot get last publication time for %s: %s" % (user_wf['key'], ex)
-                        self.logger.error(wfnamemsg+msg)
+                    last_publication_time = None
+                    if not self.config.isOracle:
+                        query = {'reduce': True, 'key': user_wf['key']}
+                        try:
+                            last_publication_time = self.db.loadView('DBSPublisher', 'last_publication', query)['rows']
+                        except Exception as ex:
+                            msg = "Cannot get last publication time for %s: %s" % (user_wf['key'], ex)
+                            self.logger.error(wfnamemsg+msg)
+                    else:
+                        data['workflow'] = workflow
+                        data['subresource'] = 'search'
+                        try:
+                            result = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','task'),
+                                                       data=encodeRequest(data))
+                            self.logger.debug("task: %s " % str(result[0]))
+                            self.logger.debug("task: %s " % getColumn(result[0],'tm_last_publication'))
+                        except Exception as ex:
+                            self.logger.error("Error during task doc retrieving: %s" %ex)
+                    if last_publication_time:
+                        date = oracleOutputMapping(result)['last_publication']
+                        seconds = datetime.strptime(date, "%Y-%m-%d %H:%M:%S.%f").timetuple()
+                        last_publication_time = time.mktime(seconds)
+
+                    msg = "Last publication time: %s." % str(last_publication_time)
+                    self.logger.debug(wfnamemsg+msg)
+                    # If this is the first time a publication would be done for this workflow, go
+                    # ahead and publish.
+                    if not last_publication_time:
+                        self.force_publication = True
+                        msg = "There was no previous publication. Will force publication."
+                        self.logger.info(wfnamemsg+msg)
+                    # Otherwise...
                     else:
-                        msg = "Last publication time: %s." % (last_publication_time)
+                        last = last_publication_time
+                        msg = "Last published block: %s" % (last)
                         self.logger.debug(wfnamemsg+msg)
-                        # If this is the first time a publication would be done for this workflow, go
-                        # ahead and publish.
-                        if not last_publication_time:
+                        # If the last publication was long time ago (> our block publication timeout),
+                        # go ahead and publish.
+                        time_since_last_publication = now - last
+                        hours = int(time_since_last_publication/60/60)
+                        minutes = int((time_since_last_publication - hours*60*60)/60)
+                        timeout_hours = int(self.block_publication_timeout/60/60)
+                        timeout_minutes = int((self.block_publication_timeout - timeout_hours*60*60)/60)
+                        msg = "Last publication was %sh:%sm ago" % (hours, minutes)
+                        if time_since_last_publication > self.block_publication_timeout:
                             self.force_publication = True
-                            msg = "There was no previous publication. Will force publication."
-                            self.logger.info(wfnamemsg+msg)
-                        # Otherwise...
+                            msg += " (more than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes)
+                            msg += " Will force publication."
                         else:
-                            msg = "Last published block: %s" % (last_publication_time[0]['value']['max'])
-                            self.logger.debug(wfnamemsg+msg)
-                            # If the last publication was long time ago (> our block publication timeout),
-                            # go ahead and publish.
-                            time_since_last_publication = now - last_publication_time[0]['value']['max']
-                            hours = int(time_since_last_publication/60/60)
-                            minutes = int((time_since_last_publication - hours*60*60)/60)
-                            timeout_hours = int(self.block_publication_timeout/60/60)
-                            timeout_minutes = int((self.block_publication_timeout - timeout_hours*60*60)/60)
-                            msg = "Last publication was %sh:%sm ago" % (hours, minutes)
-                            if time_since_last_publication > self.block_publication_timeout:
-                                self.force_publication = True
-                                msg += " (more than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes)
-                                msg += " Will force publication."
-                            else:
-                                msg += " (less than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes)
-                                msg += " Not enough to force publication."
-                            self.logger.info(wfnamemsg+msg)
+                            msg += " (less than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes)
+                            msg += " Not enough to force publication."
+                        self.logger.info(wfnamemsg+msg)
                     # Call the publish method with the lists of ready files to publish for this
                     # workflow grouped by datasets.
                     result = self.publish(workflow, input_dataset, input_dbs_url, pnn, lfn_ready)
@@ -471,9 +496,19 @@ def __call__(self):
                     force_failure = result[dataset].get('force_failure', False)
                     self.mark_failed(workflow, failed_files, failure_reason, force_failure)
 
-        self.logger.info("Publications for user %s (group: %s, role: %s) completed." % (self.user,
-                                                                                        self.group,
-                                                                                        self.role))
+        try:
+            # TODO: update last publication time db task updatepublicationtime,workflow,
+            self.logger.debug("Updating last publication type for: %s " % workflow)
+            data['workflow'] = workflow
+            data['subresource'] = 'updatepublicationtime'
+            result = self.oracleDB_user.get(self.config.oracleFileTrans.replace('filetransfers','task'),
+                                            data=encodeRequest(data))
+            self.logger.debug("%s last publication type update: %s " % (workflow,str(result)))
+            self.logger.info("Publications for user %s (group: %s, role: %s) completed." % (self.user,
+                                                                                            self.group,
+                                                                                            self.role))
+        except Exception as ex:
+            self.logger.error("Error during task doc retrieving: %s" %ex)
 
     def mark_good(self, workflow, files):
         """
diff --git a/src/python/AsyncStageOut/ReporterWorker.py b/src/python/AsyncStageOut/ReporterWorker.py
index 8394b99..a7b2f2a 100644
--- a/src/python/AsyncStageOut/ReporterWorker.py
+++ b/src/python/AsyncStageOut/ReporterWorker.py
@@ -96,7 +96,6 @@ def __init__(self, user, config):
         """
         self.user = user
         self.config = config
-        self.kibana_file = open(self.config.kibana_dir+"/"+self.user+"/"+"ended.json", 'a')
         self.dropbox_dir = '%s/dropbox/inputs' % self.config.componentDir
         logging.basicConfig(level=config.log_level)
         self.site_tfc_map = {}
@@ -233,13 +232,6 @@ def __call__(self):
                     self.logger.debug('Marking failed %s %s' %(failed_lfns, failure_reason))
                     updated_failed_lfns = self.mark_failed(failed_lfns, failure_reason)
 
-                    try:
-                        self.kibana_file.write(str(time.time())+"\tFailed:\t"+str(len(updated_failed_lfns))+"\n")
-                    except Exception as ex:
-                        self.logger.error(ex)
-
-                    if len(updated_failed_lfns) != len(failed_lfns):
-                        remove_failed = False
 
                 if 'Done' or 'FINISHED' in json_data['transferStatus']:
                     # Sort good files
@@ -249,20 +241,14 @@ def __call__(self):
                     for i in good_indexes:
                         good_lfns.append(json_data['LFNs'][i])
                     self.logger.info('Marking good %s' %(good_lfns))
-                    updated_good_lfns = self.mark_good(good_lfns)
-                    try:
-                        self.kibana_file.write(str(time.time())+"\tFailed:\t"+str(len(updated_good_lfns))+"\n")
-                    except Exception as ex:
-                        self.logger.error(ex)
+                        updated_good_lfns = self.mark_good(good_lfns)
+                    except:
+                        self.logger.exception('Either no files to mark or failed to update state')
 
-                    if len(updated_good_lfns) != len(good_lfns):
-                        remove_good = False
-
-                if remove_good and remove_failed:
-                    # Remove the json file
-                    self.logger.debug('Removing %s' % input_file)
-                    os.unlink( input_file )
+                # Remove the json file
+                self.logger.debug('Removing %s' % input_file)
+                os.unlink( input_file )
 
             else:
                 self.logger.info('Empty file %s' % input_file)
@@ -290,6 +276,8 @@ def mark_good(self, files):
         """
         updated_lfn = []
         good_ids = []
+        if len(files) == 0:
+            return updated_lfn
         for it, lfn in enumerate(files):
             hash_lfn = getHashLfn(lfn)
             self.logger.info("Marking good %s" % hash_lfn)
@@ -340,35 +328,37 @@ def mark_good(self, files):
                 msg += str(traceback.format_exc())
                 self.logger.error(msg)
                 continue
-            if self.config.isOracle:
-                try:
-                    data = dict()
-                    data['asoworker'] = self.config.asoworker
-                    data['subresource'] = 'updateTransfers'
-                    data['list_of_ids'] = good_ids
-                    data['list_of_transfer_state'] = ["DONE" for x in good_ids]
-                    result = self.oracleDB.post(self.config.oracleFileTrans,
-                                                data=encodeRequest(data))
-                    self.logger.debug("Marked good %s" % good_ids)
-                except Exception:
-                    self.logger.exeption('Error updating document')
-
-                self.logger.info("Transferred file %s updated, removing now source file" %docId)
+        if self.config.isOracle:
             try:
-                docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','fileusertransfers'),
-                                            data=encodeRequest({'subresource': 'getById', 'id': docId}))
-                document = oracleOutputMapping(docbyId, None)[0]
+                data = dict()
+                data['asoworker'] = self.config.asoworker
+                data['subresource'] = 'updateTransfers'
+                data['list_of_ids'] = good_ids
+                data['list_of_transfer_state'] = ["DONE" for x in good_ids]
+                result = self.oracleDB.post(self.config.oracleFileTrans,
+                                            data=encodeRequest(data))
+                self.logger.debug("Marked good %s" % good_ids)
             except Exception:
-                msg = "Error getting file from source"
-                self.logger.exception(msg)
-                raise
-            if document["source"] not in self.site_tfc_map:
-                self.logger.debug("site not found... gathering info from phedex")
-                self.site_tfc_map[document["source"]] = self.get_tfc_rules(document["source"])
-            pfn = self.apply_tfc_to_lfn( '%s:%s' %(document["source"], lfn))
-            self.logger.debug("File has to be removed now from source site: %s" %pfn)
-            self.remove_files(self.userProxy, pfn)
-            self.logger.debug("Transferred file removed from source")
+                self.logger.exception('Error updating document')
+                return {}
+
+            self.logger.info("Transferred file %s updated, removing now source file" %docId)
+            try:
+                docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','fileusertransfers'),
+                                            data=encodeRequest({'subresource': 'getById', 'id': docId}))
+                document = oracleOutputMapping(docbyId, None)[0]
+            except Exception:
+                msg = "Error getting file from source"
+                self.logger.exception(msg)
+                return {}
+
+            if document["source"] not in self.site_tfc_map:
+                self.logger.debug("site not found... gathering info from phedex")
+                self.site_tfc_map[document["source"]] = self.get_tfc_rules(document["source"])
+            pfn = self.apply_tfc_to_lfn( '%s:%s' %(document["source"], lfn))
+            self.logger.debug("File has to be removed now from source site: %s" %pfn)
+            self.remove_files(self.userProxy, pfn)
+            self.logger.debug("Transferred file removed from source")
         return updated_lfn
 
     def remove_files(self, userProxy, pfn):
@@ -380,12 +370,11 @@
             rc, stdout, stderr = execute_command(command, self.logger, 3600)
         except Exception as ex:
             self.logger.error(ex)
-            raise
         if rc:
             logging.info("Deletion command failed with output %s and error %s" %(stdout, stderr))
         else:
             logging.info("File Deleted.")
-        return
+        return
 
     def get_tfc_rules(self, site):
         """
@@ -468,13 +457,15 @@ def mark_failed(self, files=[], failures_reasons=[], force_fail=False):
                     fatal_error = self.determine_fatal_error(failures_reasons[files.index(lfn)])
                     if fatal_error:
                         data['list_of_transfer_state'] = 'FAILED'
+                        data['list_of_failure_reason'] = failures_reasons[files.index(lfn)]
 
                 data['list_of_retry_value'] = 0
 
                 self.logger.debug("update: %s" % data)
                 result = self.oracleDB.post(self.config.oracleFileTrans,
                                             data=encodeRequest(data))
-                updated_lfn.append(lfn)
+                if not data['list_of_transfer_state'] == 'RETRY':
+                    updated_lfn.append(lfn)
                 self.logger.debug("Marked failed %s" % lfn)
             except Exception as ex:
                 self.logger.error("Error updating document status: %s" %ex)
diff --git a/src/python/AsyncStageOut/TransferDaemon.py b/src/python/AsyncStageOut/TransferDaemon.py
index c8e3c86..f113987 100644
--- a/src/python/AsyncStageOut/TransferDaemon.py
+++ b/src/python/AsyncStageOut/TransferDaemon.py
@@ -13,6 +13,7 @@
 import os
 import logging
 from multiprocessing import Pool
+import json
 
 from RESTInteractions import HTTPRequests
 from ServerUtilities import encodeRequest, oracleOutputMapping
@@ -110,28 +111,43 @@ def __init__(self, config):
             self.db = server.connectDatabase(self.config.files_database)
         self.logger.debug('Connected to CouchDB')
         self.pool = Pool(processes=self.config.pool_size)
+        self.factory = WMFactory(self.config.schedAlgoDir,
+                                 namespace=self.config.schedAlgoDir)
+
+        self.site_tfc_map = {}
         try:
             self.phedex = PhEDEx(responseType='xml',
                                  dict={'key':self.config.opsProxy,
                                        'cert':self.config.opsProxy})
         except Exception as e:
             self.logger.exception('PhEDEx exception: %s' % e)
-        # Set up a factory for loading plugins
-        self.factory = WMFactory(self.config.schedAlgoDir,
-                                 namespace=self.config.schedAlgoDir)
+        # TODO: decode xml
+        try:
+            self.phedex2 = PhEDEx(responseType='json',
+                                  dict={'key':self.config.opsProxy,
+                                        'cert':self.config.opsProxy})
+        except Exception as e:
+            self.logger.exception('PhEDEx exception: %s' % e)
+
+        self.logger.debug(type((self.phedex2.getNodeMap())['phedex']['node']))
+        for site in [x['name'] for x in self.phedex2.getNodeMap()['phedex']['node']]:
+            if site and str(site) != 'None' and str(site) != 'unknown':
+                self.site_tfc_map[site] = self.get_tfc_rules(site)
+                self.logger.debug('tfc site: %s %s' % (site, self.get_tfc_rules(site)))
+
     # Over riding setup() is optional, and not needed here
     def algorithm(self, parameters=None):
         """
-        1 Get transfer config from couchdb config instance
-        2. Get a list of users with files to transfer from the db instance
+        1 Get transfer config from couchdb config instance
+        2. Get a list of users with files to transfer from the db instance (oracle or couch, by config flag)
        3. For each user get a suitably sized input for submission (call to a list)
        4. Submit to a subprocess
        """
         if self.config.isOracle:
-            sites, users = self.oracleSiteUser(self.oracleDB)
+            users = self.oracleSiteUser(self.oracleDB)
         else:
             users = self.active_users(self.db)
 
@@ -139,19 +155,19 @@
         self.logger.info('%s active sites' % len(sites))
         self.logger.debug('Active sites are: %s' % sites)
 
-        site_tfc_map = {}
-        for site in sites:
-            if site and str(site) != 'None' and str(site) != 'unknown':
-                site_tfc_map[site] = self.get_tfc_rules(site)
-                self.logger.debug('tfc site: %s %s' % (site, self.get_tfc_rules(site)))
         self.logger.debug('kicking off pool')
         for u in users:
+            for i in range(len(u)):
+                if not u[i]:
+                    u[i] = ''
+            self.logger.debug('current_running %s' % current_running)
+            self.logger.debug('BBBBBB: %s %s %s' % (u, current_running, (u not in current_running)))
             if u not in current_running:
                 self.logger.debug('processing %s' % u)
                 current_running.append(u)
                 self.logger.debug('processing %s' % current_running)
-                self.pool.apply_async(ftscp, (u, site_tfc_map, self.config),
+                self.pool.apply_async(ftscp, (u, self.site_tfc_map, self.config),
                                       callback=log_result)
 
     def oracleSiteUser(self, db):
         """
         1. Acquire transfers from DB
         2. Get acquired users and destination sites
         """
+
+        self.logger.info('Retrieving users...')
         fileDoc = dict()
+        fileDoc['subresource'] = 'activeUsers'
+        fileDoc['grouping'] = 0
         fileDoc['asoworker'] = self.config.asoworker
-        fileDoc['subresource'] = 'acquireTransfers'
-
-        self.logger.debug("Retrieving transfers from oracleDB")
+        result = dict()
         try:
-            result = db.post(self.config.oracleFileTrans,
+            result = db.get(self.config.oracleFileTrans,
                             data=encodeRequest(fileDoc))
         except Exception as ex:
             self.logger.error("Failed to acquire transfers \
-                              from oracleDB: %s" %ex)
-            pass
-
-        self.doc_acq = str(result)
+                              from oracleDB: %s" % ex)
+
+        self.logger.debug(oracleOutputMapping(result))
+        # TODO: translate result into list((user,group,role),...)
+        if len(oracleOutputMapping(result)) != 0:
+            self.logger.debug(type( [[x['username'].encode('ascii','ignore'), x['user_group'], x['user_role']] for x in oracleOutputMapping(result)]))
+            try:
+                docs = oracleOutputMapping(result)
+                users = [[x['username'], x['user_group'], x['user_role']] for x in docs]
+                self.logger.info('Users to process: %s' % str(users))
+            except:
+                self.logger.exception('User data malformed. ')
+        else:
+            self.logger.info('No new user to acquire')
+            return []
 
-        fileDoc = dict()
-        fileDoc['asoworker'] = self.config.asoworker
-        fileDoc['subresource'] = 'acquiredTransfers'
-        fileDoc['grouping'] = 0
+        actives = list()
+        for user in users:
+            fileDoc = dict()
+            fileDoc['asoworker'] = self.config.asoworker
+            fileDoc['subresource'] = 'acquireTransfers'
+            fileDoc['username'] = user[0]
 
-        self.logger.debug("Retrieving users from oracleDB")
+            self.logger.debug("Retrieving transfers from oracleDB for user: %s " % user[0])
 
-        try:
-            results = db.get(self.config.oracleFileTrans,
-                             data=encodeRequest(fileDoc))
-        except Exception:
-            self.logger.exception("Failed to get acquired transfers \
-                                  from oracleDB.")
-            results = None
-            pass
-
-        documents = oracleOutputMapping(results)
-
-        for doc in documents:
-            if doc['user_role'] is None:
-                doc['user_role'] = ""
-            if doc['user_group'] is None:
-                doc['user_group'] = ""
-
-        unique_users = []
-        try:
-            unique_users = [list(i) for i in set(tuple([x['username'],
-                                                        x['user_group'],
-                                                        x['user_role']]) for x in documents)]
-        except Exception as ex:
-            self.logger.error("Failed to map active users: %s" %ex)
+            try:
+                result = db.post(self.config.oracleFileTrans,
+                                 data=encodeRequest(fileDoc))
+            except Exception as ex:
+                self.logger.error("Failed to acquire transfers \
+                                  from oracleDB: %s" %ex)
+                continue
 
-        if len(unique_users) <= self.config.pool_size:
-            active_users = unique_users
-        else:
-            active_users = unique_users[:self.config.pool_size]
+            self.doc_acq = str(result)
+            for i in range(len(user)):
+                if not user[i]:
+                    user[i] = ''
+                user[i] = str(user[i])
+            actives.append(user)
 
-        self.logger.info('%s active users' % len(active_users))
-        self.logger.debug('Active users are: %s' % active_users)
-        active_sites_dest = [x['destination'] for x in documents]
-        active_sites = active_sites_dest + [x['source'] for x in documents]
+        self.logger.debug("Transfers retrieved from oracleDB. %s " % users)
 
-        self.logger.debug('Active sites are: %s' % list(set(active_sites)))
-        return list(set(active_sites)), active_users
+        return users
 
     def active_users(self, db):
         """
diff --git a/src/python/AsyncStageOut/TransferWorker.py b/src/python/AsyncStageOut/TransferWorker.py
index 4cd4718..089aa4a 100644
--- a/src/python/AsyncStageOut/TransferWorker.py
+++ b/src/python/AsyncStageOut/TransferWorker.py
@@ -20,11 +20,12 @@
 import traceback
 import subprocess
 import itertools
-
+from datetime import timedelta
 from WMCore.WMFactory import WMFactory
 from WMCore.Database.CMSCouch import CouchServer
 from WMCore.Services.pycurl_manager import RequestHandler
+import fts3.rest.client.easy as fts3
 from AsyncStageOut import getProxy
 from AsyncStageOut import getHashLfn
 from AsyncStageOut import getFTServer
@@ -116,14 +117,19 @@ def __init__(self, user, tfc_map, config):
         # Set up a factory for loading plugins
         self.factory = WMFactory(self.config.pluginDir, namespace=self.config.pluginDir)
         self.commandTimeout = 1200
-        if self.config.isOracle:
-            self.oracleDB = HTTPRequests(self.config.oracleDB,
-                                         self.config.opsProxy,
-                                         self.config.opsProxy)
-        else:
-            server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy)
-            self.db = server.connectDatabase(self.config.files_database)
-        self.fts_server_for_transfer = getFTServer("T1_UK_RAL", 'getRunningFTSserver', self.config_db, self.logger)
+        try:
+            if self.config.isOracle:
+                self.oracleDB = HTTPRequests(self.config.oracleDB,
+                                             self.config.opsProxy,
+                                             self.config.opsProxy)
+            else:
+                server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy)
+                self.db = server.connectDatabase(self.config.files_database)
+            config_server = CouchServer(dburl=self.config.config_couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy)
+            self.config_db = config_server.connectDatabase(self.config.config_database)
+            self.fts_server_for_transfer = getFTServer("T1_UK_RAL", 'getRunningFTSserver', self.config_db, self.logger)
+        except Exception:
+            self.logger.exception('Failed to contact DB')
 
         self.cache_area = ""
         if hasattr(self.config, "cache_area"):
@@ -160,6 +166,7 @@ def __init__(self, user, tfc_map, config):
             msg += str(ex)
             msg += str(traceback.format_exc())
             self.logger.error(msg)
+        self.context = dict()
 
     def __call__(self):
         """
@@ -168,15 +175,15 @@
         c. update status and create dropbox json
         """
         stdout, stderr, rc = None, None, 99999
-        fts_url_delegation = self.fts_server_for_transfer.replace('8446', '8443')
+        #fts_url_delegation = self.fts_server_for_transfer.replace('8446', '8443')
         if self.user_proxy:
-            command = 'export X509_USER_PROXY=%s ; source %s ; %s -s %s' % (self.user_proxy, self.uiSetupScript,
-                                                                            'glite-delegation-init', fts_url_delegation)
-            init_time = str(time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()))
-            self.logger.debug("executing command: %s at: %s for: %s" % (command, init_time, self.userDN))
-            stdout, rc = execute_command(command, self.logger, self.commandTimeout)
-        if not rc or not self.valid_proxy:
-            jobs, jobs_lfn, jobs_pfn, jobs_report = self.files_for_transfer()
+            try:
+                self.context = fts3.Context(self.fts_server_for_transfer, self.user_proxy, self.user_proxy, verify=True)
+                self.logger.debug(fts3.delegate(self.context, lifetime=timedelta(hours=48), force=False))
+                init_time = str(time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()))
+                jobs, jobs_lfn, jobs_pfn, jobs_report = self.files_for_transfer()
+            except:
+                self.logger.exception('delegation failed')
         self.logger.debug("Processing files for %s " % self.user_proxy)
         if jobs:
             self.command(jobs, jobs_lfn, jobs_pfn, jobs_report)
@@ -190,12 +197,22 @@ def source_destinations_by_user(self):
         Get all the destinations for a user
         """
         if self.config.isOracle:
+            self.logger.debug('Running acquiredTransfers query... ' + self.user)
             fileDoc = dict()
             fileDoc['asoworker'] = self.config.asoworker
             fileDoc['subresource'] = 'acquiredTransfers'
             fileDoc['grouping'] = 1
             fileDoc['username'] = self.user
+            if self.group == '':
+                group = None
+            if self.role == '':
+                role = None
+            fileDoc['vogroup'] = group
+            fileDoc['vorole'] = role
+            fileDoc['limit'] = self.config.max_files_per_transfer
             result = []
+
+            self.logger.debug('Request: ' + str(fileDoc))
             try:
                 results = self.oracleDB.get(self.config.oracleFileTrans,
                                             data=encodeRequest(fileDoc))
@@ -206,6 +223,7 @@
             except Exception as ex:
                 self.logger.error("Failed to get acquired transfers \
                                   from oracleDB: %s" %ex)
+                return [], {}
             return res, result
         else:
             query = {'group': True,
@@ -231,7 +249,9 @@ def files_for_transfer(self):
         jobs_report = {}
         self.logger.info('%s has %s links to transfer on: %s' % (self.user, len(source_dests), str(source_dests)))
         try:
+            count = 0
             for (source, destination) in source_dests:
+                count += 1
                 self.logger.info('dest1: %s source: %s' % (docs[0]['destination'],source))
                 if self.config.isOracle:
                     if self.group == '':
@@ -265,8 +285,9 @@ def map_active(inputdoc):
                 outDict['value'] = [inputdoc['source_lfn'], inputdoc['destination_lfn']]
                 return outDict
             active_files = [map_active(x) for x in active_docs]
-            self.logger.debug('%s has %s files to transfer \
-                              from %s to %s' % (self.user,
+            #active_files = active_files[:1000]
+            self.logger.debug('%s - %s has %s files to transfer \
+                              from %s to %s' % (count, self.user,
                                                 len(active_files),
                                                 source,
                                                 destination))
@@ -287,6 +308,7 @@ def map_active(inputdoc):
 
             pfn_list = []
             dash_report = []
+            # take these active files and make a copyjob entry
 
             def tfc_map(item):
                 self.logger.debug('Preparing PFNs...')
@@ -395,13 +417,24 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report):
                              },
                     "files": []
                     }
+        transfers = list()
+        #for SrcDest in copyjob:
+        #    self.logger.debug("Creating FTS job...")
+        #    self.logger.debug("%s -> %s" % (SrcDest.split(" ")[0], SrcDest.split(" ")[1]))
+        #    transfers.append(fts3.new_transfer(SrcDest.split(" ")[0],
+        #                                       SrcDest.split(" ")[1])
+        #                     )
+        #except:
+        #    self.logger.exception("Failure during new_transfer")
         for SrcDest in copyjob:
             tempDict = {"sources": [], "metadata": None, "destinations": []}
 
             tempDict["sources"].append(SrcDest.split(" ")[0])
             tempDict["destinations"].append(SrcDest.split(" ")[1])
-            rest_copyjob["files"].append(tempDict)
+            rest_copyjob["files"].append(tempDict)
+
+        #self.logger.debug("FTS job Created with %s files..." % (transfers))
 
         self.logger.debug("Subbmitting this REST copyjob %s" % rest_copyjob)
         url = self.fts_server_for_transfer + '/jobs'
@@ -409,6 +442,32 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report):
         self.logger.debug("FTS server: %s" % self.fts_server_for_transfer)
         self.logger.debug("link: %s -> %s" % link)
         heade = {"Content-Type ": "application/json"}
+
+        """
+        try:
+            job = fts3.new_job(transfers,
+                               overwrite=True,
+                               verify_checksum=False,
+                               metadata={"issuer": "ASO"},
+                               copy_pin_lifetime=-1,
+                               bring_online=None,
+                               source_spacetoken=None,
+                               spacetoken=None
+                               # TODO: check why not on fts3 (clone repo maybe?)
+                               # max_time_in_queue=6
+                               )
+
+            jobid = fts3.submit(self.context, job)
+            self.logger.info("Monitor link: https://fts3-pilot.cern.ch:8449/fts3/ftsmon/#/job/"+jobid)
+        except Exception as ex:
+            msg = "Error submitting to FTS"
+            msg += str(ex)
+            msg += str(traceback.format_exc())
+            self.logger.debug(msg)
+            failure_reasons.append(msg)
+            submission_error = True
+
+        """
         buf = StringIO.StringIO()
         try:
             connection = RequestHandler(config={'timeout': 300, 'connecttimeout': 300})
@@ -433,6 +492,7 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report):
             failure_reasons.append(msg)
             submission_error = True
         buf.close()
+
         if not submission_error:
             res = {}
             try:
@@ -465,7 +525,7 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report):
                     self.logger.debug(msg)
                     submission_error = True
                     failure_reasons.append(msg)
-                self.logger.debug("List files in job %s" % files_)
+                #self.logger.debug("List files in job %s" % files_)
                 file_buf.close()
                 for file_in_job in files_res:
                     if 'file_id' in file_in_job:
@@ -511,7 +571,7 @@ def command(self, jobs, jobs_lfn, jobs_pfn, jobs_report):
                     lfn_report['JobId'] = job_id
                     lfn_report['URL'] = self.fts_server_for_transfer
                     self.logger.debug("Creating json file %s in %s for FTS3 Dashboard" % (lfn_report, self.dropbox_dir))
-                    dash_job_file = open('/tmp/Dashboard.%s.json' % getHashLfn(lfn_report['PFN']), 'w')
+                    dash_job_file = open('/tmp/DashboardReport/Dashboard.%s.json' % getHashLfn(lfn_report['PFN']), 'w')
                     jsondata = json.dumps(lfn_report)
                     dash_job_file.write(jsondata)
                     dash_job_file.close()
@@ -554,15 +614,17 @@ def mark_acquired(self, files=[]):
                 fileDoc = dict()
                 fileDoc['asoworker'] = self.config.asoworker
                 fileDoc['subresource'] = 'updateTransfers'
-                fileDoc['list_of_ids'] = toUpdate
-                fileDoc['list_of_transfer_state'] = ["SUBMITTED" for x in toUpdate]
+                fileDoc['list_of_ids'] = files[0]['key'][5]
+                fileDoc['list_of_transfer_state'] = "SUBMITTED"
+
+                self.logger.debug("Marking acquired %s" % (fileDoc))
 
                 result = self.oracleDB.post(self.config.oracleFileTrans,
                                             data=encodeRequest(fileDoc))
+                self.logger.debug("Marked acquired %s of %s" % (fileDoc, result))
             except Exception as ex:
                 self.logger.error("Error during status update: %s" %ex)
-            self.logger.debug("Marked acquired %s of %s" % (docId, lfn))
 
             # TODO: no need of mark good right? the postjob should updated the status in case of direct stageout I think
             return lfn_in_transfer, dash_rep
         else:
@@ -668,7 +730,8 @@ def mark_failed(self, files=[], force_fail=False, submission_error=False):
             docId = getHashLfn(temp_lfn)
             self.logger.debug("Marking failed %s" % docId)
             try:
-                docbyId = self.oracleDB.get(self.config.oracleFileTrans,
+                docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers',
+                                                                                'fileusertransfers'),
                                             data=encodeRequest({'subresource': 'getById', 'id': docId}))
             except Exception as ex:
                 self.logger.error("Error updating failed docs: %s" %ex)
diff --git a/src/python/test.py b/src/python/test.py
index 3ebe3eb..796870f 100644
--- a/src/python/test.py
+++ b/src/python/test.py
@@ -16,7 +16,6 @@
 '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy',
 '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy')
 
-"""
 fileDoc = {}
 fileDoc['asoworker'] = 'asodciangot1'
 fileDoc['subresource'] = 'acquireTransfers'
@@ -26,7 +25,7 @@
 
 print(result)
 
-
+"""
 fileDoc = {}
 fileDoc['asoworker'] = 'asodciangot1'
 fileDoc['subresource'] = 'acquiredTransfers'
@@ -45,7 +44,7 @@
 result = server.get('/crabserver/dev/filetransfers',
                     data=encodeRequest(fileDoc))
 
-"""
+
 #print (oracleOutputMapping(result))
 fileDoc = {}
 fileDoc['asoworker'] = 'asodciangot1'
@@ -62,4 +61,4 @@
                     data=encodeRequest(fileDoc))
 
 print(result)
-
+"""