diff --git a/src/python/WMCore/MicroService/MSCore/MSManager.py b/src/python/WMCore/MicroService/MSCore/MSManager.py index d4cbf75dc2..d8acf86fef 100644 --- a/src/python/WMCore/MicroService/MSCore/MSManager.py +++ b/src/python/WMCore/MicroService/MSCore/MSManager.py @@ -34,6 +34,7 @@ from WMCore.MicroService.MSCore.TaskManager import start_new_thread from Utils.Utilities import strToBool + def daemon(func, reqStatus, interval, logger): "Daemon to perform given function action for all request in our store" while True: @@ -398,16 +399,18 @@ def post(self, doc): :param doc: input JSON doc for HTTP POST request :return: list of results """ + res = [] if 'pileup' in self.services: - res = [] if 'pileupName' in doc: # this is create POST request res = self.msPileup.createPileup(doc) if 'query' in doc: # this is POST request to get data for a given JSON query res = self.msPileup.queryDatabase(doc) - return res - return [] + elif 'transferor' in self.services: + if 'workflow' in doc: + res = self.msTransferor.updateSites(doc) + return res def update(self, doc): """ diff --git a/src/python/WMCore/MicroService/MSTransferor/DataStructs/Workflow.py b/src/python/WMCore/MicroService/MSTransferor/DataStructs/Workflow.py index 4606917c0e..42a686ecb9 100644 --- a/src/python/WMCore/MicroService/MSTransferor/DataStructs/Workflow.py +++ b/src/python/WMCore/MicroService/MSTransferor/DataStructs/Workflow.py @@ -34,6 +34,10 @@ def __init__(self, reqName, reqData, logger=None, verbose=False): self.pileupDatasets = set() self.pileupRSEList = set() + # flag to indicate if workflow is updated, e.g. 
see MSTransferor when we + # update site white/black lists and setup this flag + self.dataReplacement = False + self.campaigns = set() self.dataCampaignMap = [] # these blocks structure will be key'ed by the block name and value'd by the block size diff --git a/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py b/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py index 6be0fda8ef..115dc23194 100644 --- a/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py +++ b/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py @@ -15,6 +15,7 @@ standard_library.install_aliases() # system modules +import os from operator import itemgetter from pprint import pformat from retry import retry @@ -27,10 +28,12 @@ from WMCore.MicroService.Tools.Common import (teraBytes, isRelVal) from WMCore.MicroService.MSCore.MSCore import MSCore from WMCore.MicroService.MSTransferor.RequestInfo import RequestInfo +from WMCore.MicroService.MSTransferor.MSTransferorError import MSTransferorStorageError from WMCore.MicroService.MSTransferor.DataStructs.RSEQuotas import RSEQuotas from WMCore.Services.CRIC.CRIC import CRIC from WMCore.Services.MSPileup.MSPileupUtils import getPileupDocs from WMCore.Services.Rucio.RucioUtils import GROUPING_ALL +from WMCore.Lexicon import requestName def newTransferRec(dataIn): @@ -72,6 +75,12 @@ def __init__(self, msConfig, logger=None): """ super(MSTransferor, self).__init__(msConfig, logger=logger) + # persistent area for site list processing + wdir = '{}/storage'.format(os.getcwd()) + self.storage = self.msConfig.get('persistentArea', wdir) + os.makedirs(self.storage, exist_ok=True) + self.logger.info("Using directory %s as workflow persistent area", self.storage) + # minimum percentage completion for dataset/blocks subscribed self.msConfig.setdefault("minPercentCompletion", 99) # minimum available storage to consider a resource good for receiving data @@ -91,9 +100,9 @@ def __init__(self, msConfig, logger=None): self.pileupQuery = 
self.msConfig.get("pileupQuery", {"query": {"active": True}, "filters": ["expectedRSEs", "pileupName"]}) - quotaAccount = self.msConfig["rucioAccount"] + self.quotaAccount = self.msConfig["rucioAccount"] - self.rseQuotas = RSEQuotas(quotaAccount, self.msConfig["quotaUsage"], + self.rseQuotas = RSEQuotas(self.quotaAccount, self.msConfig["quotaUsage"], minimumThreshold=self.msConfig["minimumThreshold"], verbose=self.msConfig['verbose'], logger=logger) self.reqInfo = RequestInfo(self.msConfig, self.rucio, self.logger) @@ -210,6 +219,10 @@ def execute(self, reqStatus): # now check where input primary and parent blocks will need to go self.checkDataLocation(wflow, rseList) + # check if our workflow needs an update, if so wflow.dataReplacement flag is set + # which will be used by makeTransferRucio->moveReplicationRule chain of API calls + self.checkDataReplacement(wflow) + try: success, transfers = self.makeTransferRequest(wflow, rseList) except Exception as ex: @@ -224,12 +237,16 @@ wflow.getName(), pformat(transfers)) if self.createTransferDoc(wflow.getName(), transfers): self.logger.info("Transfer document successfully created in CouchDB for: %s", wflow.getName()) - # then move this request to staging status - self.change(wflow.getName(), 'staging', self.__class__.__name__) + # then move this request to staging status but only if we didn't do data replacement + if not wflow.dataReplacement: + self.change(wflow.getName(), 'staging', self.__class__.__name__) counterSuccessRequests += 1 else: counterFailedRequests += 1 self.alertTransferCouchDBError(wflow.getName()) + # clean-up local persistent storage if move operation was successful + if wflow.dataReplacement: + self.cleanupStorage(wflow.getName()) else: counterFailedRequests += 1 # it can go slightly beyond the limit.
It's evaluated for every slice @@ -498,8 +515,23 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod # Then make the data subscription, for real!!! self.logger.info("Creating rule for workflow %s with %d DIDs in container %s, RSEs: %s, grouping: %s", wflow.getName(), len(dids), dataIn['name'], rseExpr, grouping) + + # define list of rule ids we collect either from createReplicationRule or + # moveReplicationRule API calls + res = [] try: - res = self.rucio.createReplicationRule(dids, rseExpr, **ruleAttrs) + # make decision about current workflow, if it is a new request we'll create + # a new replication rule, otherwise we'll move the replication rule + if wflow.dataReplacement: + rids = self.getRuleIdsFromDoc(wflow.getName()) + self.logger.info("Going to move %s rules for workflow: %s", len(rids), wflow.getName()) + for rid in rids: + # the self.rucio.moveReplicationRule may raise different exceptions + # based on different outcome of the operation + res += self.rucio.moveReplicationRule(rid, rseExpr, self.quotaAccount) + self.logger.info("Rule %s was moved and the new rule is %s", rid, res) + else: + res = self.rucio.createReplicationRule(dids, rseExpr, **ruleAttrs) except Exception as exc: msg = "Hit a bad exception while creating replication rules for DID: %s.
Error: %s" self.logger.error(msg, dids, str(exc)) @@ -521,6 +553,22 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod self.logger.info(msg, wflow.getName(), dids, rseExpr, ruleAttrs) return success, transferId + def getRuleIdsFromDoc(self, workflowName): + """ + Obtain transfer IDs for given workflow name + :param workflowName: workflow name + :return: list of transfer IDs + """ + # make request to ReqMgr2 service + # https://xxx.cern.ch/reqmgr2/data/transferinfo/ + tids = [] + data = self.reqmgrAux.getTransferInfo(workflowName) + for row in data['result']: + transfers = row['transferDoc']['transfers'] + for rec in transfers: + tids += rec['transferIDs'] + return list(set(tids)) + def alertPUMisconfig(self, workflowName): """ Send alert to Prometheus with PU misconfiguration error @@ -676,3 +724,61 @@ def _diskPNNs(self, pnnList): else: diskPNNs.add(pnn) return diskPNNs + + def updateSites(self, rec): + """ + Update sites API provides asynchronous update of site list information + :param rec: JSON payload with the following data structures: {'workflow': } + :return: either empty list (no errors) or list of errors + """ + # preserve provided payload to local file system + wflowName = rec['workflow'] + status = self.updateStorage(wflowName) + if status == 'ok': + return [] + err = MSTransferorStorageError(status, **rec) + self.logger.error(err) + return [err.error()] + + def updateStorage(self, wflowName): + """ + Save workflow data to persistent storage + :param wflowName: name of the workflow + :return: status of this operation + """ + try: + fname = os.path.join(self.storage, wflowName) + with open(fname, 'w', encoding="utf-8"): + if requestName(wflowName): + # we perform touch operation on file system, i.e. 
create empty file + self.logger.info("Creating workflow entry %s in the persistent storage", wflowName) + os.utime(fname, None) + else: + return "error: fail to pass Lexicon validation" + return 'ok' + except Exception as exp: + msg = "Unable to save workflow '%s' to storage=%s. Error: %s" % (wflowName, self.storage, str(exp)) + self.logger.exception(msg) + return str(exp) + + def checkDataReplacement(self, wflow): + """ + Check if given workflow exists on local storage and set dataReplacement flag if it is the case + :param wflow: workflow object + :return: nothing + """ + fname = '{}/{}'.format(self.storage, wflow.getName()) + if os.path.exists(fname): + self.logger.info("Workflow %s is set for data replacement", wflow.getName()) + wflow.dataReplacement = True + + def cleanupStorage(self, wflowName): + """ + Remove workflow from persistent storage + :param wflowName: name of workflow + :return: nothing + """ + fname = os.path.join(self.storage, wflowName) + if os.path.exists(fname): + self.logger.info("Cleanup workflow entry %s in the persistent storage", wflowName) + os.remove(fname) diff --git a/src/python/WMCore/MicroService/MSTransferor/MSTransferorError.py b/src/python/WMCore/MicroService/MSTransferor/MSTransferorError.py new file mode 100644 index 0000000000..41b0fa217c --- /dev/null +++ b/src/python/WMCore/MicroService/MSTransferor/MSTransferorError.py @@ -0,0 +1,86 @@ +""" +File : MSTransferorError.py +Author : Valentin Kuznetsov +Description: MSTransferorError represents MSTransferor errors +""" + +# system modules +import json + +# WMCore modules +from WMCore.WMException import WMException + + +# MSTransferor error codes +MSPILEUP_GENERIC_ERROR = 1 +MSPILEUP_STORAGE_ERROR = 2 + + +class MSTransferorError(WMException): + """ + MSTransferorError represents generic MSTransferor error + """ + def __init__(self, msg, errorNo=None, **data): + """ + Constructor of MSTransferorError class + """ + super().__init__(self, msg) + self.data = data + self.code = 
errorNo + self.assign(msg) + + def error(self): + """ + JSON representation of MSTransferorError + + :return: MSTransferorError representation + """ + edict = {'data': self.data, 'message': self.msg, 'code': self.code, 'error': 'MSTransferorError'} + return edict + + def __str__(self): + """ + String representation of MSTransferorError + + :return: human readable MSTransferorError representation + """ + return json.dumps(self.error(), indent=4) + + def assign(self, msg="", code=0): + """ + Assign proper message and error codes + + :param msg: string + :param code: int + :return: None + """ + if msg: + self.msg = msg + else: + self.msg = 'Generic MSTransferor error' + if code > 0: + self.code = code + else: + self.code = MSPILEUP_GENERIC_ERROR + + +class MSTransferorGenericError(MSTransferorError): + """ + Generic MSTransferor exception + """ + def __init__(self, msg, errorNo=None, **data): + super().__init__(msg, errorNo, **data) + if not msg: + msg = "generic error" + self.assign(msg=msg, code=MSPILEUP_GENERIC_ERROR) + + +class MSTransferorStorageError(MSTransferorError): + """ + Storage MSTransferor exception + """ + def __init__(self, msg, errorNo=None, **data): + super().__init__(msg, errorNo, **data) + if not msg: + msg = "storage error" + self.assign(msg=msg, code=MSPILEUP_STORAGE_ERROR) diff --git a/src/python/WMCore/Services/Rucio/Rucio.py b/src/python/WMCore/Services/Rucio/Rucio.py index 7d439abdc2..8ff439588e 100644 --- a/src/python/WMCore/Services/Rucio/Rucio.py +++ b/src/python/WMCore/Services/Rucio/Rucio.py @@ -16,7 +16,7 @@ from rucio.client import Client from rucio.common.exception import (AccountNotFound, DataIdentifierNotFound, AccessDenied, DuplicateRule, DataIdentifierAlreadyExists, DuplicateContent, InvalidRSEExpression, - UnsupportedOperation, FileAlreadyExists, RuleNotFound, RSENotFound) + UnsupportedOperation, FileAlreadyExists, RuleNotFound, RSENotFound, RuleReplaceFailed) from Utils.MemoryCache import MemoryCache from Utils.IteratorTools 
import grouper from WMCore.Services.Rucio.RucioUtils import (validateMetaData, weightedChoice, @@ -454,6 +454,30 @@ def closeBlockContainer(self, name, scope='cms'): self.logger.error("Exception closing container/block: %s. Error: %s", name, str(ex)) return response + def moveReplicationRule(self, ruleId, rseExpression, account): + """ + Perform move operation for provided rule id and rse expression + :param ruleId: rule id + :param rseExpression: rse expression + :param account: rucio quota account + :return: it returns either an empty list or a list with a string id for the rule created + Please note, we made return type from this wrapper compatible with createReplicationRule + """ + ruleIds = [] + try: + rid = self.cli.move_replication_rule(ruleId, rseExpression, account) + ruleIds.append(rid) + except RuleNotFound as ex: + msg = "RuleNotFound move DID replication rule. Error: %s" % str(ex) + raise WMRucioException(msg) from ex + except RuleReplaceFailed as ex: + msg = "RuleReplaceFailed move DID replication rule. Error: %s" % str(ex) + raise WMRucioException(msg) from ex + except Exception as ex: + msg = "Unsupported exception from Rucio API.
Error: %s" % str(ex) + raise WMRucioException(msg) from ex + return ruleIds + def createReplicationRule(self, names, rseExpression, scope='cms', copies=1, **kwargs): """ _createReplicationRule_ diff --git a/test/python/WMCore_t/MicroService_t/MSTransferor_t/MSTransferorError_t.py b/test/python/WMCore_t/MicroService_t/MSTransferor_t/MSTransferorError_t.py new file mode 100644 index 0000000000..c70a60fede --- /dev/null +++ b/test/python/WMCore_t/MicroService_t/MSTransferor_t/MSTransferorError_t.py @@ -0,0 +1,38 @@ +""" +Unit tests for MSTransferorError.py module + +Author: Valentin Kuznetsov +""" + +# system modules +import unittest + +# WMCore modules +from WMCore.MicroService.MSTransferor.MSTransferorError import MSTransferorStorageError, MSPILEUP_STORAGE_ERROR + + +class TransferorErrorTest(unittest.TestCase): + "Unit test for TransferorError module" + + def testError(self): + """Test MSTransferorError""" + rec = {'workflow': 'testWorkflow'} + + # test custom emessage + msg = 'test error' + err = MSTransferorStorageError(msg, **rec) + edict = err.error() + self.assertEqual(edict['message'], msg) + self.assertEqual(edict['code'], MSPILEUP_STORAGE_ERROR) + self.assertEqual(edict['data'], rec) + + # test default assigned message + err = MSTransferorStorageError('', **rec) + edict = err.error() + self.assertEqual(edict['message'], 'storage error') + self.assertEqual(edict['code'], MSPILEUP_STORAGE_ERROR) + self.assertEqual(edict['data'], rec) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/python/WMCore_t/MicroService_t/MSTransferor_t/MSTransferor_t.py b/test/python/WMCore_t/MicroService_t/MSTransferor_t/MSTransferor_t.py index a5b382d65b..0ea7203b11 100644 --- a/test/python/WMCore_t/MicroService_t/MSTransferor_t/MSTransferor_t.py +++ b/test/python/WMCore_t/MicroService_t/MSTransferor_t/MSTransferor_t.py @@ -14,6 +14,8 @@ from Utils.PythonVersion import PY3 from WMCore.MicroService.MSTransferor.MSTransferor import MSTransferor from 
WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase +from WMCore.MicroService.MSTransferor.MSTransferorError import MSTransferorStorageError +from WMCore.MicroService.MSTransferor.DataStructs.Workflow import Workflow def getTestFile(partialPath): @@ -141,6 +143,34 @@ def notestRequestRecord(self): for idx in range(len(resp)): self.assertItemsEqual(resp[idx], expectedRes[idx]) + def testUpdateStorage(self): + """ + Test updateStorage method. We should be able to save and read + JSON objects to persistent storage of MSTransferor. + """ + # use default storage and check save/read operations + wflow = 'testWorkflow' + status = self.msTransferor.updateStorage(wflow) + self.assertEqual(status, 'ok') + wflowObject = Workflow(wflow, {'DbsUrl': 'https://cmsweb-testbed.cern.ch', 'RequestType': 'StoreResults'}) + self.msTransferor.checkDataReplacement(wflowObject) + self.assertEqual(wflowObject.dataReplacement, True) + + def testUpdateSites(self): + """ + Test the updateSites method. + """ + rec = {'workflow': 'testWorkflow'} + res = self.msTransferor.updateSites(rec) + self.assertEqual(res, []) + + # now let's test transferor error + self.msTransferor.storage = '/bla' + res = self.msTransferor.updateSites(rec) + err = MSTransferorStorageError("[Errno 2] No such file or directory: '/bla/testWorkflow'", **rec) + self.assertEqual(res, [err.error()]) + self.assertEqual(err.code, 2) + if __name__ == '__main__': unittest.main()