Skip to content

Commit

Permalink
Merge pull request #11169 from todor-ivanov/bugfix_T0_RaceConditionArchiveDelay_fix-11154
Browse files Browse the repository at this point in the history

Add extra protection for T0 to prevent archival of workflows having blocks not yet deleted
  • Loading branch information
amaltaro authored Jun 8, 2022
2 parents 38bf266 + 71d225c commit 2db755c
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
_CountUndeletedBlocksByWorkflow_
MySQL implementation of Workflows.CountUndeletedBlocksByWorkflow
Retrieves a list of workflows and the relative undeleted blocks counters,
"""


from WMCore.Database.DBFormatter import DBFormatter


class CountUndeletedBlocksByWorkflow(DBFormatter):
    """
    Retrieves a list of all workflows and the relative undeleted blocks counters.
    Only blocks with dbsbuffer_block.deleted=0 are counted, and the query selects
    just the workflow name and the counter, so each returned record has the form:
    [{'count': 6,
      'name': 'PromptReco_Run351572_HcalNZS_Tier0_REPLAY_2022_ID220531142559_v425_220531_1430'},
     {'count': 8,
      'name': 'PromptReco_Run351572_NoBPTX_Tier0_REPLAY_2022_ID220531142559_v425_220531_1430'},
     ...]
    NOTE(review): no 'deleted' key is present in the returned records — the
    SELECT clause only returns `name` and `count`.
    """
    sql = """
    SELECT
        dbsbuffer_workflow.name,
        COUNT(DISTINCT dbsbuffer_block.blockname) as count
    FROM dbsbuffer_block
    INNER JOIN dbsbuffer_file ON
        dbsbuffer_file.block_id = dbsbuffer_block.id
    INNER JOIN dbsbuffer_workflow ON
        dbsbuffer_workflow.id = dbsbuffer_file.workflow
    WHERE dbsbuffer_block.deleted=0
    GROUP BY
        dbsbuffer_workflow.name
    """

    def execute(self, conn=None, transaction=False, returnCursor=False):
        """
        Executes the current sql query.
        :param conn: A current database connection to be used if existing
        :param transaction: A current database transaction to be used if existing
        :param returnCursor: Unused here; kept for DAO interface compatibility
        :return: A list of dictionaries, one record for each database line returned
        """
        dictResults = DBFormatter.formatDict(self, self.dbi.processData(self.sql, conn=conn,
                                                                        transaction=transaction))
        return dictResults
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env python
"""
_CountUndeletedBlocksByWorkflow_
Oracle implementation of Workflow.CountUndeletedBlocksByWorkflow
"""

from WMComponent.DBS3Buffer.MySQL.CountUndeletedBlocksByWorkflow import CountUndeletedBlocksByWorkflow as MySQLCountUndeletedBlocksByWorkflow


class CountUndeletedBlocksByWorkflow(MySQLCountUndeletedBlocksByWorkflow):
    """
    Retrieves a list of all workflows and the relative undeleted blocks counters.
    Inherits the MySQL implementation unchanged (no overrides), since the same
    SQL is usable for Oracle as well.
    """
33 changes: 30 additions & 3 deletions src/python/WMComponent/TaskArchiver/CleanCouchPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class CleanCouchPollerException(WMException):
Customized exception for the CleanCouchPoller
"""
pass


class CleanCouchPoller(BaseWorkerThread):
Expand Down Expand Up @@ -95,6 +94,23 @@ def __init__(self, config):
self.dashBoardUrl = getattr(config.TaskArchiver, "dashBoardUrl", None)
self.DataKeepDays = getattr(config.TaskArchiver, "DataKeepDays", 0.125) # 3 hours

# Initialise with None all setup defined variables:
self.teamName = None
self.useReqMgrForCompletionCheck = None
self.archiveDelayHours = None
self.wmstatsCouchDB = None
self.centralRequestDBReader = None
self.centralRequestDBWriter = None
self.deletableState = None
self.reqmgr2Svc = None
self.jobCouchdb = None
self.jobsdatabase = None
self.fwjrdatabase = None
self.fwjrService = None
self.workCouchdb = None
self.workdatabase = None
self.statsumdatabase = None

