This repository has been archived by the owner on Feb 24, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #4517 from dciangot/asoOracle
ASO Oracle
- Loading branch information
Showing
11 changed files
with
1,707 additions
and
760 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
#!/usr/bin/env python | ||
""" | ||
Kibana monitor script for OracleAso | ||
""" | ||
from __future__ import print_function | ||
from __future__ import division | ||
|
||
import os | ||
import sys | ||
import json | ||
import time | ||
import pycurl | ||
import urllib | ||
import urllib2 | ||
import httplib | ||
import logging | ||
import datetime | ||
import subprocess | ||
from urlparse import urljoin | ||
from socket import gethostname | ||
from optparse import OptionParser | ||
|
||
from RESTInteractions import HTTPRequests | ||
from ServerUtilities import encodeRequest, oracleOutputMapping | ||
from ServerUtilities import TRANSFERDB_STATES, PUBLICATIONDB_STATES | ||
|
||
|
||
def check_availability():
    """Availability hook for the SLS report.

    Placeholder: always reports the service as available (1).
    Replace the body with real health checks when they exist.
    """
    available = 1
    return available
|
||
def generate_xml(input):
    """Build an SLS XML availability report from *input* and push it to CERN SLS.

    *input* is a dict of dicts (e.g. {'transfers': {...}, 'publications': {...}});
    every inner key/value pair becomes a <numericvalue> element named
    "<outer>_<inner>".  The XML is written atomically to ./ASO_XML_Report.xml
    and then uploaded to xsls.cern.ch with curl (best effort, 3 attempts).
    """
    from xml.etree.ElementTree import Element, SubElement, tostring

    xmllocation = './ASO_XML_Report.xml'
    logger = logging.getLogger()
    # Only attach a handler once: the original added a new StreamHandler on
    # every call, duplicating each log line on repeated invocations.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        formatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s %(message)s",
                                      datefmt="%a, %d %b %Y %H:%M:%S %Z(%z)")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)

    root = Element('serviceupdate')
    root.set("xmlns", "http://sls.cern.ch/SLS/XML/update")
    child = SubElement(root, "id")
    # Change this to a name used in kibana queries (for example vocms031),
    # or use gethostname().split('.')[0] for the local host name.
    child.text = "oramon-testbed"

    fmt = "%Y-%m-%dT%H:%M:%S%z"
    now_utc = datetime.datetime.now().strftime(fmt)
    child_timestamp = SubElement(root, "timestamp")
    child_timestamp.text = str(now_utc)

    child_status = SubElement(root, "status")
    # check_availability() is currently a stub that always returns 1.
    if check_availability() == 1:
        child_status.text = "available"
    else:
        child_status.text = "degraded"

    # Flatten the nested counters into <numericvalue> elements.
    data = SubElement(root, "data")
    for key in input:
        if isinstance(input[key], dict):
            for skey in input[key]:
                numericval = SubElement(data, "numericvalue")
                numericval.set("name", "%s_%s" % (key, skey))
                numericval.text = str(input[key][skey])

    # Write to a temp file, then rename: readers never observe a partial file.
    # Bounded retries replace the original unbounded `while True`, which could
    # spin forever on a persistent error (e.g. unwritable directory).
    temp_xmllocation = xmllocation + ".temp"
    for _ in range(3):
        try:
            with open(temp_xmllocation, 'w') as f:
                f.write(tostring(root))
            # os.rename is atomic on POSIX, unlike shelling out to `mv`.
            os.rename(temp_xmllocation, xmllocation)
            break
        except Exception as e:
            logger.debug(str(e))

    # Push the XML to elasticSearch via the SLS endpoint (up to 3 attempts).
    maxi = 0
    while maxi < 3:
        cmd = "curl -i -F file=@%s xsls.cern.ch" % xmllocation
        try:
            subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            break
        except Exception as e:
            logger.debug(str(e))
            maxi = maxi + 1
|
||
def _accumulate_delta(status, tmp, section, state_name, count):
    """Store in *status* the change of *count* versus the last snapshot in *tmp*.

    On the first sighting of a state the raw count is reported; afterwards the
    difference against the previously stored value is reported.  *tmp* is
    updated in place with the new raw count either way.
    """
    if section in tmp and state_name in tmp[section]:
        status[section][state_name] = count - tmp[section][state_name]
    else:
        status[section][state_name] = count
    tmp[section][state_name] = count


