diff --git a/DatabaseExample b/DatabaseExample
deleted file mode 100644
index 0b354f2..0000000
--- a/DatabaseExample
+++ /dev/null
@@ -1,61 +0,0 @@
-# Config File for SFrameBatch to automagically construct
-# SFrame XML Files just with CrossSections and XML Paths
-# Format is Process Name, Path to XML File, Cross Section, either number of events/ cores,method (True for fast and false for weights)
-
-
-#NickName Directory Cross section Number of Events/Cores Methods
-SingleMuData /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/DATA_SingleMuon.xml
-BprimeB* /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/BprimeBToTW_M-*_v1.xml 1
-BprimeT* /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/BprimeTToTW_M-*_v1.xml 1
-TTbar_Mtt1000toInf /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_TT_Mtt1000toInf.xml 20.578 23168980
-SingleT_t /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_ST_t-channel_4f.xml 26.38 1666000
-SingleT_t_top /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_ST_t-channel_top_4f_leptonDecays.xml 44.33 3206600
-SingleTWtop /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_STpos_tW_inc.xml 35.6 998400
-SingleTWAntitop /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_STneg_tW_inc.xml 35.6 967600
-SingleT_s /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_ST_s-channel-4f.xml 3.36 337066841375
-TTbar /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_TTbar.xml 831.76 33364899
-TTbar_ext /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_TTbar_ext.xml 831.76 182424800
-WW /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WW.xml 118.7 993214
-WZ /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WZ.xml 47.13 1000000
-ZZ /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_ZZ.xml 16.523 989312
-QCD_20toInf_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_20toInf_MuEnrichedPt15.xml 720648000*0.00042 22093630
-QCD_15to20_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-15to20_MuEnrichedPt5.xml 1273190000*0.003 4613847
-QCD_20to30_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-20to30_MuEnrichedPt5.xml 558528000*0.0053 31556346475
-QCD_30to50_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-30to50_MuEnrichedPt5.xml 139803000*0.01182 275633615592
-QCD_50to80_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-50to80_MuEnrichedPt5.xml 19222500*0.02276 20314042
-QCD_80to120_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-80to120_MuEnrichedPt5.xml 2758420*0.03844 136848342702
-QCD_120to170_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-120to170_MuEnrichedPt5.xml 469797*0.05362 766305342545
-QCD_170to300_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-170to300_MuEnrichedPt5.xml 117989*0.07335 7815165
-QCD_300to470_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-300to470_MuEnrichedPt5.xml 7820.25*0.10196 7490138
-QCD_470to600_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-470to600_MuEnrichedPt5.xml 645.528*0.12242 370257504188
-QCD_600to800_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-600to800_MuEnrichedPt5.xml 187.109*0.13412 3969731
-QCD_800to1000_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-800to1000_MuEnrichedPt5.xml 32.3486*0.14552 396184224859
-QCD_1000toInf_Mu /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_QCD_Pt-1000toInf_MuEnrichedPt5.xml 10.4305*0.15544 3879642
-WJets_HT100to200 /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WJets_LNu_HT100To200.xml 1345*1.21 29521944
-WJets_HT200to400 /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WJets_LNu_HT200To400.xml 359.7*1.21 19957497
-WJets_HT400to600 /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WJets_LNu_HT400To600.xml 48.91*1.21 7174379
-WJets_HT600to800 /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WJets_LNu_HT600To800.xml 12.05*1.21 3723054
-WJets_HT800to1200 /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WJets_LNu_HT800To1200.xml 5.501*1.21 7456227
-WJets_HT1200to2500 /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WJets_LNu_HT1200To2500.xml 1.329*1.21 6736297
-WJets_HT2500toInf /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_WJets_LNu_HT2500ToInf.xml 0.03216*1.21 2486059
-ZJets_M10to50 /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_DYJetsToLL_NLO_M10to50.xml 18610 9.32173870097E+11
-ZJets_M50 /nfs/dust/cms/user/gonvaq/CMSSW/CMSSW_8_0_8/src/UHH2/common/datasets/RunII_80X_v1/MC_DYJetsToLL_NLO_M50.xml 6025.2 4.51201113827E+11
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/Inf_Classes.py b/Inf_Classes.py
index 641ef5f..74d9f8f 100644
--- a/Inf_Classes.py
+++ b/Inf_Classes.py
@@ -1,16 +1,15 @@
-#!/usr/bin/env python
-
-import xml.dom.minidom as minidom
+from __future__ import print_function
from copy import deepcopy
from glob import glob
-import os
class JobConfig(object):
- def __init__(self,node):
+ def __init__(self, node):
for item in node.attributes.items():
- if(item[0]=='JobName'): self.JobName = item[1]
- if(item[0]=='OutputLevel'): self.OutputLevel = item[1]
+ if item[0] == "JobName":
+ self.JobName = item[1]
+ if item[0] == "OutputLevel":
+ self.OutputLevel = item[1]
self.Libs = []
for child in node.getElementsByTagName('Library'):
for lib in child.attributes.items():
@@ -23,65 +22,72 @@ def __init__(self,node):
for item in node.getElementsByTagName('Cycle'):
self.Job_Cylce.append(Cycle(item))
+
class Cycle(object):
- def __init__(self,node):
+ def __init__(self, node):
self.cacheData = 0
for item in node.attributes.items():
- if(item[0]=='Name'): self.Cyclename = item[1]
- #if(item[0]=='RunMode'): self.RunMode = item[1]
- #if(item[0]=='ProofServer'): self.ProofServer = item[1]
- #if(item[0]=='ProofWorkDir'): self.ProofWorkDir = item[1]
- if(item[0]=='OutputDirectory'): self.OutputDirectory = item[1]
- if(item[0]=='PostFix'): self.PostFix = item[1]
- if(item[0]=='TargetLumi'): self.TargetLumi = item[1]
-
- self.Cycle_InputData =[]
- for item in node.getElementsByTagName('InputData'):
- self.Cycle_InputData.append(InputData(item,self.cacheData))
- if self.Cycle_InputData[-1].Cacheable=='True': self.cacheData = True
+ if item[0] == "Name":
+ self.Cyclename = item[1]
+ if item[0] == "OutputDirectory":
+ self.OutputDirectory = item[1]
+ if item[0] == "PostFix":
+ self.PostFix = item[1]
+ if item[0] == "TargetLumi":
+ self.TargetLumi = item[1]
+
+ self.Cycle_InputData = []
+ for item in node.getElementsByTagName("InputData"):
+ self.Cycle_InputData.append(InputData(item, self.cacheData))
+ if self.Cycle_InputData[-1].Cacheable == "True":
+ self.cacheData = True
self.Cycle_UserConf = []
for child in node.getElementsByTagName('UserConfig'):
for item in child.getElementsByTagName('Item'):
- name = None
+ name = None
value = None
for attr in item.attributes.items():
- if(attr[0]=='Name'): name=attr[1]
- if(attr[0]=='Value'): value=attr[1]
- self.Cycle_UserConf.append(UserConfig(name,value))
+ if attr[0] == "Name":
+ name = attr[1]
+ if attr[0] == "Value":
+ value = attr[1]
+ self.Cycle_UserConf.append(UserConfig(name, value))
- #print self.Cycle_UserConf
- UC_sframe_weight = filter(lambda uc: uc.Name == 'use_sframe_weight', self.Cycle_UserConf)
+ UC_sframe_weight = list(filter(lambda uc: uc.Name == 'use_sframe_weight', self.Cycle_UserConf))
self.usingSFrameWeight = not (UC_sframe_weight and UC_sframe_weight[0].Value == 'false')
class InputData(object):
- def __init__(self,node,cacheMe):
- self.NEventsSkip = 0
+ def __init__(self, node, cacheMe):
+ self.NEventsSkip = 0
for item in node.attributes.items():
if ' ' in item[1]:
- print 'Space in',item[0],'=',item[1]
- print 'Aborting since this is most probably wrong'
+ print("Space in", item[0], "=", item[1])
+ print("Aborting since this is most probably wrong")
exit(0)
- if(item[0]=='Lumi'): self.Lumi = item[1]
- if(item[0]=='NEventsMax'): self.NEventsMax = item[1]
- if(item[0]=='Type'): self.Type = item[1]
- if(item[0]=='Version'): self.Version = item[1]
- if(item[0]=='Cacheable'):
- if item[1]=='True' and not cacheMe:
- res = raw_input('Cacheable is set to True. Are you sure you want to continue? Y/[N] ')
+ if item[0] == "Lumi":
+ self.Lumi = item[1]
+ if item[0] == "NEventsMax":
+ self.NEventsMax = item[1]
+ if item[0] == "Type":
+ self.Type = item[1]
+ if item[0] == "Version":
+ self.Version = item[1]
+ if item[0] == "Cacheable":
+ if item[1] == "True" and not cacheMe:
+ res = input("Cacheable is set to True. Are you sure you want to continue? Y/[N] ")
if res.lower() != 'y':
exit(0)
- else: cacheMe = True
+ else:
+ cacheMe = True
self.Cacheable = item[1]
- if(item[0]=='NEventsSkip'): self.NEventsSkip = item[1]
- #print self.Version
- self.io_list =InputList()
+ if item[0] == "NEventsSkip":
+ self.NEventsSkip = item[1]
+ self.io_list = InputList()
for item in node.childNodes:
- #print item.nodeValue
if not item.nodeType == 3:
help_list = []
- #print item.nodeName
help_list.append(item.nodeName)
for entry in item.attributes.items():
for y in entry:
@@ -89,51 +95,51 @@ def __init__(self,node,cacheMe):
if item.nodeName == "In":
self.io_list.FileInfoList.append(help_list)
elif item.nodeName == "InputTree":
- if len(self.io_list.InputTree)==0:
- self.io_list.InputTree=help_list
+ if len(self.io_list.InputTree) == 0:
+ self.io_list.InputTree = help_list
elif self.io_list.InputTree != help_list:
- print 'not using the same InputTree. Prefere to exit'
+ print("not using the same InputTree. Prefere to exit")
exit(0)
else:
self.io_list.other.append(help_list)
-
-
+
expanded_list = []
for help_list in self.io_list.FileInfoList:
expanded_list += _expand_help_list_filenames(help_list)
self.io_list.FileInfoList = expanded_list
- def split_NEvents(self,NEventsBreak,LastBreak):
+ def split_NEvents(self, NEventsBreak, LastBreak):
self.NEventsBreak = NEventsBreak
- self.LastBreak = LastBreak
-
-
+ self.LastBreak = LastBreak
+
+
class UserConfig(object):
- def __init__(self,Name,Value):
+ def __init__(self, Name, Value):
self.Name = Name
self.Value = Value
+
class InputList(object):
def __init__(self):
- self.FileInfoList =[]
+ self.FileInfoList = []
self.InputTree = []
- self.other =[]
+ self.other = []
def _expand_help_list_filenames(help_list):
- filenames = filter(lambda s: '*' in s, help_list)
+ filenames = list(filter(lambda s: '*' in s, help_list))
if not filenames:
return [help_list]
- assert(len(filenames) == 1)
+ assert len(filenames) == 1
pattern = filenames[0]
real_filenames = glob(pattern)
if not real_filenames:
- raise RuntimeError('No files found for pattern: %s'%pattern)
+ raise RuntimeError("No files found for pattern: %s" % pattern)
new_help_list = []
for new_file in real_filenames:
new_list = deepcopy(help_list)
new_help_list.append(new_list)
- for i in xrange(len(new_list)):
+ for i in range(len(new_list)):
if new_list[i] == pattern:
new_list[i] = new_file
return new_help_list
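
A note on the list(filter(...)) wrappings introduced above: in Python 3, filter() returns a lazy iterator whose truth value is always True, so without the explicit list() the emptiness checks in Cycle and _expand_help_list_filenames would silently stop working. A minimal sketch with illustrative values:

    # Python 3: a bare filter object is truthy even when it matches nothing
    matches = filter(lambda s: '*' in s, ['a.root', 'b.root'])
    print(bool(matches))        # True  -- an "if not matches:" guard would never trigger
    print(bool(list(matches)))  # False -- materialising with list() restores the check
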
diff --git a/LumiCalcAutoBuilder.py b/LumiCalcAutoBuilder.py
deleted file mode 100644
index 6966b74..0000000
--- a/LumiCalcAutoBuilder.py
+++ /dev/null
@@ -1,162 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import ConfigParser
-# SFrameBatch comes with a standalone version to read the number of entries.
-# Keep this in mind if it need to be updated some day
-import readaMCatNloEntries
-
-import sys, glob, copy, os, re
-# From a list of CrossSections and XML Files this class creates the sframe steering file!
-# Lets see how complicated this can get ???
-
-
-class process_helper(object):
- def __init__(self,name,crosssection,path,numberEvents):
- self.name = name
-
- self.crosssection = float(crosssection)
- self.path = path
- self.numberEvents = float(numberEvents)
- self.lumi = float(numberEvents)/float(crosssection)
- def printInfo(self):
- print "Process Name:",self.name,"CrossSection:",self.crosssection,"XML path:",self.path,"Number of Events:",self.numberEvents,"Lumi:",self.Lumi
-
-def str2bool(v):
- return v.lower() in ("yes", "true", "t", "1")
-
-class lumicalc_autobuilder(object):
- def __init__(self,path_to_Data):
- self.ProcessList = []
- finished_samples = False
- self.UserConfigText = []
- with open(str(path_to_Data)) as f:
- for line in f:
- if '#' in line or line == '\n' :
- continue
- elif 'USERCONFIGBLOCK' in line:
- finished_samples = True
- continue
- if finished_samples:
- striped_line = line.lstrip().rstrip()
- if striped_line: self.UserConfigText.append(striped_line)
- continue
-
- tmpsplit = line.split()
- if not tmpsplit:continue
-
- for exp in glob.glob(tmpsplit[1]):
- list_process = copy.deepcopy(tmpsplit)
- print list_process
- currentfile = ''
- if '*' in tmpsplit[1]:
- split_wildcards = tmpsplit[1].split('*')
- currentwork = exp[len(split_wildcards[0])-1:len(exp)-len(split_wildcards[1])]
- list_process[0]= list_process[0].replace('*',currentwork)
- list_process[1]= exp
-
- #if '/' not in list_process[1]:
- list_process[1] = os.path.abspath(list_process[1])
-
- #print list_process
- if 'data' in list_process[0].lower():
- self.ProcessList.append(process_helper(list_process[0],1,list_process[1],1))
- continue
- numberEvents = 0
- lastxmlline = ''
-
- for xmlline in reversed(open(list_process[1]).readlines()):
- lastxmlline = xmlline.rstrip()
- #print list_process[1],lastxmlline
- if lastxmlline:
- break
- #print 'Bool',str2bool(list_process[4])
-
- if '' not in lastxmlline:
- #print list_process[1]
- if len(list_process) < 5:
- if len(list_process) == 4:
- numberEvents = float(list_process[3])
- else:
- print "No idea which method to use to read entries please add cores and method"
- print "for",list_process[0]
- exit(1)
- if len(list_process) > 4:
- methodname = 'fast'
- if not str2bool(list_process[4]): methodname = 'weights'
- print 'going to count events for',list_process[0]
- numberEvents = readaMCatNloEntries.readEntries(int(list_process[3]),[list_process[1]],str2bool(list_process[4]))[0]
-
- with open(list_process[2], "a") as myfile:
- myfile.write('')
- else:
- if len(list_process) == 4:
- numberEvents = float(list_process[3])
- else:
- splitted_lastwords = lastxmlline.split('"')
- #print splitted_lastwords
- if len(splitted_lastwords)>1:
- numberEvents = splitted_lastwords[1]
- else:
- #print lastxmlline.split(' ')
- numberEvents = lastxmlline.split(' ')[-2]
-
- crosssectionNumber = list_process[2]
- if '*' in crosssectionNumber:
- numbers = crosssectionNumber.split('*')
- crosssectionNumber = float(1.)
- for num in numbers:
- #print num
- crosssectionNumber = float(crosssectionNumber)*float(num)
- self.ProcessList.append(process_helper(list_process[0],crosssectionNumber,list_process[1],numberEvents))
-
-
- # Follows sframe conventions as of 2016
- # Parts are filled with __CHANGE_ME__!!!
- # UserConfig Section is missing
- def write_to_toyxml(self,xmlname):
- with open(str('lumi_'+xmlname.replace('xml','py')),'w+') as nf:
- nf.write('lumi_list = [\n')
- for i in self.ProcessList:
- nf.write('\t[\''+i.name+'\','+str(i.lumi)+'],\n')
- nf.write('\t]')
-
- with open(str(xmlname),'w+') as f:
- f.write('\n')
- f.write('\n')
- print "Added to Entity",i.name,i.path
- f.write(']>\n')
- f.write('\n\n\n')
- f.write('\n')
- f.write('\t\n')
- f.write('\t\n')
- f.write('\t\n')
- for i in self.ProcessList:
- datatype = 'MC'
- if 'data' in i.name.lower(): datatype = 'DATA'
- f.write('\t\t\n')
- f.write('\t\t\t&'+i.name+';\n')
- f.write('\t\t\t\n')
- f.write('\t\t\t\n')
- f.write('\t\t\n')
- print 'Added Process to InputData:', i.name,'with lumi:',i.lumi
- f.write('\t\t\n')
- f.write('\t\t\n')
- for line in self.UserConfigText:
- f.write('\t\t\t'+line+'\n')
- f.write('\t\t\n')
- f.write('\t\n')
- f.write('\n')
- print 'Process list contains',len(self.ProcessList),'entries'
- #con = create_database(data)
- #lumi
-
-
-if __name__ == "__main__":
- print "Expected format: path to Database, Name for new pre-XMl file"
- TestInfo = lumicalc_autobuilder(sys.argv[1:][0])
- TestInfo.write_to_toyxml(sys.argv[1:][1])
- exit(0)
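
For the record, the removed LumiCalcAutoBuilder computed each sample's effective luminosity as lumi = numberEvents / crosssection (see process_helper above). With the TTbar entry from the deleted DatabaseExample, 33364899 events at a cross section of 831.76 give roughly 4.0e4 in inverse cross-section units (pb^-1 if the cross sections are quoted in pb, as is usual for these samples).
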
diff --git a/Manager.py b/Manager.py
index b1b4ecb..57ba428 100644
--- a/Manager.py
+++ b/Manager.py
@@ -1,251 +1,315 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from io_func import *
-from batch_classes import *
-from Inf_Classes import *
-from SubmissionInfo_Class import *
+from __future__ import print_function
+import io_func
+import batch_classes
+import SubmissionInfo_Class
import os
import subprocess
-import itertools
-import datetime
import json
import time
import gc
-import StringIO
-from xml.dom.minidom import parse, parseString
-import xml.sax
-
-# takes care of looking into qstat
+# takes care of looking into qstat
class pidWatcher(object):
- def __init__(self,subInfo):
+ def __init__(self, subInfo):
self.pidStates = {}
- ListOfPids = [subInfo[k].arrayPid for k in range(len(subInfo)) if len(subInfo[k].arrayPid) > 0 ]
+ ListOfPids = [subInfo[k].arrayPid for k in range(len(subInfo)) if len(subInfo[k].arrayPid) > 0]
# adding Pids of resubmitted jobs
for process in subInfo:
for pid in process.pids:
if process.arrayPid not in pid:
ListOfPids.append(pid)
- if(len(ListOfPids)==0):
+ if len(ListOfPids) == 0:
self.parserWorked = False
return
try:
- #looking into condor_q for jobs that are idle, running or hold (HTC State 1,2 and 5)
- proc_cQueue = subprocess.Popen(['condor_q']+ListOfPids+['-af:,','GlobalJobId','JobStatus'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+            # looking into condor_q for jobs that are idle, running or held (HTCondor JobStatus 1, 2 and 5)
+ proc_cQueue = subprocess.Popen(
+ ["condor_q"] + ListOfPids + ["-af:,", "GlobalJobId", "JobStatus"],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
cQueue_jsons = proc_cQueue.communicate()[0]
processes_json = []
if cQueue_jsons:
for line in cQueue_jsons.split('\n'):
- if(',' not in line):
+ if "," not in line:
continue
GlobalJobId, JobStatus = line.strip().split(',')
- processes_json.append({'GlobalJobId':GlobalJobId,'JobStatus':int(JobStatus)})
+ processes_json.append({"GlobalJobId": GlobalJobId, "JobStatus": int(JobStatus)})
self.parserWorked = True
else:
self.parserWorked = False
except Exception as e:
# print e
self.parserWorked = False
- print 'Processing condor_q information did not work. Maybe the NAF has some problem. Or nothing is running on the Batch anymore.'
- print 'Going to wait for 5 minutes, lets see if condor_q will start to work again.'
+ print(
+ "Processing condor_q information did not work. Maybe the NAF has some problem."
+ + "Or nothing is running on the Batch anymore."
+ )
+ print("Going to wait for 5 minutes, lets see if condor_q will start to work again.")
+ print(e)
time.sleep(300)
return
- if(any(len(i)==0 for i in ListOfPids)):
- self.parserWorked=False
+ if any(len(i) == 0 for i in ListOfPids):
+ self.parserWorked = False
if self.parserWorked:
for item in processes_json:
raw_id = item["GlobalJobId"]
jobid = raw_id.split("#")[1]
- if(jobid.split('.')[0] not in ListOfPids):
+ if jobid.split(".")[0] not in ListOfPids:
continue
- self.pidStates.update({jobid:item["JobStatus"]})
+ self.pidStates.update({jobid: item["JobStatus"]})
def check_pidstatus(self, pid, debug=False):
- if '.' not in str(pid): pid = pid + '.0'
+ if "." not in str(pid):
+ pid = pid + ".0"
if pid not in self.pidStates:
return -1
state = str(self.pidStates[pid])
- if(state == '1' or state == '2' or state == '4'):
+ if state == "1" or state == "2" or state == "4":
return 1 # in the batch
else:
return 2 # error state
return 0 # not available
-#JSON Format is used to store the submission information
+
+# JSON Format is used to store the submission information
class HelpJSON:
- def __init__(self,json_file):
+ def __init__(self, json_file):
self.data = None
- #return
if os.path.isfile(json_file):
- print 'Using saved settings from:', json_file
- self.data = json.load(open(json_file,'r'))
- #self.data = json.load(self.data)
+ print("Using saved settings from:", json_file)
+ self.data = json.load(open(json_file, "r"))
- def check(self,datasetname):
+ def check(self, datasetname):
for element in self.data:
jdict = json.loads(element)
- if str(datasetname) == str(jdict['name']) and (str(jdict['arrayPid']) or any(jdict['pids'])):
- print 'Found Submission Info for',jdict['name']
- mysub = SubInfo()
+ if str(datasetname) == str(jdict["name"]) and (str(jdict["arrayPid"]) or any(jdict["pids"])):
+ print("Found Submission Info for", jdict["name"])
+ mysub = SubmissionInfo_Class.SubInfo()
mysub.load_Dict(jdict)
return mysub
return None
+
class JobManager(object):
- def __init__(self,options,header,workdir):
- self.header = header #how do I split stuff, sframe_batch header in xml file
- self.workdir = workdir #name of the workdir
- self.merge = MergeManager(options.add,options.forceMerge,options.waitMerge,options.addNoTree)
- self.subInfo = [] #information about the submission status
- self.deadJobs = 0 #check if no file has been written to disk and nothing is on running on the batch
- self.totalFiles = 0
+ def __init__(self, options, header, workdir):
+        self.header = header  # sframe_batch header from the xml file; controls how the jobs are split
+ self.workdir = workdir # name of the workdir
+ self.merge = MergeManager(options.add, options.forceMerge, options.waitMerge, options.addNoTree)
+ self.subInfo = [] # information about the submission status
+        self.deadJobs = 0  # check if no file has been written to disk and nothing is running on the batch
+ self.totalFiles = 0
self.missingFiles = -1
- self.move_cursor_up_cmd = None # pretty print status
- self.stayAlive = 0 # loop counter to see if program is running
- self.numOfResubmit =0
+ self.move_cursor_up_cmd = None # pretty print status
+ self.stayAlive = 0 # loop counter to see if program is running
+ self.numOfResubmit = 0
self.watch = None
self.printString = []
self.keepGoing = options.keepGoing
self.exitOnQuestion = options.exitOnQuestion
- self.outputstream = self.workdir+'/Stream_'
+ self.outputstream = self.workdir + "/Stream_"
self.sl6_container = options.sl6container # run code in SL6 singularity container
- #read xml file and do the magic
- def process_jobs(self,InputData,Job):
- jsonhelper = HelpJSON(self.workdir+'/SubmissinInfoSave.p')
+ # read xml file and do the magic
+ def process_jobs(self, InputData, Job):
+ jsonhelper = HelpJSON(self.workdir + "/SubmissinInfoSave.p")
number_of_processes = len(InputData)
gc.disable()
- for process in xrange(number_of_processes):
+ for process in range(number_of_processes):
found = None
- processName = ([InputData[process].Version])
+ processName = [InputData[process].Version]
if jsonhelper.data:
- helpSubInfo = SubInfo()
found = jsonhelper.check(InputData[process].Version)
if found:
self.subInfo.append(found)
if not found:
- self.subInfo.append(SubInfo(InputData[process].Version,write_all_xml(self.workdir+'/'+InputData[process].Version,processName,self.header,Job,self.workdir),InputData[process].Type))
+ self.subInfo.append(
+ SubmissionInfo_Class.SubInfo(
+ InputData[process].Version,
+ io_func.write_all_xml(
+ self.workdir + "/" + InputData[process].Version, processName, self.header, Job, self.workdir
+ ),
+ InputData[process].Type,
+ )
+ )
if self.subInfo[-1].numberOfFiles == 0:
- print 'Removing',self.subInfo[-1].name
+ print("Removing", self.subInfo[-1].name)
self.subInfo.pop()
else:
self.totalFiles += self.subInfo[-1].numberOfFiles
- self.subInfo[-1].reset_resubmit(self.header.AutoResubmit) #Reset the retries every time you start
- write_script(processName[0],self.workdir,self.header,self.sl6_container) #Write the scripts you need to start the submission
+ self.subInfo[-1].reset_resubmit(self.header.AutoResubmit) # Reset the retries every time you start
+ batch_classes.write_script(
+ processName[0], self.workdir, self.header, self.sl6_container
+ ) # Write the scripts you need to start the submission
gc.enable()
- #submit the jobs to the batch as array job
- #the used function should soon return the pid of the job for killing and knowing if something failed
- def submit_jobs(self,OutputDirectory,nameOfCycle):
+
+ # submit the jobs to the batch as array job
+    # the submit function returns the pid of the array job, which is used for killing jobs and spotting failures
+ def submit_jobs(self, OutputDirectory, nameOfCycle):
for process in self.subInfo:
process.startingTime = time.time()
- process.arrayPid = submit_qsub(process.numberOfFiles,self.outputstream+str(process.name),str(process.name),self.workdir)
- print 'Submitted jobs',process.name, 'pid', process.arrayPid
+ process.arrayPid = batch_classes.submit_qsub(
+ process.numberOfFiles, self.outputstream + str(process.name), str(process.name), self.workdir
+ )
+ print("Submitted jobs", process.name, "pid", process.arrayPid)
process.reachedBatch = [False]*process.numberOfFiles
if process.status != 0:
process.status = 0
- process.pids=[process.arrayPid+'.'+str(i) for i in range(process.numberOfFiles)]
- # if any(process.pids):
+ process.pids = [process.arrayPid + "." + str(i) for i in range(process.numberOfFiles)]
+ # if any(process.pids):
# process.pids = ['']*process.numberOfFiles
- #resubmit the jobs see above
+
+ # resubmit the jobs see above
def resubmit_jobs(self):
qstat_out = self.watch.parserWorked
ask = True
for process in self.subInfo:
- for it in process.missingFiles:
+ for it in process.missingFiles:
batchstatus = self.watch.check_pidstatus(process.pids[it-1])
- if qstat_out and batchstatus==-1 and ask:
- print '\n' + str(qstat_out)
+ if qstat_out and batchstatus == -1 and ask:
+ print('\n' + str(qstat_out))
if self.exitOnQuestion:
exit(-1)
elif not self.keepGoing:
- res = raw_input('Some jobs are still running (see above). Do you really want to resubmit? Y/[N] ')
+ res = input(
+ "Some jobs are still running (see above). Do you really want to resubmit? Y/[N] "
+ )
if res.lower() != 'y':
exit(-1)
ask = False
if batchstatus != 1:
- process.pids[it-1] = resubmit(self.outputstream+process.name,process.name+'_'+str(it),self.workdir,self.header,self.sl6_container)
- #print 'Resubmitted job',process.name,it, 'pid', process.pids[it-1]
- self.printString.append('Resubmitted job '+process.name+' '+str(it)+' pid '+str(process.pids[it-1]))
- if process.status != 0: process.status =0
- process.reachedBatch[it-1] = False
-
- #see how many jobs finished, were copied to workdir
- def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresubmit = True):
- missing = open(self.workdir+'/missing_files.txt','w+')
+ process.pids[it - 1] = batch_classes.resubmit(
+ self.outputstream + process.name,
+ process.name + "_" + str(it),
+ self.workdir,
+ self.header,
+ self.sl6_container,
+ )
+ self.printString.append(
+ "Resubmitted job " + process.name + " " + str(it) + " pid " + str(process.pids[it - 1])
+ )
+ if process.status != 0:
+ process.status = 0
+ process.reachedBatch[it - 1] = False
+
+ # see how many jobs finished, were copied to workdir
+ def check_jobstatus(self, OutputDirectory, nameOfCycle, remove=False, autoresubmit=True):
+ missing = open(self.workdir + "/missing_files.txt", "w+")
waitingFlag_autoresub = False
- missingRootFiles = 0
- ListOfDict =[]
+ missingRootFiles = 0
+ ListOfDict = []
self.watch = pidWatcher(self.subInfo)
ask = True
- for i in xrange(len(self.subInfo)-1, -1, -1):
+ for i in range(len(self.subInfo) - 1, -1, -1):
process = self.subInfo[i]
ListOfDict.append(process.to_JSON())
- rootFiles =0
+ rootFiles = 0
self.subInfo[i].missingFiles = []
for it in range(process.numberOfFiles):
- if process.jobsDone[it]:
- rootFiles+=1
+ if process.jobsDone[it]:
+ rootFiles += 1
continue
- #have a look at the pids with qstat
+ # have a look at the pids with qstat
batchstatus = self.watch.check_pidstatus(process.pids[it])
- #kill batchjobs with error otherwise update batchinfo
- batchstatus = process.process_batchStatus(batchstatus,it)
- #check if files have arrived
- filename = OutputDirectory+'/'+self.workdir+'/'+nameOfCycle+'.'+process.data_type+'.'+process.name+'_'+str(it)+'.root'
- #if process.jobsRunning[it]:
- #print filename, os.path.exists(filename), process.jobsRunning[it], process.jobsDone[it], process.arrayPid, process.pids[it]
- if os.path.exists(filename) and process.startingTime < os.path.getctime(filename) and not process.jobsRunning[it]:
+ # kill batchjobs with error otherwise update batchinfo
+ batchstatus = process.process_batchStatus(batchstatus, it)
+ # check if files have arrived
+ filename = (
+ OutputDirectory
+ + "/"
+ + self.workdir
+ + "/"
+ + nameOfCycle
+ + "."
+ + process.data_type
+ + "."
+ + process.name
+ + "_"
+ + str(it)
+ + ".root"
+ )
+
+ if (
+ os.path.exists(filename)
+ and process.startingTime < os.path.getctime(filename)
+ and not process.jobsRunning[it]
+ ):
process.jobsDone[it] = True
if not process.jobsDone[it]:
- missing.write(self.workdir+'/'+nameOfCycle+'.'+process.data_type+'.'+process.name+'_'+str(it)+'.root sframe_main '+process.name+'_'+str(it+1)+'.xml\n')
- self.subInfo[i].missingFiles.append(it+1)
- missingRootFiles +=1
+ missing.write(
+ self.workdir
+ + "/"
+ + nameOfCycle
+ + "."
+ + process.data_type
+ + "."
+ + process.name
+ + "_"
+ + str(it)
+ + ".root sframe_main "
+ + process.name
+ + "_"
+ + str(it + 1)
+ + ".xml\n"
+ )
+ self.subInfo[i].missingFiles.append(it + 1)
+ missingRootFiles += 1
else:
- rootFiles+=1
- #auto resubmit if job dies, take care that there was some job before and warn the user if more then 10% of jobs die
- #print process.name,'batch status',batchstatus, 'process.reachedBatch',process.reachedBatch, 'process status',process.status,'resubmit counter',process.resubmit[it], 'resubmit active',autoresubmit
+ rootFiles += 1
+                # auto-resubmit if a job dies; make sure the job actually reached the batch first and warn the user
+                # if more than 10% of jobs die
if (
- process.notFoundCounter[it] > 5 and
- not process.jobsRunning[it] and
- not process.jobsDone[it] and
- process.reachedBatch[it] and
- (process.resubmit[it] ==-1 or process.resubmit[it]>0) and
- (process.pids[it] or process.arrayPid) and
- autoresubmit
+ process.notFoundCounter[it] > 5
+ and not process.jobsRunning[it]
+ and not process.jobsDone[it]
+ and process.reachedBatch[it]
+ and (process.resubmit[it] == -1 or process.resubmit[it] > 0)
+ and (process.pids[it] or process.arrayPid)
+ and autoresubmit
):
- if float(self.numOfResubmit)/float(self.totalFiles) >.10 and ask:
+ if float(self.numOfResubmit) / float(self.totalFiles) > 0.10 and ask:
if self.exitOnQuestion:
exit(-1)
elif not self.keepGoing:
- res = raw_input('More then 10% of jobs are dead, do you really want to continue? Y/[N] ')
- if res.lower() != 'y':
+ res = input("More then 10% of jobs are dead, do you really want to continue? Y/[N] ")
+ if res.lower() != "y":
exit(-1)
ask = False
- #print 'resubmitting', process.name+'_'+str(it+1),es not Found',process.notFoundCounter[it], 'pid', process.pids[it], process.arrayPid, 'task',it+1
waitingFlag_autoresub = True
- process.pids[it] = resubmit(self.outputstream+process.name,process.name+'_'+str(it+1),self.workdir,self.header,self.sl6_container)
+ process.pids[it] = batch_classes.resubmit(
+ self.outputstream + process.name,
+ process.name + "_" + str(it + 1),
+ self.workdir,
+ self.header,
+ self.sl6_container,
+ )
process.status = 0
- #print 'AutoResubmitted job',process.name,it, 'pid', process.pids[it]
- self.printString.append('File Found '+str(os.path.exists(filename)))
- if os.path.exists(filename): self.printString.append('Timestamp is ok '+str(process.startingTime < os.path.getctime(filename)))
- self.printString.append('AutoResubmitted job '+process.name+' '+str(it)+' pid '+str(process.pids[it]))
- #time.sleep(5)
+ self.printString.append("File Found " + str(os.path.exists(filename)))
+ if os.path.exists(filename):
+ self.printString.append(
+ "Timestamp is ok " + str(process.startingTime < os.path.getctime(filename))
+ )
+ self.printString.append(
+ "AutoResubmitted job " + process.name + " " + str(it) + " pid " + str(process.pids[it])
+ )
process.reachedBatch[it] = False
- if process.resubmit[it] > 0 :
+ if process.resubmit[it] > 0:
process.resubmit[it] -= 1
- self.numOfResubmit +=1
+ self.numOfResubmit += 1
# final status updates
if (
- any( i > 6 for i in process.notFoundCounter) and
- not any(process.jobsRunning) and
- not all(process.jobsDone) and
- all(process.reachedBatch) # basically set to error when nothing is running anymore & everything was on the batch
+ any(i > 6 for i in process.notFoundCounter)
+ and not any(process.jobsRunning)
+ and not all(process.jobsDone)
+ and all(
+ process.reachedBatch
+ ) # basically set to error when nothing is running anymore & everything was on the batch
):
process.status = 4
- ###Debugging is ongoing
+ # Debugging is ongoing
"""
if any( i > 6 for i in process.notFoundCounter):
print 'Process', process.name,'not found i-times',i
@@ -255,73 +319,99 @@ def check_jobstatus(self, OutputDirectory, nameOfCycle,remove = False, autoresub
"""
if all(process.jobsDone) and not process.status == 2:
process.status = 1
- process.rootFileCounter=rootFiles
+ process.rootFileCounter = rootFiles
try:
missing.close()
except IOError as e:
- print "I/O error({0}): {1}".format(e.errno, e.strerror)
-
+ print("I/O error({0}): {1}".format(e.errno, e.strerror))
+
self.missingFiles = missingRootFiles
- #Save/update pids and other information to json file, such that it can be loaded and used later
+ # Save/update pids and other information to json file, such that it can be loaded and used later
try:
- jsonFile = open(self.workdir+'/SubmissinInfoSave.p','wb+')
+ jsonFile = open(self.workdir + "/SubmissinInfoSave.p", 'w+', encoding='utf-8')
json.dump(ListOfDict, jsonFile)
jsonFile.close()
except IOError as e:
- print "I/O error({0}): {1}".format(e.errno, e.strerror)
- if(waitingFlag_autoresub): time.sleep(5)
-
-
- #print status of jobs
+ print("I/O error({0}): {1}".format(e.errno, e.strerror))
+ if waitingFlag_autoresub:
+ time.sleep(5)
+
+ # print status of jobs
def print_status(self):
if not self.move_cursor_up_cmd:
- self.move_cursor_up_cmd = '\x1b[1A\x1b[2K'*(len(self.subInfo) + 3)
- self.move_cursor_up_cmd += '\x1b[1A' # move once more up since 'print' finishes the line
- print 'Status of files'
+ self.move_cursor_up_cmd = "\x1b[1A\x1b[2K" * (len(self.subInfo) + 3)
+ self.move_cursor_up_cmd += "\x1b[1A" # move once more up since 'print' finishes the line
+ print("Status of files")
else:
- print self.move_cursor_up_cmd
- #time.sleep(.1) # 'blink'
-
+ print(self.move_cursor_up_cmd)
+ # time.sleep(.1) # 'blink'
+
for item in self.printString:
- print item
+ print(item)
self.printString = []
- stayAliveArray = ['|','/','-','\\']
+ stayAliveArray = ["|", "/", "-", "\\"]
if self.stayAlive < 3:
- self.stayAlive +=1
+ self.stayAlive += 1
else:
self.stayAlive = 0
process_names_length = str(len(max([process.name for process in self.subInfo], key=len)))
- print ('%'+process_names_length+'s: %6s %6s %.6s')% ('Sample Name','Ready','#Files','[%]')
- readyFiles =0
+ print(("%" + process_names_length + "s: %6s %6s %.6s") % ("Sample Name", "Ready", "#Files", "[%]"))
+ readyFiles = 0
for process in self.subInfo:
- status_message = ['\033[94m Working \033[0m','\033[92m Transferred \033[0m','Merging','Already Merged','\033[91m Failed \033[0m']
- #print process.status
- print ('%'+process_names_length+'s: %6i %6i %.3i')% (process.name, process.rootFileCounter,process.numberOfFiles, 100*float(process.rootFileCounter)/float(process.numberOfFiles)), status_message[process.status]
+ status_message = [
+ "\033[94m Working \033[0m",
+ "\033[92m Transferred \033[0m",
+ "Merging",
+ "Already Merged",
+ "\033[91m Failed \033[0m",
+ ]
+ # print process.status
+ print(
+ ("%" + process_names_length + "s: %6i %6i %.3i")
+ % (
+ process.name,
+ process.rootFileCounter,
+ process.numberOfFiles,
+ 100 * float(process.rootFileCounter) / float(process.numberOfFiles),
+ ),
+ status_message[process.status],
+ )
readyFiles += process.rootFileCounter
- print 'Number of files: ',readyFiles,'/',self.totalFiles,'(%.3i)' % (100*(1-float(readyFiles)/float(self.totalFiles))),stayAliveArray[self.stayAlive],stayAliveArray[self.stayAlive]
- print '='*80
-
- #take care of merging
- def merge_files(self,OutputDirectory,nameOfCycle,InputData):
- self.merge.merge(OutputDirectory,nameOfCycle,self.subInfo,self.workdir,InputData,self.outputstream)
- #wait for every process to finish
+ print(
+ "Number of files: ",
+ readyFiles,
+ "/",
+ self.totalFiles,
+ "(%.3i)" % (100 * (1 - float(readyFiles) / float(self.totalFiles))),
+ stayAliveArray[self.stayAlive],
+ stayAliveArray[self.stayAlive],
+ )
+ print("=" * 80)
+
+ # take care of merging
+ def merge_files(self, OutputDirectory, nameOfCycle, InputData):
+ self.merge.merge(OutputDirectory, nameOfCycle, self.subInfo, self.workdir, InputData, self.outputstream)
+
+ # wait for every process to finish
def merge_wait(self):
self.merge.wait_till_finished()
- #see how many jobs finished (or error)
+
+ # see how many jobs finished (or error)
def get_subInfoFinish(self):
for process in self.subInfo:
- if process.status==0:
+ if process.status == 0:
return False
return True
-#class to take care of merging (maybe rethink design)
+
+# class to take care of merging (maybe rethink design)
class MergeManager(object):
- def __init__(self,add,force,wait,onlyhist=False):
+ def __init__(self, add, force, wait, onlyhist=False):
self.add = add
self.force = force
- self.active_process=[]
+ self.active_process = []
self.wait = wait
self.onlyhist = onlyhist
@@ -331,30 +421,46 @@ def get_mergerStatus(self):
else:
return False
- def merge(self,OutputDirectory,nameOfCycle,info,workdir,InputData,outputdir):
- if not self.add and not self.force and not self.onlyhist: return
- #print "Don't worry your are using nice = 10"
+ def merge(self, OutputDirectory, nameOfCycle, info, workdir, InputData, outputdir):
+ if not self.add and not self.force and not self.onlyhist:
+ return
OutputTreeName = ""
for inputObj in InputData:
for mylist in inputObj.io_list.other:
if "OutputTree" in mylist:
- OutputTreeName= mylist[2]
+ OutputTreeName = mylist[2]
for process in info:
if not process.numberOfFiles == process.rootFileCounter:
continue
- #print any(process.jobsRunning)
- #print process.name,any(process.jobsRunning), process.status ==1,os.path.exists(OutputDirectory+'/'+nameOfCycle+'.'+process.data_type+'.'+process.name+'.root'
- if (not os.path.exists(OutputDirectory+'/'+nameOfCycle+'.'+process.data_type+'.'+process.name+'.root') and all(process.jobsDone) and process.status !=2 ) or self.force:
- self.active_process.append(add_histos(OutputDirectory,nameOfCycle+'.'+process.data_type+'.'+process.name,process.numberOfFiles,workdir,OutputTreeName,self.onlyhist,outputdir+process.name))
+ if (
+ not os.path.exists(
+ OutputDirectory + "/" + nameOfCycle + "." + process.data_type + "." + process.name + ".root"
+ )
+ and all(process.jobsDone)
+ and process.status != 2
+ ) or self.force:
+ self.active_process.append(
+ batch_classes.add_histos(
+ OutputDirectory,
+ nameOfCycle + "." + process.data_type + "." + process.name,
+ process.numberOfFiles,
+ workdir,
+ OutputTreeName,
+ self.onlyhist,
+ outputdir + process.name,
+ )
+ )
process.status = 2
- #elif process.status !=2:
+ # elif process.status !=2:
# process.status = 3
def wait_till_finished(self):
- if not self.wait: return
+ if not self.wait:
+ return
for process in self.active_process:
- if not process: continue
- print 'Active process',process.communicate()[0]
+ if not process:
+ continue
+ print("Active process", process.communicate()[0])
if not process.poll():
process.wait()
- #os.kill(process.pid,-9)
+ # os.kill(process.pid,-9)
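
For reference, pidWatcher above parses condor_q output of the form "GlobalJobId, JobStatus" (one job per line, as requested with -af:,) and keys its state map by the cluster.proc part of the GlobalJobId. A short sketch with hypothetical output (host name and job ids are made up):

    # hypothetical 'condor_q <pids> -af:, GlobalJobId JobStatus' output
    sample = "bird123.desy.de#1234567.0#1577836800, 2\nbird123.desy.de#1234567.1#1577836800, 1\n"
    pid_states = {}
    for line in sample.split('\n'):
        if ',' not in line:
            continue
        global_id, status = line.strip().split(',')
        pid_states[global_id.split('#')[1]] = int(status)  # '1234567.0' -> 2 (running), '1234567.1' -> 1 (idle)
    print(pid_states)  # {'1234567.0': 2, '1234567.1': 1}
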
diff --git a/RunII_80X_v3.info b/RunII_80X_v3.info
deleted file mode 100644
index b42c4f4..0000000
--- a/RunII_80X_v3.info
+++ /dev/null
@@ -1,116 +0,0 @@
-# Config File for SFrameBatch to automagically construct
-# SFrame XML Files just with CrossSections and XML Paths
-# Format is Process Name, Path to XML File, Cross Section, either number of events/ cores,method (True for fast and false for weights)
-# If there exists a tag with the number of events it will be read in if the last colum is left empty.
-# For Data thr cross section should be empty
-# Usage sframe_batch.py --XMLDatabase RunII_80X_v3.info FILENAME_TO_STORE.xml
-
-#NickName Directory Cross section Number of Events/Cores Methods
-#SingleEleData DATA_SingleElectron*.xml
-SingleMuData DATA_SingleMu*.xml
-SingleMuData SingleMuon*
-BprimeB* Bprime/BprimeBToTW_M-*.xml 1
-BprimeT* Bprime/BprimeTToTW_M-*.xml 1
-X53* Bprime/X53ToTW_M-*.xml 1
-SingleT_t MC_ST_t-channel_antitop_4f_inclusiveDecays.xml 80.95
-SingleT_t_top MC_ST_t-channel_top_4f_inclusiveDecays.xml 136.02
-SingleTWtop ST_tW_top_5f_inclusiveDecays_13TeV.xml 35.6
-SingleTWAntitop ST_tW_antitop_5f_inclusiveDecays_13TeV.xml 35.6
-SingleT_s ST_s-channel_4f_leptonDecays_13TeV.xml 3.36
-#TTbar MC_TTbar.xml 831.76
-#TTbar_Mtt1000toInf MC_TT_Mtt1000toInf.xml 20.578 23546331
-TTbar_Tune MC_TT_TuneCUETP8M2T4.xml 831.76
-#WW MC_WW.xml 118.7 993214
-#WZ MC_WZ.xml 47.13 1000000
-#ZZ MC_ZZ.xml 16.523 989312
-WJets_HT100to200 MC_WJetsToLNu_HT-100To200.xml 1345*1.21
-WJets_HT200to400 MC_WJetsToLNu_HT-200To400.xml 359.7*1.21
-WJets_HT400to600 MC_WJetsToLNu_HT-400To600.xml 48.91*1.21
-WJets_HT600to800 MC_WJetsToLNu_HT-600To800.xml 12.05*1.21
-WJets_HT800to1200 MC_WJetsToLNu_HT-800To1200.xml 5.501*1.21
-WJets_HT1200to2500 MC_WJetsToLNu_HT-1200To2500.xml 1.329*1.21
-WJets_HT2500toInf MC_WJetsToLNu_HT-2500ToInf.xml 0.03216*1.21
-WJets_comp MC_WJetsToLNu.xml 61526.7
-
-#ZJets_M10to50 MC_DYJetsToLL_NLO_M10to50.xml 18610
-#ZJets_M50 MC_DYJetsToLL_NLO_M50.xml 6025.2
-
-ZJets_HT70to100 MC_DYJetsToLL_M-50_HT-70to100.xml 175.3*1.23
-ZJets_HT100to200 MC_DYJetsToLL_M-50_HT-100to200.xml 147.4*1.23
-ZJets_HT200to400 MC_DYJetsToLL_M-50_HT-200to400.xml 40.99*1.23
-ZJets_HT400to600 MC_DYJetsToLL_M-50_HT-400to600.xml 5.678*1.23
-ZJets_HT600to800 MC_DYJetsToLL_M-50_HT-600to800.xml 1.367*1.23
-ZJets_HT800to1200 MC_DYJetsToLL_M-50_HT-800to1200.xml 0.6304*1.23
-ZJets_HT1200to2500 MC_DYJetsToLL_M-50_HT-1200to2500.xml 0.1514*1.23
-ZJets_HT2500toINF MC_DYJetsToLL_M-50_HT-2500toInf 0.003565*1.23
-
-#QCD_20toInf_Mu MC_QCD_20toInf_MuEnrichedPt15.xml 720648000*0.00042
-
-QCD_15to20_Mu MC_QCD_Pt-15to20_MuEnrichedPt5.xml 1273190000*0.003 4141208
-QCD_20to30_Mu MC_QCD_Pt-20to30_MuEnrichedPt5.xml 558528000*0.0053 31475094
-QCD_30to50_Mu MC_QCD_Pt-30to50_MuEnrichedPt5.xml 139803000*0.01182 29944719
-QCD_50to80_Mu MC_QCD_Pt-50to80_MuEnrichedPt5.xml 19222500*0.02276 19806515
-QCD_80to120_Mu MC_QCD_Pt-80to120_MuEnrichedPt5.xml 2758420*0.03844 13778176
-QCD_120to170_Mu MC_QCD_Pt-120to170_MuEnrichedPt5.xml 469797*0.05362 8042660
-QCD_170to300_Mu MC_QCD_Pt-170to300_MuEnrichedPt5.xml 117989*0.07335 7946703
-QCD_300to470_Mu MC_QCD_Pt-300to470_MuEnrichedPt5.xml 7820.25*0.10196 7936465
-QCD_470to600_Mu MC_QCD_Pt-470to600_MuEnrichedPt5.xml 645.528*0.12242 3850466
-QCD_600to800_Mu MC_QCD_Pt-600to800_MuEnrichedPt5.xml 187.109*0.13412 4008200
-QCD_800to1000_Mu MC_QCD_Pt-800to1000_MuEnrichedPt5.xml 32.3486*0.14552 3959757
-QCD_1000toInf_Mu MC_QCD_Pt-1000toInf_MuEnrichedPt5.xml 10.4305*0.15544 3976075
-
-QCD_Pt20to30_EM MC_QCD_Pt20to30_EMEnriched.xml 557600000*0.0096 9167183
-QCD_Pt30to50_EM MC_QCD_Pt30to50_EMEnriched.xml 136000000*0.073 11498358
-QCD_Pt50to80_EM MC_QCD_Pt50to80_EMEnriched.xml 19800000*0.146 23191144
-QCD_Pt80to120_EM MC_QCD_Pt80to120_EMEnriched.xml 2800000*0.125 33636668
-QCD_Pt120to170_EM MC_QCD_Pt120to170_EMEnriched.xml 477000*0.132 35369495
-QCD_Pt170to300_EM MC_QCD_Pt170to300_EMEnriched.xml 114000*0.165 11516622
-QCD_Pt300toInf_EM MC_QCD_Pt300toInf_EMEnriched.xml 9000*0.15 7360640
-
-QCD_Pt30to80_bcToE MC_QCD_Pt30to80_bcToE.xml 159068000*0.00255 15094812
-QCD_Pt80to170_bcToE MC_QCD_Pt80to170_bcToE.xml 3221000*0.01183 14087383
-QCD_Pt170to250_bcToE MC_QCD_Pt170to250_bcToE.xml 105771*0.02492 8686936
-QCD_Pt250toInf_bcToE MC_QCD_Pt250toInf_bcToE.xml 21094.1*0.03375 9874604
-
-
-#################################################
-# The following will be used as Information Block for the xml files !
-# The UserConfig can be stored here. Later support for libs/lumi might be added
-# USERCONFIGBLOCK is the marker where it starts to read the user config!
-
-USERCONFIGBLOCK
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/SFrameBatchSteer.py b/SFrameBatchSteer.py
index cf94872..08f164b 100755
--- a/SFrameBatchSteer.py
+++ b/SFrameBatchSteer.py
@@ -1,36 +1,49 @@
#!/usr/bin/env python
-
import sys
-# the mock-0.3.1 dir contains testcase.py, testutils.py & mock.py
-
-
+# the mock-0.3.1 dir contains testcase.py, testutils.py & mock.py
-#simple script that runs several sframe batch jobs and creates everything you might need
+# simple script that runs several sframe batch jobs and creates everything you might need
if __name__ == "__main__":
debug = False
- remove = True #remove directories with old results
+ remove = True # remove directories with old results
- #put your local sfram_batch dir in search path
- sys.path.append('/nfs/dust/cms/user/gonvaq/SFrameBatch/')
- #import the main function
+    # put your local sframe_batch dir in the search path
+ sys.path.append("/nfs/dust/cms/user/gonvaq/SFrameBatch/")
+ # import the main function
from sframe_batch import SFrameBatchMain
- #what you want to do, could also be done in parallel but then monitoring gets more difficult
- variations_variables = ['PU_variation']#'SF_muonID','BTag_variation',
- variations = ['up','down']
- xmlfile = "Sel.xml" # EleSel.xml
+ # what you want to do, could also be done in parallel but then monitoring gets more difficult
+ variations_variables = ["PU_variation"] # 'SF_muonID','BTag_variation',
+ variations = ["up", "down"]
+ xmlfile = "Sel.xml" # EleSel.xml
for var in variations_variables:
for value in variations:
- command_string = "-slac "+xmlfile+" -w workdir."+var+"_"+value+" -o ./"+var+"_"+value+" --ReplaceUserItem "+var+","+value
-
- if debug :print command_string.split(" ")
+ command_string = (
+ "-slac "
+ + xmlfile
+ + " -w workdir."
+ + var
+ + "_"
+ + value
+ + " -o ./"
+ + var
+ + "_"
+ + value
+ + " --ReplaceUserItem "
+ + var
+ + ","
+ + value
+ )
+
+ if debug:
+ print(command_string.split(" "))
else:
- #try:
+ # try:
SFrameBatchMain(command_string.split(" "))
"""
except:
print "SFrameBatch did crash during running:"
- print command_string
+ print command_string
sys.exit(1)
"""
diff --git a/SubmissionInfo_Class.py b/SubmissionInfo_Class.py
index ea5b6b4..3526130 100644
--- a/SubmissionInfo_Class.py
+++ b/SubmissionInfo_Class.py
@@ -1,43 +1,44 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import subprocess
import json
-import time
+
# class for the submission information
class SubInfo(object):
- def __init__(self,name='',numberOfFiles=0,data_type='',resubmit =0):
+ def __init__(self, name="", numberOfFiles=0, data_type="", resubmit=0):
self.name = name
- self.numberOfFiles =numberOfFiles #number of expected files
+ self.numberOfFiles = numberOfFiles # number of expected files
self.data_type = data_type
- self.rootFileCounter = 0 #number of expected files
- self.status = 0 # 0: init, 1: data on disk
+        self.rootFileCounter = 0  # number of root files found on disk so far
+        self.status = 0  # 0: working, 1: transferred, 2: merging, 3: already merged, 4: failed
self.missingFiles = []
- self.pids = ['']*numberOfFiles
- self.notFoundCounter = [0]*numberOfFiles
- self.reachedBatch = [False]*numberOfFiles
- self.jobsRunning = [False]*numberOfFiles
- self.jobsDone = [False]*numberOfFiles
- self.arrayPid = ''
- self.resubmit = [resubmit]*numberOfFiles
+ self.pids = [""] * numberOfFiles
+ self.notFoundCounter = [0] * numberOfFiles
+ self.reachedBatch = [False] * numberOfFiles
+ self.jobsRunning = [False] * numberOfFiles
+ self.jobsDone = [False] * numberOfFiles
+ self.arrayPid = ""
+ self.resubmit = [resubmit] * numberOfFiles
self.startingTime = 0
- def reset_resubmit(self,value):
- self.resubmit =[value]*self.numberOfFiles
+
+ def reset_resubmit(self, value):
+ self.resubmit = [value] * self.numberOfFiles
+
def to_JSON(self):
- json_s=json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
- json_s=''.join(json_s.split()) #remove line breaks and whitespace
+ json_s = json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
+ json_s = "".join(json_s.split()) # remove line breaks and whitespace
return json_s
- def load_Dict(self,data):
+
+ def load_Dict(self, data):
self.__dict__ = data
- def process_batchStatus(self,batch,it):
+
+ def process_batchStatus(self, batch, it):
self.jobsRunning[it] = False
self.notFoundCounter[it] += 1
if batch == 1:
- self.notFoundCounter[it]=0 # Safeguard, no action is taken if a job is not found once.
- self.reachedBatch[it] = True # Use to understand when a job reached the batch before taking any action
+ self.notFoundCounter[it] = 0 # Safeguard, no action is taken if a job is not found once.
+ self.reachedBatch[it] = True # Use to understand when a job reached the batch before taking any action
self.jobsRunning[it] = True
- #kill jobs with have an error state
+        # kill jobs that have an error state
if batch == 2:
- print "not yet implemented in ht condor! Do not remember what happens!"
+ print("not yet implemented in ht condor! Do not remember what happens!")
return -2
return batch
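
The to_JSON/load_Dict pair above is the mechanism Manager.py uses to persist submission state to SubmissinInfoSave.p and to restore it through HelpJSON. A minimal round-trip sketch with illustrative values:

    import json
    from SubmissionInfo_Class import SubInfo

    info = SubInfo(name="TTbar", numberOfFiles=2, data_type="MC")
    blob = info.to_JSON()                 # compact JSON string of the instance __dict__
    restored = SubInfo()
    restored.load_Dict(json.loads(blob))  # same call HelpJSON.check performs per entry
    assert restored.name == "TTbar" and restored.numberOfFiles == 2
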
diff --git a/batch_classes.py b/batch_classes.py
index 9f2805d..b91725a 100644
--- a/batch_classes.py
+++ b/batch_classes.py
@@ -1,19 +1,16 @@
-#!/usr/bin/env python
-
+from __future__ import print_function
from subprocess import call
from subprocess import Popen
from subprocess import PIPE
import os
-
-from tree_checker import *
-#from fhadd import fhadd
+import io_func
SINGULARITY_IMG = os.path.expandvars("/nfs/dust/cms/user/$USER/slc6_latest.sif")
-def write_script(name,workdir,header,sl6_container=False):
- sframe_wrapper=open(workdir+'/sframe_wrapper.sh','w')
+def write_script(name, workdir, header, sl6_container=False):
+ sframe_wrapper = open(workdir + "/sframe_wrapper.sh", "w")
# For some reason, we have to manually copy across certain environment
# variables, most notably LD_LIBRARY_PATH, and if running on singularity, PATH
@@ -37,26 +34,38 @@ def write_script(name,workdir,header,sl6_container=False):
condor_notification = ''
condor_submitfile_name = workdir+'/CondorSubmitfile_'+name+'.submit'
- if(os.path.isfile(condor_submitfile_name)):
+ if os.path.isfile(condor_submitfile_name):
return
- #Make sure user does not try to submit jobs to the EL7 nodes without singularity from a environment with a SL6 SCRAM_ARCH
- if 'slc6' in os.getenv("SCRAM_ARCH") and not sl6_container:
- raise EnvironmentError("\033[91mSCRAM_ARCH shows this environment is setup for SL6. You tried to submit to EL7 nodes without using a singularity container.\n Make sure to use --sl6container to run these jobs inside singularity container.\033[0m")
+    # Make sure the user does not try to submit jobs to the EL7 nodes without singularity
+    # from an environment with an SL6 SCRAM_ARCH
+ if 'slc6' in os.getenv("SCRAM_ARCH", "") and not sl6_container:
+ raise EnvironmentError("\033[91mSCRAM_ARCH shows this environment is setup for SL6."
+ + "You tried to submit to EL7 nodes without using a singularity container.\n"
+ + "Make sure to use --sl6container to run these jobs inside singularity container."
+ + "\033[0m")
worker_str = ""
# Run a SLC6 job on EL7 machine using singularity
if sl6_container:
if not os.path.isfile(SINGULARITY_IMG):
- print '\033[93m',"Please pull the SLC6 image to your NFS:",'\033[0m'
- print ""
- print '\033[93m','SINGULARITY_CACHEDIR="/nfs/dust/cms/user/$USER/singularity" singularity pull', SINGULARITY_IMG, 'docker://cmssw/slc6:latest','\033[0m'
- print ""
- raise RuntimeError("\033[91mCannot find image, %s. Do not use one from /afs or /cvmfs.\033[0m" % SINGULARITY_IMG)
+ print("\033[93m", "Please pull the SLC6 image to your NFS:", "\033[0m")
+ print("")
+ print(
+ "\033[93m",
+ 'SINGULARITY_CACHEDIR="/nfs/dust/cms/user/$USER/singularity" singularity pull',
+ SINGULARITY_IMG,
+ "docker://cmssw/slc6:latest",
+ "\033[0m",
+ )
+ print("")
+ raise RuntimeError(
+ "\033[91mCannot find image, %s. Do not use one from /afs or /cvmfs.\033[0m" % SINGULARITY_IMG
+ )
worker_str += '+MySingularityImage="'+SINGULARITY_IMG+'"\n'
worker_str += '+MySingularityArgs="--bind /tmp:/tmp"\n'
- submit_file = open(condor_submitfile_name,'w')
+ submit_file = open(condor_submitfile_name, "w")
submit_file.write(
"""#HTC Submission File for SFrameBatch
# +MyProject = "af-cms"
@@ -77,7 +86,11 @@ def write_script(name,workdir,header,sl6_container=False):
RequestDisk = """+header.DISK+"""G
#You need to set up sframe
getenv = True
-environment = "LD_LIBRARY_PATH_STORED="""+os.environ.get('LD_LIBRARY_PATH')+""" PATH_STORED="""+os.environ.get('PATH')+""""
+environment = "LD_LIBRARY_PATH_STORED="""
+ + os.environ.get('LD_LIBRARY_PATH_BACKUP')
+ + """ PATH_STORED="""
+ + os.environ.get('PATH')
+ + """"
JobBatchName = """+name+"""
executable = """+workdir+"""/sframe_wrapper.sh
MyIndex = $(Process) + 1
@@ -86,7 +99,8 @@ def write_script(name,workdir,header,sl6_container=False):
""")
submit_file.close()
-def resub_script(name,workdir,header,sl6_container=False):
+
+def resub_script(name, workdir, header, sl6_container=False):
if (header.Notification == 'as'):
condor_notification = 'Error'
elif (header.Notification == 'n'):
@@ -97,28 +111,41 @@ def resub_script(name,workdir,header,sl6_container=False):
condor_notification = ''
condor_resubmitfile_name = workdir+'/CondorSubmitfile_'+name+'.submit'
- if(os.path.isfile(condor_resubmitfile_name)):
+ if os.path.isfile(condor_resubmitfile_name):
return
- #Make sure user does not try to submit jobs to the EL7 nodes without singularity from a environment with a SL6 SCRAM_ARCH
- if 'slc6' in os.getenv("SCRAM_ARCH") and not sl6_container:
- raise EnvironmentError("\033[91mSCRAM_ARCH shows this environment is setup for SL6. You tried to submit to EL7 nodes without using a singularity container.\n Make sure to use --sl6container to run these jobs inside singularity container.\033[0m")
-
+    # Make sure the user does not try to submit jobs to the EL7 nodes without singularity
+    # from an environment with an SL6 SCRAM_ARCH
+ if 'slc6' in os.getenv("SCRAM_ARCH", "") and not sl6_container:
+ raise EnvironmentError(
+ "\033[91mSCRAM_ARCH shows this environment is setup for SL6. You tried to submit to EL7 nodes "
+ + "without using a singularity container.\n Make sure to use --sl6container to run these jobs "
+ + "inside singularity container.\033[0m"
+ )
+
worker_str = ""
# Run a SLC6 job on EL7 machine using singularity
if sl6_container:
if not os.path.isfile(SINGULARITY_IMG):
- print '\033[93m',"Please pull the SLC6 image to your NFS:",'\033[0m'
- print ""
- print '\033[93m','SINGULARITY_CACHEDIR="/nfs/dust/cms/user/$USER/singularity" singularity pull', SINGULARITY_IMG, 'docker://cmssw/slc6:latest','\033[0m'
- print ""
- raise RuntimeError("\033[91mCannot find image, %s. Do not use one from /afs or /cvmfs.\033[0m" % SINGULARITY_IMG)
+ print("\033[93m", "Please pull the SLC6 image to your NFS:", "\033[0m")
+ print("")
+ print(
+ "\033[93m",
+ 'SINGULARITY_CACHEDIR="/nfs/dust/cms/user/$USER/singularity" singularity pull',
+ SINGULARITY_IMG,
+ "docker://cmssw/slc6:latest",
+ "\033[0m",
+ )
+ print("")
+ raise RuntimeError(
+ "\033[91mCannot find image, %s. Do not use one from /afs or /cvmfs.\033[0m" % SINGULARITY_IMG
+ )
worker_str += '+MySingularityImage="'+SINGULARITY_IMG+'"\n'
worker_str += '+MySingularityArgs="--bind /tmp:/tmp"\n'
- submitfile = open(condor_resubmitfile_name,'w')
+ submitfile = open(condor_resubmitfile_name, "w")
submitfile.write(
-"""#HTC Submission File for SFrameBatch
+ """#HTC Submission File for SFrameBatch
# +MyProject = "af-cms"
Requirements = ( OpSysAndVer == "CentOS7" )
""" + worker_str + """
@@ -138,7 +165,11 @@ def resub_script(name,workdir,header,sl6_container=False):
RequestDisk = """+header.DISK+"""G
#You need to set up sframe
getenv = True
-environment = "LD_LIBRARY_PATH_STORED="""+os.environ.get('LD_LIBRARY_PATH')+""" PATH_STORED="""+os.environ.get('PATH')+""""
+environment = "LD_LIBRARY_PATH_STORED="""
+ + os.environ.get('LD_LIBRARY_PATH_BACKUP')
+ + """ PATH_STORED="""
+ + os.environ.get('PATH')
+ + """"
JobBatchName = """+name+"""
executable = """+workdir+"""/sframe_wrapper.sh
arguments = """+name+""".xml
@@ -146,55 +177,81 @@ def resub_script(name,workdir,header,sl6_container=False):
""")
submitfile.close()
-def submit_qsub(NFiles,Stream,name,workdir):
- #print '-t 1-'+str(int(NFiles))
- #call(['ls','-l'], shell=True)
+
+def submit_qsub(NFiles, Stream, name, workdir):
if not os.path.exists(Stream):
os.makedirs(Stream)
- print Stream+' has been created'
+ print(Stream+' has been created')
+ proc_qstat = Popen(
+ [
+ "condor_submit"
+ + " "
+ + workdir
+ + "/CondorSubmitfile_"
+ + name
+ + ".submit"
+ + ' -a "Stream='
+ + Stream.split("/")[1]
+ + '" -a "queue '
+ + str(NFiles)
+ + '"'
+ ],
+ shell=True,
+ stdout=PIPE,
+ )
+ return (str(proc_qstat.communicate()[0].split()[7], 'utf-8')).split('.')[0]
- #call(['qsub'+' -t 1-'+str(NFiles)+' -o '+Stream+'/'+' -e '+Stream+'/'+' '+workdir+'/split_script_'+name+'.sh'], shell=True)
- # proc_qstat = Popen(['condor_qsub'+' -t 1-'+str(NFiles)+' -o '+Stream+'/'+' -e '+Stream+'/'+' '+workdir+'/split_script_'+name+'.sh'],shell=True,stdout=PIPE)
- # return (proc_qstat.communicate()[0].split()[2]).split('.')[0]
- proc_qstat = Popen(['condor_submit'+' '+workdir+'/CondorSubmitfile_'+name+'.submit'+' -a "Stream='+Stream.split('/')[1]+'" -a "queue '+str(NFiles)+'"'],shell=True,stdout=PIPE)
- return (proc_qstat.communicate()[0].split()[7]).split('.')[0]
-
-def resubmit(Stream,name,workdir,header,sl6_container):
- #print Stream ,name
- resub_script(name,workdir,header,sl6_container)
+def resubmit(Stream, name, workdir, header, sl6_container):
+ resub_script(name, workdir, header, sl6_container)
if not os.path.exists(Stream):
os.makedirs(Stream)
- print Stream+' has been created'
- #call(['qsub'+' -o '+Stream+'/'+' -e '+Stream+'/'+' '+workdir+'/split_script_'+name+'.sh'], shell=True)
- # proc_qstat = Popen(['condor_qsub'+' -o '+Stream+'/'+' -e '+Stream+'/'+' '+workdir+'/split_script_'+name+'.sh'],shell=True,stdout=PIPE)
- # return proc_qstat.communicate()[0].split()[2]
- proc_qstat = Popen(['condor_submit'+' '+workdir+'/CondorSubmitfile_'+name+'.submit'+' -a "Stream='+Stream.split('/')[1]+'"'],shell=True,stdout=PIPE)
- return (proc_qstat.communicate()[0].split()[7]).split('.')[0]
-
-def add_histos(directory,name,NFiles,workdir,outputTree, onlyhists,outputdir):
+ print(Stream+' has been created')
+ proc_qstat = Popen(
+ [
+ "condor_submit"
+ + " "
+ + workdir
+ + "/CondorSubmitfile_"
+ + name
+ + ".submit"
+ + ' -a "Stream='
+ + Stream.split("/")[1]
+ + '"'
+ ],
+ shell=True,
+ stdout=PIPE,
+ )
+ return (str(proc_qstat.communicate()[0].split()[7], 'utf-8')).split('.')[0]
+
+
+def add_histos(directory, name, NFiles, workdir, outputTree, onlyhists, outputdir):
if not os.path.exists(outputdir):
os.makedirs(outputdir)
FNULL = open(os.devnull, 'w')
if os.path.exists(directory+name+'.root'):
call(['rm '+directory+name+'.root'], shell=True)
- string=''
+ string = ""
proc = None
position = -1
- command_string = 'nice -n 10 hadd ' # -v 1 ' # the -v stopped working in root 6.06/01 now we get a lot of crap
- if onlyhists: command_string += '-T '
- if(outputTree):
+    command_string = "nice -n 10 hadd "  # '-v 1' stopped working in root 6.06/01 and now produces a lot of noise
+ if onlyhists:
+ command_string += "-T "
+ if outputTree:
for i in range(NFiles):
- if check_TreeExists(directory+workdir+'/'+name+'_'+str(i)+'.root',outputTree) and position ==-1:
+ if (
+ io_func.check_TreeExists(directory + workdir + "/" + name + "_" + str(i) + ".root", outputTree)
+ and position == -1
+ ):
position = i
- string+=str(i)
+ string += str(i)
break
for i in range(NFiles):
if not position == i and not position == -1:
string += ','+str(i)
- elif position ==-1:
+ elif position == -1:
string += str(i)
position = 0
@@ -204,12 +261,13 @@ def add_histos(directory,name,NFiles,workdir,outputTree, onlyhists,outputdir):
else:
source_files = directory+workdir+'/'+name+'_'+string+'.root'
- #print command_string+directory+name+'.root '+source_files
- #print outputdir+'/hadd.log'
if not string.isspace():
- proc = Popen([str(command_string+directory+name+'.root '+source_files+' > '+outputdir+'/hadd.log')], shell=True, stdout=FNULL, stderr=FNULL)
+ proc = Popen(
+ [str(command_string + directory + name + ".root " + source_files + " > " + outputdir + "/hadd.log")],
+ shell=True,
+ stdout=FNULL,
+ stderr=FNULL,
+ )
else:
- print 'Nothing to merge for',name+'.root'
+        print('Nothing to merge for', name + '.root')
return proc
-
-
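Note: submit_qsub() and resubmit() above both shell out to condor_submit and rely on its -a/-append option to inject the Stream macro and, for a fresh submission, the queue statement. Roughly, the command assembled by submit_qsub() amounts to the sketch below, where the sample name, workdir and job count are made-up placeholders (illustrative only, not part of this diff):

    # Sketch: approximately what submit_qsub(42, "Stream/TTbar", "TTbar", "workdir") builds and runs.
    # "TTbar", "Stream/TTbar", "workdir" and the job count 42 are placeholder values.
    from subprocess import PIPE, Popen

    cmd = (
        'condor_submit workdir/CondorSubmitfile_TTbar.submit'
        ' -a "Stream=TTbar" -a "queue 42"'  # -a appends a line to the submit description
    )
    proc = Popen([cmd], shell=True, stdout=PIPE)
    # condor_submit prints e.g. "... 42 job(s) submitted to cluster 1234."; token 7 is "1234."
    cluster_id = str(proc.communicate()[0].split()[7], "utf-8").split(".")[0]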
diff --git a/io_func.py b/io_func.py
index 5c5ee83..9f81d1a 100644
--- a/io_func.py
+++ b/io_func.py
@@ -1,144 +1,132 @@
-#!/usr/bin/env python
-
-#python classes
-#import xml.dom.minidom
-
-#import glob
-#import getopt
-
-from xml.dom.minidom import parse, parseString
from xml.dom.minidom import Document
-import xml.sax
-
+import os
import math
-import time
-import ROOT
import copy
import warnings
+from xml.dom.minidom import parseString
+import uproot
-ROOT.PyConfig.IgnoreCommandLineOptions = True
-#my classes
-from Inf_Classes import *
-from batch_classes import *
-
-def write_job(Job,Version=-1,SkipEvents=0,MaxEvents=-1,NFile=None, FileSplit=-1,workdir="workdir",LumiWeight=1):
+def write_job(Job, Version=-1, SkipEvents=0, MaxEvents=-1, NFile=None, FileSplit=-1, workdir="workdir", LumiWeight=1):
doc = Document()
root = doc.createElement("JobConfiguration")
- root.setAttribute( 'JobName', Job.JobName)
- root.setAttribute( 'OutputLevel', Job.OutputLevel)
-
+ root.setAttribute("JobName", Job.JobName)
+ root.setAttribute("OutputLevel", Job.OutputLevel)
+
for lib in Job.Libs:
# Create Element
tempChild = doc.createElement('Library')
root.appendChild(tempChild)
# Set Attr.
- tempChild.setAttribute( 'Name', lib)
-
+ tempChild.setAttribute("Name", lib)
+
for pack in Job.Packs:
# Create Element
- tempChild = doc.createElement('Package')
+ tempChild = doc.createElement("Package")
root.appendChild(tempChild)
# Set Attr.
- tempChild.setAttribute( 'Name', pack)
-
+ tempChild.setAttribute("Name", pack)
+
for cycle in Job.Job_Cylce:
# Create Element
- tempChild = doc.createElement('Cycle')
+ tempChild = doc.createElement("Cycle")
root.appendChild(tempChild)
# Set Attr.
- tempChild.setAttribute( 'Name', cycle.Cyclename)
-
- if not os.path.exists(cycle.OutputDirectory+'/'+workdir+'/') and "__NOTSET__" not in cycle.OutputDirectory:
- os.makedirs(cycle.OutputDirectory+'/'+workdir+'/')
- tempChild.setAttribute('OutputDirectory', cycle.OutputDirectory+'/'+workdir+'/')
-
- if NFile is not None and NFile is not -1:
- tempChild.setAttribute('PostFix', cycle.PostFix+'_'+str(NFile))
- tempChild.setAttribute('TargetLumi', cycle.TargetLumi)
-
+ tempChild.setAttribute("Name", cycle.Cyclename)
+
+ if (
+ not os.path.exists(cycle.OutputDirectory + "/" + workdir + "/")
+ and "__NOTSET__" not in cycle.OutputDirectory
+ ):
+ os.makedirs(cycle.OutputDirectory + "/" + workdir + "/")
+ tempChild.setAttribute("OutputDirectory", cycle.OutputDirectory + "/" + workdir + "/")
+
+        if NFile is not None and NFile != -1:
+ tempChild.setAttribute("PostFix", cycle.PostFix + "_" + str(NFile))
+ tempChild.setAttribute("TargetLumi", cycle.TargetLumi)
+
cycleLumiWeight = LumiWeight if cycle.usingSFrameWeight else 1.
for p in range(len(cycle.Cycle_InputData)):
version_check = True
- if(Version!=-1):
+ if Version != -1:
version_check = False
for entry in Version:
- if(cycle.Cycle_InputData[p].Version==entry):
+ if cycle.Cycle_InputData[p].Version == entry:
version_check = True
- if not version_check: continue;
+ if not version_check:
+ continue
# Create Element
- InputGrandchild= doc.createElement('InputData')
+ InputGrandchild = doc.createElement("InputData")
tempChild.appendChild(InputGrandchild)
-
- InputGrandchild.setAttribute('Lumi', str(float(cycle.Cycle_InputData[p].Lumi)*cycleLumiWeight))
- InputGrandchild.setAttribute('Type', cycle.Cycle_InputData[p].Type)
- InputGrandchild.setAttribute('Version', cycle.Cycle_InputData[p].Version)
- if FileSplit!=-1:
- InputGrandchild.setAttribute('Cacheable', 'False')
+
+ InputGrandchild.setAttribute("Lumi", str(float(cycle.Cycle_InputData[p].Lumi) * cycleLumiWeight))
+ InputGrandchild.setAttribute("Type", cycle.Cycle_InputData[p].Type)
+ InputGrandchild.setAttribute("Version", cycle.Cycle_InputData[p].Version)
+ if FileSplit != -1:
+ InputGrandchild.setAttribute("Cacheable", "False")
else:
- InputGrandchild.setAttribute('Cacheable', cycle.Cycle_InputData[p].Cacheable)
+ InputGrandchild.setAttribute("Cacheable", cycle.Cycle_InputData[p].Cacheable)
InputGrandchild.setAttribute('NEventsSkip', str(SkipEvents))
- InputGrandchild.setAttribute('NEventsMax', str(MaxEvents))
-
- count_i =-1
- #print len(cycle.Cycle_InputData[p].io_list)
+ InputGrandchild.setAttribute("NEventsMax", str(MaxEvents))
+
+ count_i = -1
for entry in cycle.Cycle_InputData[p].io_list.FileInfoList:
- count_i +=1
+ count_i += 1
if FileSplit > 0:
- if not (count_i<(NFile+1)*FileSplit and count_i>= NFile*FileSplit):
+ if not (count_i < (NFile + 1) * FileSplit and count_i >= NFile * FileSplit):
continue
- Datachild= doc.createElement(entry[0])
+ Datachild = doc.createElement(entry[0])
InputGrandchild.appendChild(Datachild)
- for it in range(1,len(entry),2):
- #print entry[it],entry[it+1]
- Datachild.setAttribute(entry[it],entry[it+1])
-
+ for it in range(1, len(entry), 2):
+ Datachild.setAttribute(entry[it], entry[it + 1])
+
for entry in cycle.Cycle_InputData[p].io_list.other:
- Datachild= doc.createElement(entry[0])
+ Datachild = doc.createElement(entry[0])
InputGrandchild.appendChild(Datachild)
- for it in range(1,len(entry),2):
- #print entry[it],entry[it+1]
- Datachild.setAttribute(entry[it],entry[it+1])
- if len(cycle.Cycle_InputData[p].io_list.InputTree)!=3:
- print 'something wrong with the InputTree, lenght',len(cycle.Cycle_InputData[p].io_list.InputTree)
- print cycle.Cycle_InputData[p].io_list.InputTree
- print 'going to exit'
+ for it in range(1, len(entry), 2):
+ Datachild.setAttribute(entry[it], entry[it + 1])
+ if len(cycle.Cycle_InputData[p].io_list.InputTree) != 3:
+            print("something wrong with the InputTree, length", len(cycle.Cycle_InputData[p].io_list.InputTree))
+ print(cycle.Cycle_InputData[p].io_list.InputTree)
+ print("going to exit")
exit(0)
-
- Datachild= doc.createElement(cycle.Cycle_InputData[p].io_list.InputTree[0])
+
+ Datachild = doc.createElement(cycle.Cycle_InputData[p].io_list.InputTree[0])
InputGrandchild.appendChild(Datachild)
- Datachild.setAttribute(cycle.Cycle_InputData[p].io_list.InputTree[1],cycle.Cycle_InputData[p].io_list.InputTree[2])
-
- #InGrandGrandchild= doc.createElement('In')
- ConfigGrandchild = doc.createElement('UserConfig')
+ Datachild.setAttribute(
+ cycle.Cycle_InputData[p].io_list.InputTree[1], cycle.Cycle_InputData[p].io_list.InputTree[2]
+ )
+
+ # InGrandGrandchild= doc.createElement('In')
+ ConfigGrandchild = doc.createElement("UserConfig")
tempChild.appendChild(ConfigGrandchild)
for item in cycle.Cycle_UserConf:
- ConfigGrandGrandchild = doc.createElement('Item')
+ ConfigGrandGrandchild = doc.createElement("Item")
ConfigGrandchild.appendChild(ConfigGrandGrandchild)
- ConfigGrandGrandchild.setAttribute('Name',item.Name)
- ConfigGrandGrandchild.setAttribute('Value',item.Value)
+ ConfigGrandGrandchild.setAttribute("Name", item.Name)
+ ConfigGrandGrandchild.setAttribute("Value", item.Value)
return root.toprettyxml()
class fileheader(object):
- def __init__(self,xmlfile):
+ def __init__(self, xmlfile):
f = open(xmlfile)
line = f.readline()
self.header = []
self.Version = []
- self.AutoResubmit =0
+ self.AutoResubmit = 0
self.MaxJobsPerProcess = -1
self.RemoveEmptyFileSplit = False
- while ' 0
+ except BaseException as e:
+ print(e)
+ return False
-def get_number_of_events(Job, Version, atleastOneEvent = False):
- InputData = filter(lambda inp: inp.Version==Version[0], Job.Job_Cylce[0].Cycle_InputData)[0]
+
+def get_number_of_events(Job, Version, atleastOneEvent=False):
+ InputData = list(filter(lambda inp: inp.Version == Version[0], Job.Job_Cylce[0].Cycle_InputData))[0]
NEvents = 0
- if len(InputData.io_list.FileInfoList[:])<5:
- atleastOneEvent=False
- if ( any(['root://' in entry[entry.index("FileName")+1] for entry in InputData.io_list.FileInfoList[:]]) and os.environ.get("X509_USER_PROXY","")==""):
- print "\033[93m You are trying to access at least one file via xrootd and did not set X509_USER_PROXY."
- print "Before executing voms-proxy-init, you need to set this variable to somewhere suitable, e.g.:"
- print "export X509_USER_PROXY=$HOME/X509UserProxy.pem"
- print "HTCondor will pick up the full environment including this and will be able to access the files via xrootd.\033[0m"
+ if len(InputData.io_list.FileInfoList[:]) < 5:
+ atleastOneEvent = False
+ if (
+ any(["root://" in entry[entry.index("FileName") + 1] for entry in InputData.io_list.FileInfoList[:]])
+ and os.environ.get("X509_USER_PROXY", "") == ""
+ ):
+ print("\033[93m You are trying to access at least one file via xrootd and did not set X509_USER_PROXY.")
+ print("Before executing voms-proxy-init, you need to set this variable to somewhere suitable, e.g.:")
+ print("export X509_USER_PROXY=$HOME/X509UserProxy.pem")
+ print(
+ "HTCondor will pick up the full environment including this and will be able to access the files via xrootd."
+ + "\033[0m"
+ )
for entry in InputData.io_list.FileInfoList[:]:
- for name in entry:
- if name.strip().endswith('.root'):
- try:
- f = ROOT.TFile.Open(name)
- n = f.Get(str(InputData.io_list.InputTree[2])).GetEntriesFast()
- if n < 1:
- InputData.io_list.FileInfoList.remove(entry)
- break
- else:
- NEvents += n
- if atleastOneEvent:
- return 1
- f.Close()
- except:
- warnings.warn('\033[93m \n File:\n'+str(name) + '\ndoes not contain an InputTree or file is zombie/broken!\n\033[0m')
+ for name in entry:
+ if name.strip().endswith('.root'):
+ try:
+ f = uproot.open(name)
+ n = f[str(InputData.io_list.InputTree[2])].num_entries
+ if n < 1:
+ InputData.io_list.FileInfoList.remove(entry)
+ break
+ else:
+ NEvents += n
+ if atleastOneEvent:
+ return 1
+ f.close()
+ except BaseException as e:
+ warnings.warn(
+ '\033[93m \n File:\n'
+ + str(name)
+ + '\ndoes not contain an InputTree or file is zombie/broken!\n\033[0m')
+ warnings.warn(e)
return NEvents
-def write_all_xml(path,datasetName,header,Job,workdir):
- NEventsBreak= header.NEventsBreak
- FileSplit=header.FileSplit
+
+def write_all_xml(path, datasetName, header, Job, workdir):
+ NEventsBreak = header.NEventsBreak
+ FileSplit = header.FileSplit
FileSplitCompleteRemove = header.RemoveEmptyFileSplit
MaxJobs = header.MaxJobsPerProcess
- NFiles=0
+ NFiles = 0
- Version =datasetName
- if Version[0] =='-1':Version =-1
+ Version = datasetName
+ if Version[0] == "-1":
+ Version = -1
- if NEventsBreak!=0 and FileSplit<=0:
+ if NEventsBreak != 0 and FileSplit <= 0:
NEvents = get_number_of_events(Job, Version)
- if NEvents<=0:
- print Version[0],'has no InputTree'
+ if NEvents <= 0:
+ print(Version[0], "has no InputTree")
return NFiles
- print '%s: %i events' % (Version[0], NEvents)
+ print("%s: %i events" % (Version[0], NEvents))
NFiles = int(math.ceil(NEvents / float(NEventsBreak)))
if NFiles > MaxJobs and MaxJobs > 0:
- print 'Too many Jobs, changing NEventsBreak mode'
- print 'Max number of Jobs',MaxJobs,'Number of xml-Files per Job',NFiles
- NEventsBreak = int(math.ceil(NEvents/float(MaxJobs)))
+ print('Too many Jobs, changing NEventsBreak mode')
+ print("Max number of Jobs", MaxJobs, "Number of xml-Files per Job", NFiles)
+ NEventsBreak = int(math.ceil(NEvents / float(MaxJobs)))
SkipEvents = NEventsBreak
- MaxEvents = NEventsBreak
-
- for i in xrange(NFiles):
- if i*SkipEvents >= NEvents:
- break
- if (i+1)*MaxEvents >= NEvents:
- MaxEvents = NEvents-i*SkipEvents
- LumiWeight = float(NEvents)/float(MaxEvents)
- outfile = open(path+'_'+str(i+1)+'.xml','w+')
+ MaxEvents = NEventsBreak
+
+ for i in range(NFiles):
+ if i * SkipEvents >= NEvents:
+ break
+ if (i + 1) * MaxEvents >= NEvents:
+ MaxEvents = NEvents - i * SkipEvents
+ LumiWeight = float(NEvents) / float(MaxEvents)
+ outfile = open(path + "_" + str(i + 1) + ".xml", "w+")
for line in header.header:
outfile.write(line)
- outfile.write(write_job(Job,Version,i*SkipEvents,MaxEvents,i,-1,workdir,LumiWeight))
+ outfile.write(write_job(Job, Version, i * SkipEvents, MaxEvents, i, -1, workdir, LumiWeight))
outfile.close()
-
- elif FileSplit>0:
+
+ elif FileSplit > 0:
for entry in Version:
- NEvents = get_number_of_events(Job,[entry], not FileSplitCompleteRemove)
+ NEvents = get_number_of_events(Job, [entry], not FileSplitCompleteRemove)
if NEvents <= 0:
- print 'No entries found for',entry,'Going to ignore this sample.'
+ print("No entries found for", entry, "Going to ignore this sample.")
continue
- print 'Splitting job by files',entry
+ print("Splitting job by files", entry)
for cycle in Job.Job_Cylce:
- for p in xrange(len(cycle.Cycle_InputData)):
- if(cycle.Cycle_InputData[p].Version==entry) or Version ==-1:
- Total_xml = len(cycle.Cycle_InputData[p].io_list.FileInfoList)
- numberOfJobs = int(math.ceil(float(Total_xml)/FileSplit))
+ for p in range(len(cycle.Cycle_InputData)):
+ if (cycle.Cycle_InputData[p].Version == entry) or Version == -1:
+ Total_xml = len(cycle.Cycle_InputData[p].io_list.FileInfoList)
+ numberOfJobs = int(math.ceil(float(Total_xml) / FileSplit))
numberOfSplits = FileSplit
- if numberOfJobs > MaxJobs and MaxJobs >0 :
- numberOfSplits = int(math.ceil(float(Total_xml)/MaxJobs))
- numberOfJobs = int(math.ceil(float(Total_xml)/numberOfSplits))
- print 'More than',MaxJobs,'Jobs. Changing FileSplit mode'
- print 'New number of Jobs',numberOfJobs,'Number of xml-Files per Job',numberOfSplits
+ if numberOfJobs > MaxJobs and MaxJobs > 0:
+ numberOfSplits = int(math.ceil(float(Total_xml) / MaxJobs))
+ numberOfJobs = int(math.ceil(float(Total_xml) / numberOfSplits))
+ print("More than", MaxJobs, "Jobs. Changing FileSplit mode")
+ print("New number of Jobs", numberOfJobs, "Number of xml-Files per Job", numberOfSplits)
for it in range(numberOfJobs):
- outfile = open(path+'_'+str(it+1)+'.xml','w+')
+ outfile = open(path + "_" + str(it + 1) + ".xml", "w+")
for line in header.header:
outfile.write(line)
- outfile.write(write_job(Job,Version,0,-1,it,numberOfSplits,workdir))
+ outfile.write(write_job(Job, Version, 0, -1, it, numberOfSplits, workdir))
outfile.close()
- NFiles+=1
+ NFiles += 1
else:
- NFiles+=1
- outfile = open(path+'_OneCore'+'.xml','w+')
+ NFiles += 1
+ outfile = open(path + "_OneCore" + ".xml", "w+")
for line in header.header:
outfile.write(line)
- outfile.write(write_job(Job,Version,0,-1,"",0,workdir))
+ outfile.write(write_job(Job, Version, 0, -1, "", 0, workdir))
outfile.close()
return NFiles
-def result_info(Job, path, header, other = []):
- #get a xml file with all the infomartion that you need to proced
+def result_info(Job, path, header, other=[]):
+    # get an xml file with all the information you need to proceed
ResultJob = copy.deepcopy(Job)
for cycle in ResultJob.Job_Cylce:
for inputdata in cycle.Cycle_InputData:
- #print inputdata.io_list.InputTree
- #print inputdata.io_list.other
output_exist = False
other_index = 0
for listoflists in inputdata.io_list.other:
for part in listoflists:
- if part == 'OutputTree':
+ if part == "OutputTree":
output_exist = True
break
- other_index +=1
+ other_index += 1
if not output_exist:
return 0
- if len(inputdata.io_list.FileInfoList)==0:
+ if len(inputdata.io_list.FileInfoList) == 0:
continue
- inputdata.io_list.FileInfoList = [['In','Lumi',inputdata.io_list.FileInfoList[0][2],'FileName',cycle.OutputDirectory+"/"+path+"/uhh2.AnalysisModuleRunner.*."+inputdata.Version+"_*.root"]]
- inputdata.io_list.InputTree =['InputTree','Name',inputdata.io_list.other[other_index][2]]
+ inputdata.io_list.FileInfoList = [
+ [
+ "In",
+ "Lumi",
+ inputdata.io_list.FileInfoList[0][2],
+ "FileName",
+ cycle.OutputDirectory
+ + "/"
+ + path
+ + "/uhh2.AnalysisModuleRunner.*."
+ + inputdata.Version
+ + "_*.root",
+ ]
+ ]
+ inputdata.io_list.InputTree = ["InputTree", "Name", inputdata.io_list.other[other_index][2]]
if not other:
inputdata.io_list.other = []
else:
if len(other) == 1:
- if other[0] =="-1":
- inputdata.io_list.other = [['OutputTree','Name',inputdata.io_list.other[other_index][2]]]
+ if other[0] == "-1":
+ inputdata.io_list.other = [["OutputTree", "Name", inputdata.io_list.other[other_index][2]]]
else:
- inputdata.io_list.other = [['OutputTree','Name',other[0]]]
+ inputdata.io_list.other = [["OutputTree", "Name", other[0]]]
else:
inputdata.io_list.other = other
-
+
for cycle_item in cycle.Cycle_UserConf:
if cycle_item.Name == 'AnalysisModule':
cycle_item.Value = "__NOTSET__"
cycle.OutputDirectory = "__NOTSET__"
- outfile = open(path+'/Result.xml','w')
+ outfile = open(path + "/Result.xml", "w")
for line in header.header:
outfile.write(line)
- outfile.write(write_job(ResultJob,-1,0,-1,None,0,""))
+ outfile.write(write_job(ResultJob, -1, 0, -1, None, 0, ""))
outfile.close()
-
- return 1
-
-
-
+ return 1
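Note: tree_checker.py is deleted further below and Manager.py now calls io_func.check_TreeExists(...), so the old ROOT-based tree check has to live in io_func.py now, presumably rebuilt on uproot. A minimal sketch of such a check, assuming it keeps the signature of the old tree_checker helper (illustrative, not necessarily the exact code of this commit):

    # Sketch only; signature assumed from the old tree_checker.check_TreeExists and the call in Manager.py.
    import uproot

    def check_TreeExists(filename, treename):
        # True if the file can be opened and the named tree holds at least one entry.
        try:
            with uproot.open(filename) as rootfile:
                return rootfile[str(treename)].num_entries > 0
        except BaseException as e:
            print(e)
            return False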
diff --git a/missing_files_runner.py b/missing_files_runner.py
index e63fb3b..e983a5a 100644
--- a/missing_files_runner.py
+++ b/missing_files_runner.py
@@ -1,35 +1,36 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import sys
+import time
from subprocess import call
import multiprocessing
+
def run_missing_files(missing_files, worker=1):
- f = open(missing_files,'r')
+ f = open(missing_files, "r")
missing_xml = []
for line in f:
missing_xml.append(line.split(" ")[-1])
pool = multiprocessing.Pool(processes=int(worker))
- result = pool.map_async(sframe_call,missing_xml)
+ result = pool.map_async(sframe_call, missing_xml)
pool.close()
pool.join()
- print 'missing xml files', len(missing_xml)
- while result._number_left>0:
- sys.stdout.write("\033[F")
- missing = round(float(result._number_left)*float(result._chunksize)/float(len(missing_xml))*100)
- if(missing > 100):
- missing =100
- print "Missing [%]", missing
- time.sleep(10)
+ print("missing xml files", len(missing_xml))
+ while result._number_left > 0:
+ sys.stdout.write("\033[F")
+ missing = round(float(result._number_left) * float(result._chunksize) / float(len(missing_xml)) * 100)
+ if missing > 100:
+ missing = 100
+ print("Missing [%]", missing)
+ time.sleep(10)
+ print("done with", missing_files)
-
- print 'done with',missing_files
-
def sframe_call(xml):
- call(['sframe_main '+xml], shell=True)
+ call(["sframe_main " + xml], shell=True)
if __name__ == "__main__":
diff --git a/pythonSFB.sh b/pythonSFB.sh
new file mode 100755
index 0000000..82a6430
--- /dev/null
+++ b/pythonSFB.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+VENVDIR=$SCRIPT_DIR/pythonSFB
+VENVPYTHON=$VENVDIR/bin/python
+
+if [ ! -f $VENVPYTHON ]; then
+    echo "py3 venv for SFrameBatch execution does not exist. Creating it now..."
+ env -u LD_LIBRARY_PATH /usr/bin/python3 -m venv $VENVDIR
+ source $VENVDIR/bin/activate
+ env -u LD_LIBRARY_PATH pip install -r $SCRIPT_DIR/requirements.txt
+ deactivate
+fi
+
+export LD_LIBRARY_PATH_BACKUP=$LD_LIBRARY_PATH
+env -u LD_LIBRARY_PATH "$VENVPYTHON" "$@"
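Note on the new wrapper: pythonSFB.sh creates a Python 3 virtualenv next to the script on first use (installing requirements.txt), saves the caller's LD_LIBRARY_PATH into LD_LIBRARY_PATH_BACKUP, and then runs the venv interpreter with LD_LIBRARY_PATH unset, presumably so that an existing CMSSW/SFrame library path does not leak into the venv Python. LD_LIBRARY_PATH_BACKUP is what write_script() and resub_script() above embed into the HTCondor submit files as LD_LIBRARY_PATH_STORED, presumably so that sframe_wrapper.sh can restore the original environment on the worker node. Since the new shebang of sframe_batch.py is '#!/usr/bin/env pythonSFB.sh', the wrapper has to be resolvable through PATH; otherwise the tool can be started explicitly, e.g. ./pythonSFB.sh sframe_batch.py <config>.xml.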
diff --git a/readaMCatNloEntries.py b/readaMCatNloEntries.py
deleted file mode 100755
index c38737d..0000000
--- a/readaMCatNloEntries.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-# usage of script: number of cores, first file, second file, ..., method to use True==Fast, False==counting weights
-# python readaMCatNLO.py 4 ../../common/datasets/MC_WJet.xml True/False
-
-
-
-import sys, multiprocessing, time
-from ROOT import *
-
-def read_xml(xmlFileDir):
- xmlFile = open(str(xmlFileDir))
- rootFileStore = []
- comment =False
- for line in xmlFile:
- if '':
- comment = True
- continue
- if '-->' in line:
- comment=False
- continue
- rootFileStore.append(line.split('"')[1])
- return rootFileStore
-
-def write_xml_entry_tag(xmlFile,result,fast):
- xmlFile = open(str(xmlFile),'a')
- method = 'weights'
- if(fast):method = 'fast'
- xmlFile.write('')
-
-def read_tree(rootDir):
- numberOfweightedEntries = 0
- try:
- ntuple = TFile(str(rootDir))
- AnalysisTree = ntuple.Get("AnalysisTree")
- for event in AnalysisTree:
- #numberOfweightedEntries+=event.m_weights[0]
- for value in event.m_weights:
- numberOfweightedEntries += value
- except Exception as e:
- print 'unable to count events in root file',rootDir
- print e
- return numberOfweightedEntries
-
-def read_treeFast(rootDir):
- fastentries =0
- try:
- ntuple = TFile(str(rootDir))
- AnalysisTree = ntuple.Get("AnalysisTree")
- fastentries = AnalysisTree.GetEntriesFast()
- except Exception as e:
- print 'unable to count events in root file',rootDir
- print e
- return fastentries
-
-def readEntries(worker, xmlfiles, fast=False):
- if fast: print 'Going to use the Fast Method, no weights used'
- print "number of workers",worker
- sum_list =[]
- for xml in xmlfiles:
- pool = multiprocessing.Pool(processes=int(worker))
- print "open XML file:",xml
- rootFileStore = read_xml(xml)
- numberXMLFiles = len(rootFileStore)
- result = None
- if(fast):
- result = pool.map_async(read_treeFast,rootFileStore)
- else:
- result = pool.map_async(read_tree,rootFileStore)
-
- print result._number_left ,numberXMLFiles,result._chunksize
- while result._number_left>0:
- sys.stdout.write("\033[F")
- #print result._number_left ,numberXMLFiles,result._chunksize
- missing = round(float(result._number_left)*float(result._chunksize)/float(numberXMLFiles)*100)
- if(missing > 100):
- missing =100
- print "Missing [%]", missing
- time.sleep(10)
- pool.close()
- pool.join()
- xml_result = sum(result.get())
- print "number of events in",xml,xml_result
- sum_list.append(xml_result)
- write_xml_entry_tag(xml,xml_result, fast)
- return sum_list
-
-
-if __name__ == "__main__":
- readEntries(sys.argv[1],sys.argv[2:-1], bool(sys.argv[-1]))
-
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..bd9d4b4
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+numpy==1.19.5
+uproot==4.3.7
diff --git a/sframe_batch.py b/sframe_batch.py
index 6fdc51d..5dd46c2 100755
--- a/sframe_batch.py
+++ b/sframe_batch.py
@@ -1,21 +1,22 @@
-#!/usr/bin/env python
+#!/usr/bin/env pythonSFB.sh
# -*- coding: utf-8 -*-
-
+from __future__ import print_function
from optparse import OptionParser
-from argparse import ArgumentParser
-from xml.dom.minidom import parse, parseString
+from xml.dom.minidom import parse
import xml.sax
import os
import sys
import shutil
import timeit
-import StringIO
+from io import StringIO
import subprocess
-#import multiprocessing
-from Manager import *
-from LumiCalcAutoBuilder import *
-from missing_files_runner import *
+import Manager
+import missing_files_runner
+import io_func
+import Inf_Classes
+import time
+
def SFrameBatchMain(input_options):
parser = OptionParser(usage="usage: %prog [options] filename",
@@ -31,17 +32,17 @@ def SFrameBatchMain(input_options):
default="",
help="Overwrite the place where to store the output.")
parser.add_option("-s", "--submit",
- action="store_true", # optional because action defaults to "store"
+ action="store_true", # optional because action defaults to "store"
dest="submit",
default=False,
help="Submit Jobs to the grid")
parser.add_option("-r", "--resubmit",
- action="store_true", # optional because action defaults to "store"
+ action="store_true", # optional because action defaults to "store"
dest="resubmit",
default=False,
help="Resubmit Jobs were no files are found in the OutputDir/workdir .")
parser.add_option("-l", "--loopCheck",
- action="store_true", # optional because action defaults to "store"
+ action="store_true", # optional because action defaults to "store"
dest="loop",
default=False,
help="Look which jobs finished and where transfered to your storage device.")
@@ -49,14 +50,14 @@ def SFrameBatchMain(input_options):
action="store_true",
dest="add",
default=False,
- help="hadd files to one")
+ help="hadd files to one")
parser.add_option("-T", "--addFilesNoTree",
action="store_true",
dest="addNoTree",
default=False,
- help="hadd files to one, without merging TTrees. Can be combined with -f.")
+ help="hadd files to one, without merging TTrees. Can be combined with -f.")
parser.add_option("-f", "--forceMerge",
- action="store_true", # optional because action defaults to "store"
+ action="store_true", # optional because action defaults to "store"
dest="forceMerge",
default=False,
help="Force to hadd the root files from the workdir into the ouput directory.")
@@ -64,7 +65,8 @@ def SFrameBatchMain(input_options):
action="store_true",
dest="waitMerge",
default=False,
- help="Wait for all merging subprocess to finish before exiting program. All the subprocesses that finish in the meantime become zombies until the main program finishes.")
+                      help="Wait for all merging subprocesses to finish before exiting the program. All the "
+                      + "subprocesses that finish in the meantime become zombies until the main program finishes.")
parser.add_option("-k", "--keepGoing",
action="store_true",
dest="keepGoing",
@@ -77,31 +79,32 @@ def SFrameBatchMain(input_options):
help="Never ask for user input, but exit instead. (Overwrites keepGoing)")
parser.add_option("--ReplaceUserItem",
action="append",
- dest="useritemlist",
+ dest="useritemlist",
default=[],
- help="Replace Items in UserConfig, for more then one just add as many times the command as you need. Nice for uncertainties. Usage --ReplaceUserItem \"Name,Value\""
+                      help="Replace Items in UserConfig; for more than one, just add the option as many times "
+                      + "as you need. Nice for uncertainties. Usage --ReplaceUserItem \"Name,Value\""
)
parser.add_option("--addTree",
action="append",
dest="sframeTreeInfo",
default=[],
- help="This is used if you need to add something to all the InputData Childs (e.g an OutputTree) in the Result.xml file. If only one argument is passed it is assumed to be the name of the OutputTree."
+                      help="This is used if you need to add something to all the InputData children (e.g. an OutputTree)"
+                      + " in the Result.xml file. If only one argument is passed, it is assumed to be "
+                      + "the name of the OutputTree."
)
parser.add_option("--RemoveEmptyFiles",
action="store_true",
dest="FileSplitFileCheck",
- help="Force to remove empty files in FileSplit mode. This is only necessary after a Selection where there many Files with no entries at all or only very few. This might lead to sframe crashing."
+                      help="Force to remove empty files in FileSplit mode. This is only necessary after "
+                      + "a Selection where there are many files with no entries at all or only very few, "
+                      + "which might lead to sframe crashing."
)
parser.add_option("-n", "--numberWorker",
action="store",
dest="numberOfWorker",
default="1",
- help="Specify how many workers you want to have. Usefull at the moment to run missing_files.txt in parallel on one machine.")
- parser.add_option("--XMLDatabase",
- action="store",
- dest="xmldatabaseDir",
- help="This command creates from a data file the new sframe_main xml file calculating the lumi from the number of events and a given cross section. You can specify the number of events (weighted). Else it will try to find the number of events at the end of the xml Files (stored in the dateset directory) or use a small python script to read the number of entries from the trees. If it has to read the number of entries from the trees you need to specify how many cores it should use and also the method to be used. True == Fast / False == weights. Example can be found as DatabaseExample. USERCONFIG is not filled and needs to be done manually. Usage: sframe_batch --XMLDatabase DATABASE_DIR FILENAME_TO_STORE."
- )
+                      help="Specify how many workers you want to have. Useful at the moment to run "
+ + "missing_files.txt in parallel on one machine.")
parser.add_option("--el7worker",
action='store_true',
help='Use singularity to run inside SL6-container on EL7-Nodes.')
@@ -110,44 +113,41 @@ def SFrameBatchMain(input_options):
help='Use singularity to run inside SL6-container on EL7-Nodes.')
parser.add_option("--silent-warnings",
action='store_true',
- help='silent all warnings.')
+ help='silent all warnings.')
(options, args) = parser.parse_args(input_options)
import warnings
- if(options.silent_warnings):
+ if options.silent_warnings:
warnings.simplefilter('ignore')
-
- if(options.el7worker):
+
+ if options.el7worker:
options.sl6container = True
warnings.simplefilter("once", DeprecationWarning)
- warnings.warn("\033[93m\nYou are using the option --el7worker, which will be removed soon. Although this still works, you should consider to switch to the option --sl6container, if you want to run jobs inside a SL6-container on EL7 HTCondor machines using singularity.\n\033[0m", DeprecationWarning)
-
+ warnings.warn(
+ "\033[93m\nYou are using the option --el7worker, which will be removed soon. "
+            + "Although this still works, you should consider switching to the option --sl6container "
+            + "if you want to run jobs inside an SL6-container on EL7 HTCondor machines using singularity.\n\033[0m",
+ DeprecationWarning,
+ )
+
start = timeit.default_timer()
if 'missing_files.txt' in args[0]:
filename = args[0]
currentDir = os.getcwd()
if "/" in filename:
- directory = filename.replace("missing_files.txt",'')
+ directory = filename.replace("missing_files.txt", '')
os.chdir(directory)
- run_missing_files('missing_files.txt', int(options.numberOfWorker))
+ missing_files_runner.run_missing_files('missing_files.txt', int(options.numberOfWorker))
os.chdir(currentDir)
return 0
-
- #global header
+ # global header
if len(args) != 1:
parser.error("wrong number of arguments. Help can be invoked with --help")
xmlfile = args[0]
- if options.xmldatabaseDir:
- XMLBuilder = lumicalc_autobuilder(options.xmldatabaseDir)
- XMLBuilder.write_to_toyxml(xmlfile)
- stop = timeit.default_timer()
- print "SFrame Batch was running for",round(stop - start,2),"sec"
- print "SFrame Batch just build",xmlfile,"for you. Fill the USERCONFIG yourself."
- return 0
if os.path.islink(xmlfile):
xmlfile = os.path.abspath(os.readlink(xmlfile))
@@ -157,102 +157,104 @@ def SFrameBatchMain(input_options):
if not os.path.exists('JobConfig.dtd'):
os.system('ln -sf %s/JobConfig.dtd .' % scriptpath)
- #print xmlfile, os.getcwd
- proc_xmllint = subprocess.Popen(['xmllint','--noent','--dtdattr',xmlfile],stdout=subprocess.PIPE)
- xmlfile_strio = StringIO.StringIO(proc_xmllint.communicate()[0])
+ # print(xmlfile, os.getcwd)
+ proc_xmllint = subprocess.Popen(["xmllint", "--noent", "--dtdattr", xmlfile], stdout=subprocess.PIPE)
+ xmlfile_strio = StringIO(str(proc_xmllint.communicate()[0], 'utf-8'))
sax_parser = xml.sax.make_parser()
- xmlparsed = parse(xmlfile_strio,sax_parser)
- header = fileheader(xmlfile)
+ xmlparsed = parse(xmlfile_strio, sax_parser)
+ header = io_func.fileheader(xmlfile)
if options.FileSplitFileCheck:
header.RemoveEmptyFileSplit = True
else:
header.RemoveEmptyFiles = False
if header.RemoveEmptyFileSplit and header.FileSplit:
- print "Removing all empty files in FileSplit mode."
+ print("Removing all empty files in FileSplit mode.")
node = xmlparsed.getElementsByTagName('JobConfiguration')[0]
- Job = JobConfig(node)
-
+ Job = Inf_Classes.JobConfig(node)
+
workdir = header.Workdir
if options.workdir:
- print "Overwriting workdir:",workdir,"with",options.workdir
+ print("Overwriting workdir:", workdir, "with", options.workdir)
workdir = options.workdir
- if not workdir : workdir="workdir"
- #if not workdir.endswith("/"): workdir += "/"
+ if not workdir:
+ workdir = "workdir"
+
currentDir = os.getcwd()
if not os.path.exists(workdir+'/'):
os.makedirs(workdir+'/')
- print workdir,'has been created'
- shutil.copy(scriptpath+"JobConfig.dtd",workdir)
- shutil.copy(args[0],workdir)
- #print header.Version[0]
+ print(workdir, 'has been created')
+ shutil.copy(scriptpath+"JobConfig.dtd", workdir)
+ shutil.copy(args[0], workdir)
for cycle in Job.Job_Cylce:
- if len(options.outputdir)>0:
- print 'Overwriting',cycle.OutputDirectory,'with',options.outputdir
- cycle.OutputDirectory=options.outputdir
- if not cycle.OutputDirectory.endswith("/"): cycle.OutputDirectory +="/"
- if cycle.OutputDirectory.startswith('./'):
- cycle.OutputDirectory = currentDir+cycle.OutputDirectory[1:]
- if len(options.useritemlist)>0 :
- print 'Searching to replace UserConfig Values'
+ if len(options.outputdir) > 0:
+ print("Overwriting", cycle.OutputDirectory, "with", options.outputdir)
+ cycle.OutputDirectory = options.outputdir
+ if not cycle.OutputDirectory.endswith("/"):
+ cycle.OutputDirectory += "/"
+ if cycle.OutputDirectory.startswith('./'):
+ cycle.OutputDirectory = currentDir + cycle.OutputDirectory[1:]
+ if len(options.useritemlist) > 0:
+ print("Searching to replace UserConfig Values")
for item in options.useritemlist:
if ',' not in item:
- print 'No , found in the substitution:',item
+ print("No , found in the substitution:", item)
continue
- else:
+ else:
pair_name_value = item.split(",")
item_name = pair_name_value[0]
item_value = pair_name_value[1]
for cycle_item in cycle.Cycle_UserConf:
if item_name == cycle_item.Name:
- print "Replacing",item_name,"Value:",cycle_item.Value ,"with",item_value
+ print("Replacing", item_name, "Value:", cycle_item.Value, "with", item_value)
cycle_item.Value = item_value
- print 'starting manager'
- manager = JobManager(options,header,workdir)
- manager.process_jobs(cycle.Cycle_InputData,Job)
- nameOfCycle = cycle.Cyclename.replace('::','.')
- #this small function creates a xml file with the expected files
- if result_info(Job, workdir, header,options.sframeTreeInfo) == 1:
- print ' Result.xml created for further jobs'
- #submit jobs if asked for
- if options.submit: manager.submit_jobs(cycle.OutputDirectory,nameOfCycle)
- manager.check_jobstatus(cycle.OutputDirectory, nameOfCycle,False,False)
- if options.resubmit: manager.resubmit_jobs()
- #get once into the loop for resubmission & merging
+ print('starting manager')
+ manager = Manager.JobManager(options, header, workdir)
+ manager.process_jobs(cycle.Cycle_InputData, Job)
+ nameOfCycle = cycle.Cyclename.replace("::", ".")
+        # this small function creates an xml file with the expected files
+ if io_func.result_info(Job, workdir, header, options.sframeTreeInfo) == 1:
+ print(" Result.xml created for further jobs")
+ # submit jobs if asked for
+ if options.submit:
+ manager.submit_jobs(cycle.OutputDirectory, nameOfCycle)
+ manager.check_jobstatus(cycle.OutputDirectory, nameOfCycle, False, False)
+ if options.resubmit:
+ manager.resubmit_jobs()
+ # get once into the loop for resubmission & merging
if not options.loop and options.forceMerge and not options.waitMerge:
- manager.check_jobstatus(cycle.OutputDirectory,nameOfCycle)
- manager.merge_files(cycle.OutputDirectory,nameOfCycle,cycle.Cycle_InputData)
+ manager.check_jobstatus(cycle.OutputDirectory, nameOfCycle)
+ manager.merge_files(cycle.OutputDirectory, nameOfCycle, cycle.Cycle_InputData)
return 0
-
- loop_check = True
- while loop_check==True:
+ loop_check = True
+ while loop_check is True:
if not options.loop:
loop_check = False
# This is necessary since qstat sometimes does not find the jobs it should monitor.
# So it checks that it does not find the job 5 times before auto resubmiting it.
for i in range(6):
- manager.check_jobstatus(cycle.OutputDirectory,nameOfCycle)
+ manager.check_jobstatus(cycle.OutputDirectory, nameOfCycle)
else:
- manager.check_jobstatus(cycle.OutputDirectory,nameOfCycle)
-
- manager.merge_files(cycle.OutputDirectory,nameOfCycle,cycle.Cycle_InputData)
- if manager.get_subInfoFinish() or (not manager.merge.get_mergerStatus() and manager.missingFiles==0):
- print 'if grid pid information got lost root Files could still be transferring'
+ manager.check_jobstatus(cycle.OutputDirectory, nameOfCycle)
+
+ manager.merge_files(cycle.OutputDirectory, nameOfCycle, cycle.Cycle_InputData)
+ if manager.get_subInfoFinish() or (not manager.merge.get_mergerStatus() and manager.missingFiles == 0):
+                print('If grid pid information got lost, root files could still be transferring')
break
- if options.loop:
+ if options.loop:
manager.print_status()
time.sleep(5)
- #print 'Total progress', tot_prog
+ # print 'Total progress', tot_prog
manager.merge_wait()
- manager.check_jobstatus(cycle.OutputDirectory,nameOfCycle,False,False)
- print '-'*80
+ manager.check_jobstatus(cycle.OutputDirectory, nameOfCycle, False, False)
+ print('-'*80)
manager.print_status()
stop = timeit.default_timer()
- print "SFrame Batch was running for",round(stop - start,2),"sec"
- #exit gracefully
+ print("SFrame Batch was running for", round(stop - start, 2), "sec")
+ # exit gracefully
if (options.submit or options.resubmit):
# return code here indicates (re)submission went OK, not job statuses are all done
@@ -265,6 +267,6 @@ def SFrameBatchMain(input_options):
if __name__ == "__main__":
- #print 'Arguments',sys.argv[1:]
+ # print('Arguments',sys.argv[1:])
status = SFrameBatchMain(sys.argv[1:])
exit(status)
diff --git a/tree_checker.py b/tree_checker.py
deleted file mode 100755
index d3c1742..0000000
--- a/tree_checker.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/usr/bin/env python
-
-import ROOT
-import sys
-from glob import glob
-
-def check_TreeExists(filename,treename):
- rootfile = ROOT.TFile.Open(filename)
- #print filename
- try:
- rootTree = rootfile.Get(str(treename))
- entries = rootTree.GetEntriesFast()
- #print filename,'True entries',entries,entries>0
- return entries>0
- except:
- #print 'False'
- return False
- #entries = rootTree.GetEntriesFast()
- #if rootTree: return False
- #return True
-
-
-if __name__ == "__main__":
- for arg in sys.argv[2:]:
- if '*' in arg:
- for itfile in glob(pattern):
- check_TreeExists(itfile,sys.argv[1])
- else:
- check_TreeExists(arg,sys.argv[1])