Skip to content

Commit

Permalink
Introduce new logic basedon ops feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Nov 20, 2024
1 parent 0b40374 commit d041a52
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions src/python/WMCore/MicroService/MSTransferor/MSTransferor.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,17 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod
self.logger.info("Creating rule for workflow %s with %d DIDs in container %s, RSEs: %s, grouping: %s",
wflow.getName(), len(dids), dataIn['name'], rseExpr, grouping)
try:
res = self.rucio.createReplicationRule(dids, rseExpr, **ruleAttrs)
# make decision about current workflow, if it is new request we'll create
# new replication rule, otherwise we'll move replication rule
if wflow.updatedWorkflow:
rules = self.rucio.listDataRules(wflow.getName())
for rdoc in rules:
for rid in rdoc['ruleIds']:
# the moveReplcationRule will raise different exceptions
# if move operation is not normal
self.rucio.moveReplicationRule(rid, rseExpr, **ruleAttrs)
else:
res = self.rucio.createReplicationRule(dids, rseExpr, **ruleAttrs)
except Exception as exc:
msg = "Hit a bad exception while creating replication rules for DID: %s. Error: %s"
self.logger.error(msg, dids, str(exc))
Expand Down Expand Up @@ -799,30 +809,20 @@ def _updateSites(self, wflow):
rseExp = rdoc['rse_expression']
if rseExp != rse:
continue
rid = rdoc['id']
status = self.rucio.deleteRule(rid)
if not status:
err = f"Unable to delete rule {rid} (RSE={rse} for {wflowName}"
self.logger.error(err)
errors.append({'workflow': wflowName, 'error': err})
del newRSEs[rse]

# if site was added to SiteWhiteList add new rule id
if rse in siteWhiteList:
self.logger.info("Add rse %s to workflow %s", rse, wflowName)
for rdoc in rules:
rseExp = rdoc['rse_expression']
if rseExp != rse:
continue
self.rucio.createReplicationRule(wflowName, rse)
for rid in rdoc['ruleIds']:
opts = {}
status = self.rucio.updateRule(rid, opts)
newRSEs[rse] = None
if not status:
err = f"Unable to update rule {rid} (RSE={rse} for {wflowName}"
self.logger.error(err)
errors.append({'workflow': wflowName, 'error': err})
newRSEs[rse] = None

# persist new rule ids in a transfer document
wflow.pileupRSEList = set(newRSEs.keys())
if len(newRSEs) > 0:
self.logger.info("Workflow %s is updated with new RSE list %s", wflowName, set(newRSEs.keys()))
wflow.pileupRSEList = set(newRSEs.keys())
wflow.updatedWorkflow = True
return errors

0 comments on commit d041a52

Please sign in to comment.