-
Notifications
You must be signed in to change notification settings - Fork 3
/
localWorkQueueStatus.py
152 lines (129 loc) · 7.28 KB
/
localWorkQueueStatus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
__localWorkQueueStatus.py__
It gives an overview of the local workqueue and workqueue_inbox databases,
focusing on the status of their elements, followed by a summary of the
'Available' elements in the global workqueue.
Created on Apr 27, 2015.
@author: amaltaro
"""
import sys
import os
import logging
import argparse
from pprint import pformat
from WMCore.Configuration import loadConfigurationFile
from WMCore.WorkQueue.WorkQueueBackend import WorkQueueBackend
# Command-line handling: the only option, -v/--verbose, lowers the root log level from INFO to DEBUG.
# NOTE(review): arguments are parsed at import time, so importing this module consumes sys.argv.
parser = argparse.ArgumentParser(description="Local workqueue monitoring")
parser.add_argument('-v', '--verbose', action='store_true', help='Increase output verbosity')
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
def createElementsSummary(allElements, dbName):
    """
    Count workqueue elements per 'Status' value and log the result.

    :param allElements: list of element documents, each indexable by 'Status'
    :param dbName: database name, only used in the log message
    :return: the distinct status values found (keys of the per-status counter dict)
    """
    countByStatus = {}
    for status in (element['Status'] for element in allElements):
        countByStatus[status] = countByStatus.get(status, 0) + 1

    logging.info("Found a total of %d elements in the '%s' db", len(allElements), dbName)
    logging.info(pformat(countByStatus))
    return countByStatus.keys()
def byStatusSummary(elemByStatus, localWQInboxDB=None):
"""
Parse each element to build up a summary based on the element status.
"""
stuckElements = []
workSplitBySite = {} # we divide the number of jobs by the number of common sites
workBySite = {} # we simply add Jobs to each of the common sites
workOverview = {'totalGoodJobs': 0, 'totalBadJobs': 0, 'totalAvailableGoodLQE': 0, 'totalAvailableBadLQE': 0}
for elem in elemByStatus:
if elem.get('NoLocationUpdate'):
commonSites = set(elem['SiteWhitelist'])
logging.debug("NoLocationUpdate element assigned to %s", commonSites)
elif elem['NoInputUpdate'] and elem['NoPileupUpdate']:
commonSites = set(elem['SiteWhitelist'])
logging.debug("NoInputUpdate AND NoPileupUpdate element assigned to %s", commonSites)
elif elem['NoInputUpdate']:
puSites = elem['PileupData'].values()[0] if elem['PileupData'] else elem['SiteWhitelist']
commonSites = set(puSites) & set(elem['SiteWhitelist'])
logging.debug("NoInputUpdate element with pileup location and sitewhitelist intersection as: %s", commonSites)
elif elem['NoPileupUpdate']:
inputSites = elem['Inputs'].values()[0] if elem['Inputs'] else elem['SiteWhitelist']
commonSites = set(inputSites) & set(elem['SiteWhitelist'])
logging.debug("NoPileupUpdate element with input location and sitewhitelist intersection as: %s", commonSites)
else:
inputSites = elem['Inputs'].values()[0] if elem['Inputs'] else elem['SiteWhitelist']
puSites = elem['PileupData'].values()[0] if elem['PileupData'] else elem['SiteWhitelist']
commonSites = set(inputSites) & set(puSites) & set(elem['SiteWhitelist'])
logging.debug("Unflagged element with input location and sitewhitelist intersection as: %s", commonSites)
if not commonSites:
workOverview['totalBadJobs'] += elem['Jobs']
workOverview['totalAvailableBadLQE'] += 1
tempElem = {'RequestName': elem['RequestName'],
'id': elem.id,
'NoInputUpdate': elem['NoInputUpdate'],
'NoPileupUpdate': elem['NoPileupUpdate'],
'Inputs': elem['Inputs'].values()[0] if elem['Inputs'] else [],
'PileupData': elem['PileupData'].values()[0] if elem['PileupData'] else [],
'SiteWhitelist': elem['SiteWhitelist']}
# now get the location where it was supposed to be
if localWQInboxDB:
inboxDoc = localWQInboxDB.getElements(elementIDs=[elem.id])[0]
tempElem['OriginalInputLocation'] = inboxDoc['Inputs']
stuckElements.append(tempElem)
else:
workOverview['totalGoodJobs'] += elem['Jobs']
workOverview['totalAvailableGoodLQE'] += 1
for site in commonSites:
workSplitBySite.setdefault(site, 0)
workSplitBySite[site] += elem['Jobs']/len(commonSites)
workBySite.setdefault(site, {'Jobs': 0, 'LQE': 0})
workBySite[site]['Jobs'] += elem['Jobs']
workBySite[site]['LQE'] += 1
#if site == 'T2_CH_CERN':
# logging.info("%s, id %s with %d jobs to process", elem['RequestName'], elem.id, elem['Jobs'])
# Report on site vs jobs vs elements situation
logging.info("Average number of UNIQUE jobs per site:\n%s\n", pformat(workSplitBySite))
logging.info("Maximum number of POSSIBLE jobs per site:\n%s\n", pformat(workBySite))
if elemByStatus[0]['Status'] == 'Available' and localWQInboxDB:
logging.info("Found %d elements stuck in Available in local workqueue with no common site/data location:", len(stuckElements))
for elem in stuckElements:
logging.info(" %s with docid %s, site whitelist set to %s while input %s only available at %s", elem['RequestName'],
elem['id'],
elem['SiteWhitelist'],
elem['OriginalInputLocation'].keys(),
elem['Inputs'])
logging.debug("%s\n", pformat(stuckElements))
def main():
    """
    Log a status overview of the local and global workqueue databases.

    Steps:
      1. load the WMAgent configuration from $WMAGENT_CONFIG (a default deployment
         path is exported first when the variable is unset);
      2. summarise element statuses in the local 'workqueue_inbox' and 'workqueue'
         databases, then break the local 'workqueue' elements down by status and site;
      3. summarise the 'Available' elements of the global workqueue.

    Terminates the process with sys.exit(0).
    """
    os.environ.setdefault('WMAGENT_CONFIG', '/data/srv/wmagent/current/config/wmagent/config.py')
    config = loadConfigurationFile(os.environ["WMAGENT_CONFIG"])

    # Get local workqueue and workqueue_inbox elements
    localWQBackend = WorkQueueBackend(config.WorkQueueManager.couchurl, db_name="workqueue")
    localWQInboxDB = WorkQueueBackend(config.WorkQueueManager.couchurl, db_name="workqueue_inbox")
    wqElements = localWQBackend.getElements()
    wqInboxElements = localWQInboxDB.getElements()

    # Build and print a summary of these elements
    logging.info("************* LOCAL workqueue elements summary ************")
    # the inbox summary is only logged; the per-status drill-down below uses the 'workqueue' statuses
    createElementsSummary(wqInboxElements, 'workqueue_inbox')
    foundStatus = createElementsSummary(wqElements, 'workqueue')

    # Now investigate docs in the workqueue database, one status at a time
    for status in foundStatus:
        logging.info("\n************* workqueue elements summary by status: %s ************", status)
        elemByStatus = [elem for elem in wqElements if elem['Status'] == status]
        byStatusSummary(elemByStatus, localWQInboxDB=localWQInboxDB)

    # time to look up at central global queue
    logging.info("\n************* GLOBAL workqueue elements summary ************")
    # NOTE(review): the global queue URL is taken from config.WorkloadSummary.couchurl;
    # confirm this is the intended central couch instance
    globalWQBackend = WorkQueueBackend(config.WorkloadSummary.couchurl, db_name="workqueue")
    gqElements = globalWQBackend.getElements(status='Available')
    createElementsSummary(gqElements, 'workqueue')
    if gqElements:
        byStatusSummary(gqElements)
    else:
        # nothing to break down per site
        logging.info("No 'Available' elements found in the global workqueue")
    sys.exit(0)
if __name__ == '__main__':
    # Script entry point. main() already exits via sys.exit(0); the outer sys.exit()
    # would propagate main()'s return value as the exit status if it ever returned.
    sys.exit(main())