Implement logic to remake input data placement upon site list changes #12155

Merged
merged 2 commits on Dec 16, 2024
9 changes: 6 additions & 3 deletions src/python/WMCore/MicroService/MSCore/MSManager.py
@@ -34,6 +34,7 @@
from WMCore.MicroService.MSCore.TaskManager import start_new_thread
from Utils.Utilities import strToBool


def daemon(func, reqStatus, interval, logger):
"Daemon to perform given function action for all request in our store"
while True:
@@ -398,16 +399,18 @@ def post(self, doc):
:param doc: input JSON doc for HTTP POST request
:return: list of results
"""
res = []
if 'pileup' in self.services:
res = []
if 'pileupName' in doc:
# this is create POST request
res = self.msPileup.createPileup(doc)
if 'query' in doc:
# this is POST request to get data for a given JSON query
res = self.msPileup.queryDatabase(doc)
return res
return []
elif 'transferor' in self.services:
if 'workflow' in doc:
res = self.msTransferor.updateSites(doc)
return res

def update(self, doc):
"""
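For illustration, a minimal client-side sketch of how the new 'workflow' branch of MSManager.post() could be exercised. The endpoint URL, the workflow name and the lack of authentication below are assumptions for the sketch only; in production the call goes through the cmsweb front-ends with proper credentials.

import json
import urllib.request

# assumed endpoint serving MSManager.post() for the transferor service
MS_TRANSFEROR_URL = "https://cmsweb.cern.ch/ms-transferor/data"
# hypothetical workflow name; any existing request name would go here
payload = {"workflow": "test_workflow"}

req = urllib.request.Request(
    MS_TRANSFEROR_URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json", "Accept": "application/json"},
    method="POST")
with urllib.request.urlopen(req) as resp:
    # an empty list means success; a non-empty list carries MSTransferorError dicts
    print(json.loads(resp.read()))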
@@ -34,6 +34,10 @@ def __init__(self, reqName, reqData, logger=None, verbose=False):
self.pileupDatasets = set()
self.pileupRSEList = set()

# flag to indicate if the workflow has been updated, e.g. see MSTransferor where we
# update site white/black lists and set this flag
self.dataReplacement = False

self.campaigns = set()
self.dataCampaignMap = []
# these blocks structure will be key'ed by the block name and value'd by the block size
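To make the life cycle of the new flag concrete, here is a small self-contained sketch (simplified stand-in classes, not the real WMCore objects): the flag defaults to False, MSTransferor.checkDataReplacement() flips it when the workflow is found in the persistent area, and makeTransferRucio() later uses it to choose between creating and moving replication rules.

# minimal illustrative sketch; WorkflowSketch is a stand-in for the real Workflow class
class WorkflowSketch:
    def __init__(self, name):
        self.name = name
        self.dataReplacement = False  # default: brand new request, create rules

    def getName(self):
        return self.name


def chooseRuleAction(wflow):
    # mirrors the decision taken in MSTransferor.makeTransferRucio()
    return "moveReplicationRule" if wflow.dataReplacement else "createReplicationRule"


wf = WorkflowSketch("test_workflow")
print(chooseRuleAction(wf))  # createReplicationRule
wf.dataReplacement = True    # set by checkDataReplacement() when a marker file exists
print(chooseRuleAction(wf))  # moveReplicationRule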
116 changes: 111 additions & 5 deletions src/python/WMCore/MicroService/MSTransferor/MSTransferor.py
@@ -15,6 +15,7 @@
standard_library.install_aliases()

# system modules
import os
from operator import itemgetter
from pprint import pformat
from retry import retry
@@ -27,10 +28,12 @@
from WMCore.MicroService.Tools.Common import (teraBytes, isRelVal)
from WMCore.MicroService.MSCore.MSCore import MSCore
from WMCore.MicroService.MSTransferor.RequestInfo import RequestInfo
from WMCore.MicroService.MSTransferor.MSTransferorError import MSTransferorStorageError
from WMCore.MicroService.MSTransferor.DataStructs.RSEQuotas import RSEQuotas
from WMCore.Services.CRIC.CRIC import CRIC
from WMCore.Services.MSPileup.MSPileupUtils import getPileupDocs
from WMCore.Services.Rucio.RucioUtils import GROUPING_ALL
from WMCore.Lexicon import requestName


def newTransferRec(dataIn):
@@ -72,6 +75,12 @@ def __init__(self, msConfig, logger=None):
"""
super(MSTransferor, self).__init__(msConfig, logger=logger)

# persistent area for site list processing
wdir = '{}/storage'.format(os.getcwd())
self.storage = self.msConfig.get('persistentArea', wdir)
os.makedirs(self.storage, exist_ok=True)
self.logger.info("Using directory %s as workflow persistent area", self.storage)

# minimum percentage completion for dataset/blocks subscribed
self.msConfig.setdefault("minPercentCompletion", 99)
# minimum available storage to consider a resource good for receiving data
@@ -91,9 +100,9 @@ def __init__(self, msConfig, logger=None):
self.pileupQuery = self.msConfig.get("pileupQuery",
{"query": {"active": True}, "filters": ["expectedRSEs", "pileupName"]})

quotaAccount = self.msConfig["rucioAccount"]
self.quotaAccount = self.msConfig["rucioAccount"]

self.rseQuotas = RSEQuotas(quotaAccount, self.msConfig["quotaUsage"],
self.rseQuotas = RSEQuotas(self.quotaAccount, self.msConfig["quotaUsage"],
minimumThreshold=self.msConfig["minimumThreshold"],
verbose=self.msConfig['verbose'], logger=logger)
self.reqInfo = RequestInfo(self.msConfig, self.rucio, self.logger)
@@ -210,6 +219,10 @@ def execute(self, reqStatus):
# now check where input primary and parent blocks will need to go
self.checkDataLocation(wflow, rseList)

# check if our workflow needs an update; if so, the wflow.dataReplacement flag is set,
# which will be used by the makeTransferRucio->moveReplicationRule chain of API calls
self.checkDataReplacement(wflow)

try:
success, transfers = self.makeTransferRequest(wflow, rseList)
except Exception as ex:
@@ -224,12 +237,16 @@
wflow.getName(), pformat(transfers))
if self.createTransferDoc(wflow.getName(), transfers):
self.logger.info("Transfer document successfully created in CouchDB for: %s", wflow.getName())
# then move this request to staging status
self.change(wflow.getName(), 'staging', self.__class__.__name__)
# then move this request to staging status, but only if we didn't do data replacement
if not wflow.dataReplacement:
self.change(wflow.getName(), 'staging', self.__class__.__name__)
counterSuccessRequests += 1
else:
counterFailedRequests += 1
self.alertTransferCouchDBError(wflow.getName())
# clean up local persistent storage if the move operation was successful
if wflow.dataReplacement:
self.cleanupStorage(wflow.getName())
else:
counterFailedRequests += 1
# it can go slightly beyond the limit. It's evaluated for every slice
@@ -498,8 +515,23 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod
# Then make the data subscription, for real!!!
self.logger.info("Creating rule for workflow %s with %d DIDs in container %s, RSEs: %s, grouping: %s",
wflow.getName(), len(dids), dataIn['name'], rseExpr, grouping)

# define list of rule ids we collect either from createReplicationRule or
# moveReplicationRule API calls
res = []
try:
res = self.rucio.createReplicationRule(dids, rseExpr, **ruleAttrs)
# make a decision about the current workflow: if it is a new request we'll create
# a new replication rule, otherwise we'll move the existing replication rule
if wflow.dataReplacement:
rids = self.getRuleIdsFromDoc(wflow.getName())
self.logger.info("Going to move %s rules for workflow: %s", len(rids), wflow.getName())
for rid in rids:
# the self.rucio.moveReplicationRule may raise different exceptions
# based on different outcome of the operation
res += self.rucio.moveReplicationRule(rid, rseExpr, self.quotaAccount)
self.logger.info("Rule %s was moved and the new rule is %s", rid, res)
else:
res = self.rucio.createReplicationRule(dids, rseExpr, **ruleAttrs)
except Exception as exc:
msg = "Hit a bad exception while creating replication rules for DID: %s. Error: %s"
self.logger.error(msg, dids, str(exc))
Expand All @@ -521,6 +553,22 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod
self.logger.info(msg, wflow.getName(), dids, rseExpr, ruleAttrs)
return success, transferId

def getRuleIdsFromDoc(self, workflowName):
"""
Obtain transfer IDs for given workflow name
:param workflowName: workflow name
:return: list of transfer IDs
"""
# make request to ReqMgr2 service
# https://xxx.cern.ch/reqmgr2/data/transferinfo/<workflowName>
tids = []
data = self.reqmgrAux.getTransferInfo(workflowName)
for row in data['result']:
transfers = row['transferDoc']['transfers']
for rec in transfers:
tids += rec['transferIDs']
return list(set(tids))

def alertPUMisconfig(self, workflowName):
"""
Send alert to Prometheus with PU misconfiguration error
@@ -676,3 +724,61 @@ def _diskPNNs(self, pnnList):
else:
diskPNNs.add(pnn)
return diskPNNs

def updateSites(self, rec):
"""
Update sites API, providing an asynchronous update of the site list information
:param rec: JSON payload with the following data structure: {'workflow': <wflow name>}
:return: either empty list (no errors) or list of errors
"""
# preserve provided payload to local file system
wflowName = rec['workflow']
status = self.updateStorage(wflowName)
if status == 'ok':
return []
err = MSTransferorStorageError(status, **rec)
self.logger.error(err)
return [err.error()]

def updateStorage(self, wflowName):
"""
Save workflow data to persistent storage
:param wflowName: name of the workflow
:return: status of this operation
"""
try:
fname = os.path.join(self.storage, wflowName)
with open(fname, 'w', encoding="utf-8"):
if requestName(wflowName):
# we perform a touch operation on the file system, i.e. create an empty file
self.logger.info("Creating workflow entry %s in the persistent storage", wflowName)
os.utime(fname, None)
else:
return "error: fail to pass Lexicon validation"
return 'ok'
except Exception as exp:
msg = "Unable to save workflow '%s' to storage=%s. Error: %s" % (wflowName, self.storage, str(exp))
self.logger.exception(msg)
return str(exp)

def checkDataReplacement(self, wflow):
"""
Check if the given workflow exists in local storage and set the dataReplacement flag if it does
:param wflow: workflow object
:return: nothing
"""
fname = '{}/{}'.format(self.storage, wflow.getName())
if os.path.exists(fname):
self.logger.info("Workflow %s is set for data replacement", wflow.getName())
wflow.dataReplacement = True

def cleanupStorage(self, wflowName):
"""
Remove workflow from persistent storage
:param wflowName: name of workflow
:return: nothing
"""
fname = os.path.join(self.storage, wflowName)
if os.path.exists(fname):
self.logger.info("Cleanup workflow entry %s in the persistent storage", wflowName)
os.remove(fname)
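The persistent-area bookkeeping above boils down to touch files keyed by workflow name. Below is a standalone sketch of the same pattern, with simplified names and a temporary directory standing in for msConfig['persistentArea']:

import os
import tempfile

storage = tempfile.mkdtemp(prefix="ms-transferor-")  # stand-in for the persistent area

def markForReplacement(storage, wflowName):
    # counterpart of updateSites()/updateStorage(): create an empty marker file
    fname = os.path.join(storage, wflowName)
    with open(fname, "w", encoding="utf-8"):
        os.utime(fname, None)

def needsReplacement(storage, wflowName):
    # counterpart of checkDataReplacement(): the marker's existence is the flag
    return os.path.exists(os.path.join(storage, wflowName))

def cleanup(storage, wflowName):
    # counterpart of cleanupStorage(): drop the marker once rules were moved
    fname = os.path.join(storage, wflowName)
    if os.path.exists(fname):
        os.remove(fname)

markForReplacement(storage, "test_workflow")
print(needsReplacement(storage, "test_workflow"))  # True
cleanup(storage, "test_workflow")
print(needsReplacement(storage, "test_workflow"))  # False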
86 changes: 86 additions & 0 deletions src/python/WMCore/MicroService/MSTransferor/MSTransferorError.py
@@ -0,0 +1,86 @@
"""
File : MSTransferorError.py
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: MSTransferorError represents MSTransferor errors
"""

# system modules
import json

# WMCore modules
from WMCore.WMException import WMException


# MSTransferor error codes
MSPILEUP_GENERIC_ERROR = 1
MSPILEUP_STORAGE_ERROR = 2


class MSTransferorError(WMException):
"""
MSTransferorError represents generic MSTransferor error
"""
def __init__(self, msg, errorNo=None, **data):
"""
Constructor of MSTransferorError class
"""
super().__init__(self, msg)
self.data = data
self.code = errorNo
self.assign(msg)

def error(self):
"""
JSON representation of MSTransferorError

:return: MSTransferorError representation
"""
edict = {'data': self.data, 'message': self.msg, 'code': self.code, 'error': 'MSTransferorError'}
return edict

def __str__(self):
"""
String representation of MSTransferorError

:return: human readable MSTransferorError representation
"""
return json.dumps(self.error(), indent=4)

def assign(self, msg="", code=0):
"""
Assign proper message and error codes

:param msg: string
:param code: int
:return: None
"""
if msg:
self.msg = msg
else:
self.msg = 'Generic MSTransferor error'
if code > 0:
self.code = code
else:
self.code = MSPILEUP_GENERIC_ERROR


class MSTransferorGenericError(MSTransferorError):
"""
Generic MSTransferor exception
"""
def __init__(self, msg, errorNo=None, **data):
super().__init__(msg, errorNo, **data)
if not msg:
msg = "generic error"
self.assign(msg=msg, code=MSPILEUP_GENERIC_ERROR)


class MSTransferorStorageError(MSTransferorError):
"""
Storage MSTransferor exception
"""
def __init__(self, msg, errorNo=None, **data):
super().__init__(msg, errorNo, **data)
if not msg:
msg = "storage error"
self.assign(msg=msg, code=MSPILEUP_STORAGE_ERROR)
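For reference, a short sketch of what the error payload returned by updateSites() looks like when persisting the workflow fails; it simply exercises the class defined above (the output shown in comments is indicative, not copied from a run).

from WMCore.MicroService.MSTransferor.MSTransferorError import MSTransferorStorageError

err = MSTransferorStorageError("error: fail to pass Lexicon validation",
                               workflow="test_workflow")
print(err.error())
# {'data': {'workflow': 'test_workflow'},
#  'message': 'error: fail to pass Lexicon validation',
#  'code': 2, 'error': 'MSTransferorError'}
print(str(err))  # same content, pretty-printed as JSON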
26 changes: 25 additions & 1 deletion src/python/WMCore/Services/Rucio/Rucio.py
@@ -16,7 +16,7 @@
from rucio.client import Client
from rucio.common.exception import (AccountNotFound, DataIdentifierNotFound, AccessDenied, DuplicateRule,
DataIdentifierAlreadyExists, DuplicateContent, InvalidRSEExpression,
UnsupportedOperation, FileAlreadyExists, RuleNotFound, RSENotFound)
UnsupportedOperation, FileAlreadyExists, RuleNotFound, RSENotFound, RuleReplaceFailed)
from Utils.MemoryCache import MemoryCache
from Utils.IteratorTools import grouper
from WMCore.Services.Rucio.RucioUtils import (validateMetaData, weightedChoice,
@@ -454,6 +454,30 @@ def closeBlockContainer(self, name, scope='cms'):
self.logger.error("Exception closing container/block: %s. Error: %s", name, str(ex))
return response

def moveReplicationRule(self, ruleId, rseExpression, account):
"""
Perform move operation for provided rule id and rse expression
:param ruleId: rule id
:param rseExpression: rse expression
:param account: rucio quota account
:return: it returns either an empty list or a list with a string id for the rule created
Please note, we made the return type of this wrapper compatible with createReplicationRule
"""
ruleIds = []
try:
rid = self.cli.move_replication_rule(ruleId, rseExpression, account)
ruleIds.append(rid)
except RuleNotFound as ex:
msg = "RuleNotFound move DID replication rule. Error: %s" % str(ex)
raise WMRucioException(msg) from ex
except RuleReplaceFailed as ex:
msg = "RuleReplaceFailed move DID replication rule. Error: %s" % str(ex)
raise WMRucioException(msg) from ex
except Exception as ex:
msg = "Unsupported exception from Rucio API. Error: %s" % str(ex)
raise WMRucioException(msg) from ex
return ruleIds

def createReplicationRule(self, names, rseExpression, scope='cms', copies=1, **kwargs):
"""
_createReplicationRule_
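A hedged usage sketch of the new wrapper; the account, host/auth URLs and rule id below are placeholders, and a real Rucio configuration with valid credentials is needed for this to actually run.

from WMCore.Services.Rucio.Rucio import Rucio, WMRucioException

# placeholder configuration; real credentials/hosts are required in practice
rucio = Rucio(acct="wmcore_transferor",
              hostUrl="http://cms-rucio.cern.ch",
              authUrl="https://cms-rucio-auth.cern.ch")

try:
    # returns a list with the id of the moved rule, mirroring createReplicationRule()
    newRuleIds = rucio.moveReplicationRule("placeholder_rule_id", "T2_CH_CERN", "wmcore_transferor")
    print(newRuleIds)
except WMRucioException as exc:
    # RuleNotFound/RuleReplaceFailed and any other Rucio error are re-raised as WMRucioException
    print("move failed:", exc)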
@@ -0,0 +1,38 @@
"""
Unit tests for MSTransferorError.py module

Author: Valentin Kuznetsov <vkuznet [AT] gmail [DOT] com>
"""

# system modules
import unittest

# WMCore modules
from WMCore.MicroService.MSTransferor.MSTransferorError import MSTransferorStorageError, MSPILEUP_STORAGE_ERROR


class TransferorErrorTest(unittest.TestCase):
"Unit test for TransferorError module"

def testError(self):
"""Test MSTransferorError"""
rec = {'workflow': 'testWorkflow'}

# test custom error message
msg = 'test error'
err = MSTransferorStorageError(msg, **rec)
edict = err.error()
self.assertEqual(edict['message'], msg)
self.assertEqual(edict['code'], MSPILEUP_STORAGE_ERROR)
self.assertEqual(edict['data'], rec)

# test default assigned message
err = MSTransferorStorageError('', **rec)
edict = err.error()
self.assertEqual(edict['message'], 'storage error')
self.assertEqual(edict['code'], MSPILEUP_STORAGE_ERROR)
self.assertEqual(edict['data'], rec)


if __name__ == '__main__':
unittest.main()