Skip to content

Commit

Permalink
Merge pull request #11024 from amaltaro/fix-10106
Browse files Browse the repository at this point in the history
MSOutput: read RelVal output data policy from configuration
  • Loading branch information
todor-ivanov authored Mar 8, 2022
2 parents 1592300 + de788f3 commit a1da43b
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 19 deletions.
33 changes: 14 additions & 19 deletions src/python/WMCore/MicroService/MSOutput/MSOutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
from WMCore.MicroService.MSCore import MSCore
from WMCore.MicroService.Tools.Common import gigaBytes
from WMCore.Services.CRIC.CRIC import CRIC
from WMCore.Services.DBS.DBS3Reader import getDataTiers
from Utils.Pipeline import Pipeline, Functor
from WMCore.Database.MongoDB import MongoDB
from WMCore.MicroService.MSOutput.MSOutputTemplate import MSOutputTemplate
from WMCore.MicroService.MSOutput.RelValPolicy import RelValPolicy
from WMCore.WMException import WMException
from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI

Expand Down Expand Up @@ -97,20 +99,25 @@ def __init__(self, msConfig, mode, logger=None):
# fetch documents created in the last 6 months (default value)
self.msConfig.setdefault("mongoDocsCreatedSecs", 6 * 30 * 24 * 60 * 60)
self.msConfig.setdefault("sendNotification", False)
self.msConfig.setdefault("relvalPolicy", [])

self.uConfig = {}
# service name used to route alerts via AlertManager
self.alertServiceName = "ms-output"
self.alertManagerAPI = AlertManagerAPI(self.msConfig.get("alertManagerUrl", None), logger=logger)

# RelVal output data placement policy from the service configuration
self.msConfig.setdefault("dbsUrl", "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader")
allDBSDatatiers = getDataTiers(self.msConfig['dbsUrl'])
allDiskRSEs = self.rucio.evaluateRSEExpression("*", returnTape=False)
self.relvalPolicy = RelValPolicy(self.msConfig['relvalPolicy'],
allDBSDatatiers, allDiskRSEs, logger=logger)

self.cric = CRIC(logger=self.logger)
self.uConfig = {}
self.campaigns = {}
self.psn2pnnMap = {}

self.tapeStatus = dict()
for endpoint, quota in viewitems(self.msConfig['tapePledges']):
self.tapeStatus[endpoint] = dict(quota=quota, usage=0, remaining=0)