if __name__ == "__main__":
    # Proxy-authenticated REST client for the CRAB preprod server.
    server = HTTPRequests('cmsweb-testbed.cern.ch',
                          '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy',
                          '/data/srv/asyncstageout/state/asyncstageout/creds/OpsProxy')

    # `status` holds the deltas pushed to kibana; `tmp` the previous raw
    # snapshot (optionally reloaded from the "tmp_transfer" file, see below).
    status = {'transfers': {}, 'publications': {}}
    tmp = {'transfers': {'DONE': 0, 'ACQUIRED': 0, 'SUBMITTED': 0, 'FAILED': 0, 'RETRY': 0},
           'publications': {'DONE': 0, 'ACQUIRED': 0, 'NEW': 0, 'FAILED': 0, 'RETRY': 0}}

    #past = open("tmp_transfer")
    #tmp = json.load(past)

    result = server.get('/crabserver/preprod/filetransfers',
                        data=encodeRequest({'subresource': 'groupedTransferStatistics', 'grouping': 0}))
    for doc in oracleOutputMapping(result):
        if doc['aso_worker'] == "asodciangot1":
            _accumulate_delta(status, tmp, 'transfers',
                              TRANSFERDB_STATES[doc['transfer_state']], doc['nt'])

    result = server.get('/crabserver/preprod/filetransfers',
                        data=encodeRequest({'subresource': 'groupedPublishStatistics', 'grouping': 0}))
    for doc in oracleOutputMapping(result):
        if doc['aso_worker'] == "asodciangot1":
            _accumulate_delta(status, tmp, 'publications',
                              PUBLICATIONDB_STATES[doc['publication_state']], doc['nt'])

    #past.close()
    # Persist the snapshot for the next run.  Bounded retries replace the
    # original unbounded `while True`, which could loop forever on a
    # persistent write error.
    for _ in range(3):
        try:
            with open("tmp_transfer", "w") as tmp_transfer:
                tmp_transfer.write(json.dumps(tmp))
            break
        except Exception as ex:
            print(ex)

    print(status)

    generate_xml(tmp)

    sys.exit(0)
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import json
import os
import sys
import threading
import Queue
|
||
def do_work(in_queue,):
    """Worker loop: consume Monitor JSON file names from *in_queue* and, for
    each one, write a Reporter JSON that marks every LFN as 'Finished'.

    Runs forever; intended to be started as a daemon thread.  Items are file
    names of the form "Monitor.<jobid>.json".
    """
    base = "/data/srv/asyncstageout/current/install/asyncstageout"
    while True:
        item = in_queue.get()
        # process
        with open(item) as json_file:
            json_data = json.load(json_file)

        jobid = item.split(".")[1]
        # The original referenced an undefined `user`; the username is
        # available in the loaded document.
        user = json_data["username"]

        lfns = json_data["LFNs"]
        reporter = {
            "LFNs": lfns,
            "transferStatus": ['Finished' for _ in range(len(lfns))],
            "failure_reason": [],
            "reasons": ['' for _ in range(len(lfns))],
            "timestamp": 10000,
            "username": user,
        }
        report_j = json.dumps(reporter)

        try:
            work_dir = "%s/Monitor/work/%s" % (base, user)
            if not os.path.exists(work_dir):
                os.makedirs(work_dir)
            out_path = "%s/AsyncTransfer/dropbox/inputs/%s/Reporter.%s.json" % (base, user, jobid)
            with open(out_path, "w") as out_file:
                out_file.write(report_j)
            # NOTE(review): the original removed '/%s/Monitor.%s.json' built
            # from undefined `self.user`/`self.jobid` (a path rooted at /).
            # Removing the processed input file looks intended — confirm.
            os.remove(item)
        except Exception as ex:
            # The original used undefined `self.logger` here.
            print("Cannot create fts job report: %s" % ex)

        in_queue.task_done()
|
||
if __name__ == "__main__":
    # The original started with a bare `self.f`, which raises NameError at
    # startup (no `self` exists at module level); removed.  The unused
    # `results` queue and `total` local were dropped as well.
    work = Queue.Queue()

    # start four workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work,))
        t.daemon = True
        t.start()

    # produce data: one queue item per file in the dropbox outputs directory
    for i in os.listdir("/data/srv/asyncstageout/current/install/asyncstageout/AsyncTransfer/dropbox/outputs"):
        work.put(i)

    # block until every queued item has been task_done()'d by a worker
    work.join()

    sys.exit()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.