From d041a522a05e582f725b00c50c93720be823eca8 Mon Sep 17 00:00:00 2001 From: Valentin Kuznetsov Date: Wed, 20 Nov 2024 16:22:28 -0500 Subject: [PATCH] Introduce new logic basedon ops feedback --- .../MicroService/MSTransferor/MSTransferor.py | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py b/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py index 0af2d2c0ee..69094f710b 100644 --- a/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py +++ b/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py @@ -520,7 +520,17 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod self.logger.info("Creating rule for workflow %s with %d DIDs in container %s, RSEs: %s, grouping: %s", wflow.getName(), len(dids), dataIn['name'], rseExpr, grouping) try: - res = self.rucio.createReplicationRule(dids, rseExpr, **ruleAttrs) + # make decision about current workflow, if it is new request we'll create + # new replication rule, otherwise we'll move replication rule + if wflow.updatedWorkflow: + rules = self.rucio.listDataRules(wflow.getName()) + for rdoc in rules: + for rid in rdoc['ruleIds']: + # the moveReplcationRule will raise different exceptions + # if move operation is not normal + self.rucio.moveReplicationRule(rid, rseExpr, **ruleAttrs) + else: + res = self.rucio.createReplicationRule(dids, rseExpr, **ruleAttrs) except Exception as exc: msg = "Hit a bad exception while creating replication rules for DID: %s. Error: %s" self.logger.error(msg, dids, str(exc)) @@ -799,30 +809,20 @@ def _updateSites(self, wflow): rseExp = rdoc['rse_expression'] if rseExp != rse: continue - rid = rdoc['id'] - status = self.rucio.deleteRule(rid) - if not status: - err = f"Unable to delete rule {rid} (RSE={rse} for {wflowName}" - self.logger.error(err) - errors.append({'workflow': wflowName, 'error': err}) del newRSEs[rse] # if site was added to SiteWhiteList add new rule id if rse in siteWhiteList: self.logger.info("Add rse %s to workflow %s", rse, wflowName) for rdoc in rules: + rseExp = rdoc['rse_expression'] if rseExp != rse: continue - self.rucio.createReplicationRule(wflowName, rse) - for rid in rdoc['ruleIds']: - opts = {} - status = self.rucio.updateRule(rid, opts) - newRSEs[rse] = None - if not status: - err = f"Unable to update rule {rid} (RSE={rse} for {wflowName}" - self.logger.error(err) - errors.append({'workflow': wflowName, 'error': err}) + newRSEs[rse] = None # persist new rule ids in a transfer document - wflow.pileupRSEList = set(newRSEs.keys()) + if len(newRSEs) > 0: + self.logger.info("Workflow %s is updated with new RSE list %s", wflowName, set(newRSEs.keys())) + wflow.pileupRSEList = set(newRSEs.keys()) + wflow.updatedWorkflow = True return errors