From 71d225c21eebca1b123e7bc9d937c61dd0242b49 Mon Sep 17 00:00:00 2001 From: Todor Ivanov Date: Tue, 31 May 2022 17:38:11 +0200 Subject: [PATCH] Add extra protection for T0 to prevent archival of workflows having blocks not yet deleted. Add GetDeletedBlocksByWorkflow DAO to WMBS. Aggregate all results in the DAO per workflowName Add extra protection for T0 at cleanCouchPoller Typo Update docstrings and log messages Remove redundant statements: Remove redundant range() start argument Remove redundant pass statement Remove redundant DISTINCT statement Typo Add CountUndeletedBlocksByWorkflow DAO && Decrease execution complexity in workflows with undeleted blocks check. Change log level to info. remove keynames remapping from GetDeletedBlocksByWorkflow DAO Pylint fixes. Review fixes --- .../MySQL/CountUndeletedBlocksByWorkflow.py | 49 ++++++++ .../Oracle/CountUndeletedBlocksByWorkflow.py | 14 +++ .../TaskArchiver/CleanCouchPoller.py | 33 +++++- .../Workflow/GetDeletedBlocksByWorkflow.py | 109 ++++++++++++++++++ .../Workflow/GetDeletedBlocksByWorkflow.py | 16 +++ 5 files changed, 218 insertions(+), 3 deletions(-) create mode 100644 src/python/WMComponent/DBS3Buffer/MySQL/CountUndeletedBlocksByWorkflow.py create mode 100644 src/python/WMComponent/DBS3Buffer/Oracle/CountUndeletedBlocksByWorkflow.py create mode 100644 src/python/WMCore/WMBS/MySQL/Workflow/GetDeletedBlocksByWorkflow.py create mode 100644 src/python/WMCore/WMBS/Oracle/Workflow/GetDeletedBlocksByWorkflow.py diff --git a/src/python/WMComponent/DBS3Buffer/MySQL/CountUndeletedBlocksByWorkflow.py b/src/python/WMComponent/DBS3Buffer/MySQL/CountUndeletedBlocksByWorkflow.py new file mode 100644 index 0000000000..2851ae06d5 --- /dev/null +++ b/src/python/WMComponent/DBS3Buffer/MySQL/CountUndeletedBlocksByWorkflow.py @@ -0,0 +1,49 @@ +""" +_CountUndeletedBlocksByWorkflow_ + +MySQL implementation of Workflows.CountUndeletedBlocksByWorkflow + +Retrieves a list of workflows and the relative undeleted blocks counters, 
+""" + + +from WMCore.Database.DBFormatter import DBFormatter + + +class CountUndeletedBlocksByWorkflow(DBFormatter): + """ + Retrieves a list of all workflows and the relative undeleted blocks counters + + The structure returned: + [{'count': 6, + 'deleted': 0, + 'name': 'PromptReco_Run351572_HcalNZS_Tier0_REPLAY_2022_ID220531142559_v425_220531_1430'}, + {'count': 8, + 'deleted': 0, + 'name': 'PromptReco_Run351572_NoBPTX_Tier0_REPLAY_2022_ID220531142559_v425_220531_1430'}, + ...] + """ + sql = """ + SELECT + dbsbuffer_workflow.name, + COUNT(DISTINCT dbsbuffer_block.blockname) as count + FROM dbsbuffer_block + INNER JOIN dbsbuffer_file ON + dbsbuffer_file.block_id = dbsbuffer_block.id + INNER JOIN dbsbuffer_workflow ON + dbsbuffer_workflow.id = dbsbuffer_file.workflow + WHERE dbsbuffer_block.deleted=0 + GROUP BY + dbsbuffer_workflow.name + """ + + def execute(self, conn=None, transaction=False, returnCursor=False): + """ + Executing the current sql query. + :param conn: A current database connection to be used if existing + :param transaction: A current database transaction to be used if existing + :return: A list of dictionaries one record for each database line returned + """ + dictResults = DBFormatter.formatDict(self, self.dbi.processData(self.sql, conn=conn, + transaction=transaction)) + return dictResults diff --git a/src/python/WMComponent/DBS3Buffer/Oracle/CountUndeletedBlocksByWorkflow.py b/src/python/WMComponent/DBS3Buffer/Oracle/CountUndeletedBlocksByWorkflow.py new file mode 100644 index 0000000000..383b2a8250 --- /dev/null +++ b/src/python/WMComponent/DBS3Buffer/Oracle/CountUndeletedBlocksByWorkflow.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +""" +_CountUndeletedBlocksByWorkflow_ + +Oracle implementation of Workflow.CountUndeletedBlocksByWorkflow +""" + +from WMComponent.DBS3Buffer.MySQL.CountUndeletedBlocksByWorkflow import CountUndeletedBlocksByWorkflow as MySQLCountUndeletedBlocksByWorkflow + + +class 
CountUndeletedBlocksByWorkflow(MySQLCountUndeletedBlocksByWorkflow): + """ + Retrieves a list of all workflows and the relative undeleted blocks counters + """ diff --git a/src/python/WMComponent/TaskArchiver/CleanCouchPoller.py b/src/python/WMComponent/TaskArchiver/CleanCouchPoller.py index a5db580b06..a2d5bf7b4a 100644 --- a/src/python/WMComponent/TaskArchiver/CleanCouchPoller.py +++ b/src/python/WMComponent/TaskArchiver/CleanCouchPoller.py @@ -45,7 +45,6 @@ class CleanCouchPollerException(WMException): Customized exception for the CleanCouchPoller """ - pass class CleanCouchPoller(BaseWorkerThread): @@ -95,6 +94,23 @@ def __init__(self, config): self.dashBoardUrl = getattr(config.TaskArchiver, "dashBoardUrl", None) self.DataKeepDays = getattr(config.TaskArchiver, "DataKeepDays", 0.125) # 3 hours + # Initialise with None all setup defined variables: + self.teamName = None + self.useReqMgrForCompletionCheck = None + self.archiveDelayHours = None + self.wmstatsCouchDB = None + self.centralRequestDBReader = None + self.centralRequestDBWriter = None + self.deletableState = None + self.reqmgr2Svc = None + self.jobCouchdb = None + self.jobsdatabase = None + self.fwjrdatabase = None + self.fwjrService = None + self.workCouchdb = None + self.workdatabase = None + self.statsumdatabase = None + def setup(self, parameters=None): """ Called at startup @@ -217,7 +233,7 @@ def archiveSummaryAndPublishToDashBoard(self, finishedwfsWithLogCollectAndCleanU if self.dashBoardUrl is not None: self.publishRecoPerfToDashBoard(spec) else: - logging.warn("Workflow spec was not found for %s", workflow) + logging.warning("Workflow spec was not found for %s", workflow) return @@ -380,6 +396,16 @@ def deleteWorkflowFromWMBSAndDisk(self): deletableWorkflowsDAO = self.daoFactory(classname="Workflow.GetDeletableWorkflows") deletablewfs = deletableWorkflowsDAO.execute() + # For T0 exclude the workflows which do not yet have all their blocks deleted: + if not self.useReqMgrForCompletionCheck: +
undeletedBlocksByWorkflowDAO = self.dbsDaoFactory(classname="CountUndeletedBlocksByWorkflow") + wfsWithUndeletedBlocks = {record['name'] for record in undeletedBlocksByWorkflowDAO.execute()} + for workflow in list(deletablewfs): + if workflow in wfsWithUndeletedBlocks: + msg = "Removing workflow: %s from the list of deletable workflows. It still has blocks NOT deleted." + self.logger.info(msg, workflow) + deletablewfs.pop(workflow) + # Only delete those where the upload and notification succeeded logging.info("Found %d candidate workflows for deletion.", len(deletablewfs)) # update the completed flag in dbsbuffer_workflow table so blocks can be closed @@ -973,7 +999,7 @@ def getPerformanceFromDQM(self, dqmUrl, dataset, run): return False except Exception as ex: logging.error('Couldnt fetch DQM Performance data for dataset %s , Run %s', dataset, run) - logging.exception(ex) # Let's print the stacktrace with generic Exception + logging.exception(str(ex)) # Let's print the stacktrace with generic Exception return False try: @@ -982,6 +1008,7 @@ def getPerformanceFromDQM(self, dqmUrl, dataset, run): except Exception as ex: logging.info("Actually got a JSON from DQM perf in for %s run %d , but content was bad, Bailing out", dataset, run) + logging.exception(str(ex)) # Let's print the stacktrace with generic Exception return False # If it gets here before returning False or responseJSON, it went wrong return False diff --git a/src/python/WMCore/WMBS/MySQL/Workflow/GetDeletedBlocksByWorkflow.py b/src/python/WMCore/WMBS/MySQL/Workflow/GetDeletedBlocksByWorkflow.py new file mode 100644 index 0000000000..009b7769b7 --- /dev/null +++ b/src/python/WMCore/WMBS/MySQL/Workflow/GetDeletedBlocksByWorkflow.py @@ -0,0 +1,109 @@ +""" +_GetDeletedBlocksByWorkflow_ + +MySQL implementation of Workflows.GetDeletedBlocksByWorkflow + +Retrieves a list of workflows with lists of deleted and NOT deleted blocks per workflow. + +NOTE: This DAO is not used in the production code but is to be
used only for debugging purposes +""" + + +from WMCore.Database.DBFormatter import DBFormatter + + +class GetDeletedBlocksByWorkflow(DBFormatter): + """ + Retrieves a list of all workflows and the relative deleted blocks lists + """ + sql = """SELECT + dbsbuffer_block.blockname, + dbsbuffer_block.deleted, + wmbs_workflow.name + FROM dbsbuffer_block + INNER JOIN dbsbuffer_file ON + dbsbuffer_file.block_id = dbsbuffer_block.id + INNER JOIN dbsbuffer_workflow ON + dbsbuffer_workflow.id = dbsbuffer_file.workflow + INNER JOIN wmbs_workflow ON + wmbs_workflow.name = dbsbuffer_workflow.name + GROUP BY + dbsbuffer_block.blockname, + dbsbuffer_block.deleted, + wmbs_workflow.name + """ + + def format(self, result): + """ + _format_ + + Format the query results into the proper dictionary expected at the upper layer Python code. + The input should be a list of database objects, each one representing a single line returned + from the database with key names matching the column names from the sql query. + + The intermediate (not aggregated) result representing the primary database output in python should be + a list of dictionaries one record per line returned from the database with key names mapped to the + python code variable naming conventions. + + e.g. + [{'blockname': '/a/b/c#123-qwe', + 'deleted': 0, + 'name': 'WorkflowName'}, + {'blockname': '/a/b/c#456-rty', + 'deleted': 1, + 'name': 'WorkflowName'}, + {'blockname': '/a/b/d#123-asd', + 'deleted': 0, + 'name': 'WorkflowName'} + ... + ] + + NOTE: + * The number of records per workflow and block returned (i.e. 
number of records per group in the GROUP BY statement) + from the query is not related to either the number of blocks nor to the number of workflows, but rather to the + combination of number of files in the block and some other factor which increases the granularity (it seems to be + the number of records in dbsbuffer_workflow table per file aggregated by workflow), and NO `DISTINCT` requirement + in the SELECT statement is needed because we already have them properly grouped. + * Once deleted we should NOT expect duplicate records with two different values of the deleted + flag to be returned for a single block but we should still create the list of deleted and + NotDeleted blocks as sets and eventually create their proper intersection for double check. + + This list needs to be further aggregated by name to produce an aggregated structure per workflow like: + + [{'name': 'WorkflowName', + 'blocksNotDeleted': ['/a/b/c#123-qwe', + '/a/b/d#123-asd'], + 'blocksDeleted': ['/a/b/c#456-rty'] + }, + ... + ] + + :param result: The result as returned by the mysql query execution. + :return: List of dictionaries, one aggregated record per workflow + """ + + # First reformat the output in a list of dictionaries per DB record + dictResults = DBFormatter.formatDict(self, result) + + # Now aggregate all blocks per workflow: + results = {} + for record in dictResults: + wfName = record['name'] + results.setdefault(wfName, {'name': wfName, 'blocksDeleted': [], 'blocksNotDeleted': []}) + if record['deleted']: + results[wfName]['blocksDeleted'].append(record['blockname']) + else: + results[wfName]['blocksNotDeleted'].append(record['blockname']) + return list(results.values()) + + def execute(self, conn=None, transaction=False, returnCursor=False): + """ + Executing the current sql query.
+ :param conn: A current database connection to be used if existing + :param transaction: A current database transaction to be used if existing + :return: The aggregated records produced by self.format(), one dictionary per workflow + """ + results = self.dbi.processData(self.sql, conn=conn, + transaction=transaction) + + return self.format(results) diff --git a/src/python/WMCore/WMBS/Oracle/Workflow/GetDeletedBlocksByWorkflow.py b/src/python/WMCore/WMBS/Oracle/Workflow/GetDeletedBlocksByWorkflow.py new file mode 100644 index 0000000000..a5d264e956 --- /dev/null +++ b/src/python/WMCore/WMBS/Oracle/Workflow/GetDeletedBlocksByWorkflow.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +""" +_GetDeletedBlocksByWorkflow_ + +Oracle implementation of Workflow.GetDeletedBlocksByWorkflow + +NOTE: This DAO is not used in the production code but is to be used only for debugging purposes +""" + +from WMCore.WMBS.MySQL.Workflow.GetDeletedBlocksByWorkflow import GetDeletedBlocksByWorkflow as MySQLGetDeletedBlocksByWorkflow + + +class GetDeletedBlocksByWorkflow(MySQLGetDeletedBlocksByWorkflow): + """ + Retrieves a list of all workflows and the relative deleted blocks lists + """