def setup(self, parameters=None):
"""
Called at startup
Expand Down Expand Up @@ -217,7 +233,7 @@ def archiveSummaryAndPublishToDashBoard(self, finishedwfsWithLogCollectAndCleanU
if self.dashBoardUrl is not None:
self.publishRecoPerfToDashBoard(spec)
else:
logging.warn("Workflow spec was not found for %s", workflow)
logging.warning("Workflow spec was not found for %s", workflow)

return

Expand Down Expand Up @@ -380,6 +396,16 @@ def deleteWorkflowFromWMBSAndDisk(self):
deletableWorkflowsDAO = self.daoFactory(classname="Workflow.GetDeletableWorkflows")
deletablewfs = deletableWorkflowsDAO.execute()

# For T0 subtract the workflows which are not having all their blocks deleted yet:
if not self.useReqMgrForCompletionCheck:
undeletedBlocksByWorkflowDAO = self.dbsDaoFactory(classname="CountUndeletedBlocksByWorkflow")
wfsWithUndeletedBlocks = [record['name'] for record in undeletedBlocksByWorkflowDAO.execute()]
for workflow in list(deletablewfs):
if workflow in wfsWithUndeletedBlocks:
msg = "Removing workflow: %s from the list of deletable workflows. It still has blocks NOT deleted."
self.logger.info(msg, workflow)
deletablewfs.pop(workflow)

# Only delete those where the upload and notification succeeded
logging.info("Found %d candidate workflows for deletion.", len(deletablewfs))
# update the completed flag in dbsbuffer_workflow table so blocks can be closed
Expand Down Expand Up @@ -973,7 +999,7 @@ def getPerformanceFromDQM(self, dqmUrl, dataset, run):
return False
except Exception as ex:
logging.error('Couldnt fetch DQM Performance data for dataset %s , Run %s', dataset, run)
logging.exception(ex) # Let's print the stacktrace with generic Exception
logging.exception(str(ex)) # Let's print the stacktrace with generic Exception
return False

try:
Expand All @@ -982,6 +1008,7 @@ def getPerformanceFromDQM(self, dqmUrl, dataset, run):
except Exception as ex:
logging.info("Actually got a JSON from DQM perf in for %s run %d , but content was bad, Bailing out",
dataset, run)
logging.exception(str(ex)) # Let's print the stacktrace with generic Exception
return False
# If it gets here before returning False or responseJSON, it went wrong
return False
Expand Down
109 changes: 109 additions & 0 deletions src/python/WMCore/WMBS/MySQL/Workflow/GetDeletedBlocksByWorkflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
_GetDeletedBlocksByWorkflow_
MySQL implementation of Workflows.GetDeletedBlocksByWorkflow
Retrieves a list of workflows with lists of deleted and NOT deleted blocks per workflow,
NOTE: This DAO is not used in the production code but is to be used only for debugging purposes
"""


from WMCore.Database.DBFormatter import DBFormatter


class GetDeletedBlocksByWorkflow(DBFormatter):
    """
    Retrieves a list of all workflows and the relative deleted blocks lists.
    NOTE: This DAO is not used in the production code but is to be used only
          for debugging purposes.
    """
    sql = """SELECT
                 dbsbuffer_block.blockname,
                 dbsbuffer_block.deleted,
                 wmbs_workflow.name
             FROM dbsbuffer_block
             INNER JOIN dbsbuffer_file ON
                 dbsbuffer_file.block_id = dbsbuffer_block.id
             INNER JOIN dbsbuffer_workflow ON
                 dbsbuffer_workflow.id = dbsbuffer_file.workflow
             INNER JOIN wmbs_workflow ON
                 wmbs_workflow.name = dbsbuffer_workflow.name
             GROUP BY
                 dbsbuffer_block.blockname,
                 dbsbuffer_block.deleted,
                 wmbs_workflow.name
             """

    def format(self, result):
        """
        _format_
        Format the query results into the proper dictionary expected at the upper layer Python code.
        The intermediate (not aggregated) result is a list of dictionaries, one
        record per line returned from the database, e.g.:
        [{'blockname': '/a/b/c#123-qwe', 'deleted': 0, 'name': 'WorkflowName'},
         {'blockname': '/a/b/c#456-rty', 'deleted': 1, 'name': 'WorkflowName'},
         {'blockname': '/a/b/d#123-asd', 'deleted': 0, 'name': 'WorkflowName'},
         ...]
        NOTE:
        * No `DISTINCT` requirement in the SELECT statement is needed because the
          records are already properly grouped by the GROUP BY statement.
        * Once deleted, we should NOT expect duplicate records with two different
          values of the deleted flag for a single block.
        The records are further aggregated by workflow name into:
        [{'name': 'WorkflowName',
          'blocksNotDeleted': ['/a/b/c#123-qwe'],
          'blocksDeleted': ['/a/b/c#456-rty']},
         ...]
        :param result: The result as returned by the mysql query execution.
        :return: List of dictionaries, one per workflow
        """

        # First reformat the output in a list of dictionaries per DB record
        dictResults = DBFormatter.formatDict(self, result)

        # Now aggregate all blocks per workflow:
        results = {}
        for record in dictResults:
            wfName = record['name']
            results.setdefault(wfName, {'name': wfName, 'blocksDeleted': [], 'blocksNotDeleted': []})
            if record['deleted']:
                results[wfName]['blocksDeleted'].append(record['blockname'])
            else:
                results[wfName]['blocksNotDeleted'].append(record['blockname'])
        # Return a real list rather than a dict_values view, so the result
        # matches the documented return type and is indexable/JSON-serializable.
        return list(results.values())

    def execute(self, conn=None, transaction=False, returnCursor=False):
        """
        Executing the current sql query.
        :param conn: A current database connection to be used if existing
        :param transaction: A current database transaction to be used if existing
        :param returnCursor: Unused here; kept for DAO interface compatibility
        :return: A list of dictionaries, one record for each workflow
        """
        results = self.dbi.processData(self.sql, conn=conn,
                                       transaction=transaction)

        return self.format(results)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env python
"""
_GetDeletedBlocksByWorkflow_
Oracle implementation of Workflow.GetDeletedBlocksByWorkflow
NOTE: This DAO is not used in the production code but is to be used only for debugging purposes
"""

from WMCore.WMBS.MySQL.Workflow.GetDeletedBlocksByWorkflow import GetDeletedBlocksByWorkflow as MySQLGetDeletedBlocksByWorkflow


class GetDeletedBlocksByWorkflow(MySQLGetDeletedBlocksByWorkflow):
    """
    Retrieves a list of all workflows and the relative deleted blocks lists.
    Inherits the MySQL implementation unchanged (no overrides), since the same
    SQL is usable for Oracle as well.
    NOTE: This DAO is not used in the production code but is to be used only
          for debugging purposes.
    """

0 comments on commit 2db755c

Please sign in to comment.