msOutIndex = IndexModel('RequestName', unique=True)
msOutDBConfig = {
'database': 'msOutDB',
Expand Down Expand Up @@ -665,22 +672,10 @@ def docInfoUpdate(self, msOutDoc):
dataItem['Copies'] = 1

if msOutDoc['IsRelVal']:
_, dsn, procString, dataTier = dataItem['Dataset'].split('/')
destination = set()
if dataTier != "RECO" and dataTier != "ALCARECO":
destination.add('T2_CH_CERN')
if dataTier == "GEN-SIM":
destination.add('T1_US_FNAL_Disk')
if dataTier == "GEN-SIM-DIGI-RAW":
destination.add('T1_US_FNAL_Disk')
if dataTier == "GEN-SIM-RECO":
destination.add('T1_US_FNAL_Disk')
if "RelValTTBar" in dsn and "TkAlMinBias" in procString and dataTier != "ALCARECO":
destination.add('T2_CH_CERN')
if "MinimumBias" in dsn and "SiStripCalMinBias" in procString and dataTier != "ALCARECO":
destination.add('T2_CH_CERN')

destination = self.relvalPolicy.getDestinationByDataset(dataItem['Dataset'])
if destination:
# ensure each RelVal destination gets a copy of the data
dataItem['Copies'] = len(destination)
dataItem['DiskDestination'] = '|'.join(destination)
else:
self.logger.warning("RelVal dataset: %s without any destination", dataItem['Dataset'])
Expand Down
123 changes: 123 additions & 0 deletions src/python/WMCore/MicroService/MSOutput/RelValPolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Module to store the RelVal output data placement policy and
make decisions based on that
"""
from __future__ import print_function, division

from copy import deepcopy
import json
from WMCore.MicroService.Tools.Common import getMSLogger
from WMCore.WMException import WMException


class RelValPolicyException(WMException):
    """
    Exception type raised whenever a flaw is detected in the RelVal
    output data placement policy.
    """


class RelValPolicy:
    """
    Holds the RelVal output data placement policy, where destinations
    are decided according to the dataset datatier.

    The policy is provided as a list of dictionaries, and its data
    structure looks like:
      [{"datatier": "tier_1", "destinations": ["rse_name_1", "rse_name_2"]},
       {"datatier": "tier_2", "destinations": ["rse_name_2"]},
       {"datatier": "default", "destinations": ["rse_name_3"]}]
    the 'default' key matches the case where a datatier is not specified
    in the policy.
    """

    def __init__(self, policyDesc, listDatatiers, listRSEs, logger=None):
        """
        Given a policy data structure - as a list of dictionaries - it
        will validate the policy, the datatiers and RSEs defined in it,
        and it will convert the policy into a flat dictionary for easier
        data lookup.
        :param policyDesc: list of dictionary items with the output rules
        :param listDatatiers: flat list of existent datatiers in DBS
        :param listRSEs: flat list of existent Disk RSEs in Rucio
        :param logger: logger object, if any
        :raises RelValPolicyException: if the policy fails any validation
        """
        # keep an untouched snapshot of the policy exactly as it was provided
        self.origPolicy = deepcopy(policyDesc)

        self.logger = getMSLogger(verbose=False, logger=logger)

        self._validatePolicy(policyDesc, listDatatiers, listRSEs)
        self.dictPolicy = self._convertPolicy(policyDesc)

    def __str__(self):
        """
        Stringify this object as JSON, carrying both the original policy
        and the flat dictionary mapped from it.
        """
        objectOut = dict(originalPolicy=self.origPolicy, mappedPolicy=self.dictPolicy)
        return json.dumps(objectOut)

    def _validatePolicy(self, policyDesc, validDBSTiers, validDiskRSEs):
        """
        This method validates the overall policy data structure, including:
          * internal and external data types
          * presence of the mandatory keys in each policy item
          * whether the datatiers exist in DBS
          * whether the RSEs exist in Rucio
          * presence of the 'default' fallback rule
        :param policyDesc: list of dictionaries with the policy definition
        :param validDBSTiers: list with existent DBS datatiers
        :param validDiskRSEs: list with existent Rucio Disk RSEs
        :return: nothing, but it will raise an exception if any validation fails
        :raises RelValPolicyException: describing the first flaw that is found
        """
        if not isinstance(policyDesc, list):
            msg = "The RelVal output data placement policy is not in the expected data type. "
            msg += "Type expected: list, while the current data type is: {}. ".format(type(policyDesc))
            msg += "This critical ERROR must be fixed."
            raise RelValPolicyException(msg) from None

        # policy must have a default/fallback destination for datatiers not explicitly listed
        hasDefault = False
        for item in policyDesc:
            # validate the item structure first, such that a malformed rule is
            # reported as a policy error instead of a bare TypeError/KeyError
            if not isinstance(item, dict):
                msg = "Each RelVal policy item must be a dictionary, not {}.".format(type(item))
                raise RelValPolicyException(msg) from None
            for requiredKey in ("datatier", "destinations"):
                if requiredKey not in item:
                    msg = "RelVal policy item is missing the mandatory '{}' key: {}".format(requiredKey, item)
                    raise RelValPolicyException(msg) from None

            # validate the datatier
            if not isinstance(item['datatier'], str):
                msg = "The 'datatier' parameter must be a string, not {}.".format(type(item['datatier']))
                raise RelValPolicyException(msg) from None
            if item['datatier'] == "default":
                hasDefault = True
            elif item['datatier'] not in validDBSTiers:
                raise RelValPolicyException("Datatier '{}' does not exist in DBS.".format(item['datatier']))

            # validate the destinations
            if not isinstance(item['destinations'], list):
                msg = "The 'destinations' parameter must be a list, not {}".format(type(item['destinations']))
                raise RelValPolicyException(msg) from None
            for rseName in item['destinations']:
                if rseName not in validDiskRSEs:
                    msg = "Destinations '{}' does not exist in Rucio.".format(rseName)
                    raise RelValPolicyException(msg) from None

        if not hasDefault:
            msg = "A 'default' key must be defined with default destinations."
            raise RelValPolicyException(msg) from None

    def _convertPolicy(self, policyDesc):
        """
        Maps the RelVal data policy to a flat dictionary key'ed by datatiers.
        If a datatier is listed more than once, the last rule wins.
        :param policyDesc: list of dictionaries with the policy definition
        :return: a dictionary with a map of the RelVal policy
        """
        # copy each destinations list, such that later changes to the
        # caller's policy object cannot alter the mapped policy
        return {item['datatier']: list(item['destinations']) for item in policyDesc}

    def getDestinationByDataset(self, dsetName):
        """
        Provided a dataset name, return the destination defined for its datatier,
        falling back to the 'default' destinations for unlisted datatiers.
        :param dsetName: a string with the full dataset name, in the
            format of: /<primary dataset>/<processed string>/<datatier>
        :return: a list of locations
        :raises ValueError: if the dataset name does not have exactly 3 parts
        """
        _, _, _, dataTier = dsetName.split('/')
        # return a copy, so that consumers cannot modify the policy itself
        return list(self.dictPolicy.get(dataTier, self.dictPolicy['default']))
83 changes: 83 additions & 0 deletions test/python/WMCore_t/MicroService_t/MSOutput_t/RelValPolicy_t.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Unit tests for the WMCore/MicroService/MSOutput/RelValPolicy.py module
"""
from __future__ import division, print_function

import unittest
import json
from WMCore.MicroService.MSOutput.RelValPolicy import RelValPolicy, RelValPolicyException


class RelValPolicyTests(unittest.TestCase):
    """Unit tests for the RelValPolicy module"""

    def setUp(self):
        """Basic setup for each unit test"""

    def tearDown(self):
        """Basic tear down operation when leaving each unit test"""

    def testBrokenPolicy(self):
        """Tests for the RelValPolicy class with a broken policy"""
        validDatatiers = ["GEN-SIM", "GEN", "SIM", "AOD"]
        validRSEs = ["rse_1", "rse_2", "rse_3"]

        # test output policies with a wrong data type
        for testPolicy in [None, {}, "blah", 123]:
            with self.assertRaises(RelValPolicyException):
                RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test internal structure with wrong data type for datatier
        testPolicy = [{"datatier": ["tier1", "tier2"], "destinations": ["rse_1", "rse_2"]},
                      {"datatier": "default", "destinations": ["rse_1"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test internal structure with wrong data type for destinations
        # NOTE: the datatier must be a valid one, otherwise the datatier
        # validation fails first and the destinations are never checked
        testPolicy = [{"datatier": "GEN", "destinations": "rse_1"},
                      {"datatier": "default", "destinations": ["rse_1"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test a datatier that is not among the valid datatiers
        testPolicy = [{"datatier": "tier1", "destinations": ["rse_1"]},
                      {"datatier": "default", "destinations": ["rse_1"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test a destination that is not among the valid RSEs
        testPolicy = [{"datatier": "GEN", "destinations": ["rse_1", "rse_unknown"]},
                      {"datatier": "default", "destinations": ["rse_1"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test internal structure missing the required "default" key/value pair
        testPolicy = [{"datatier": "GEN", "destinations": ["rse_1", "rse_2"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

    def testValidPolicy(self):
        """Tests for the RelValPolicy class with a valid policy"""
        validDatatiers = ["GEN-SIM", "GEN", "SIM", "AOD"]
        validRSEs = ["rse_1", "rse_2", "rse_3"]

        testPolicy = [{"datatier": "SIM", "destinations": ["rse_1", "rse_2"]},
                      {"datatier": "GEN-SIM", "destinations": ["rse_1"]},
                      {"datatier": "default", "destinations": ["rse_2"]}]
        policyObj = RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # now test the method to get destinations for a given dataset (datatier)
        for policyItem in testPolicy:
            resp = policyObj.getDestinationByDataset("/PD/ProcStr-v1/{}".format(policyItem['datatier']))
            self.assertEqual(resp, policyItem['destinations'])

        # and this should fallback to the 'default' case
        resp = policyObj.getDestinationByDataset("/PD/ProcStr-v1/BLAH")
        self.assertEqual(resp, ["rse_2"])

    def testStringification(self):
        """Test the stringification of the RelValPolicy object"""
        validDatatiers = []
        validRSEs = ["rse_2"]
        testPolicy = [{"datatier": "default", "destinations": ["rse_2"]}]

        policyObj = str(RelValPolicy(testPolicy, validDatatiers, validRSEs))

        self.assertTrue(isinstance(policyObj, str))
        policyObj = json.loads(policyObj)
        # assertEqual compares keys AND values; assertCountEqual on a
        # dictionary would only compare its keys
        self.assertEqual(policyObj["originalPolicy"], testPolicy)
        self.assertEqual(policyObj["mappedPolicy"], {"default": ["rse_2"]})


# run the whole test suite when this module is executed as a script
if __name__ == "__main__":
    unittest.main()

0 comments on commit a1da43b

Please sign in to comment.