make --dryrun work again with TW v3.2411xx (#5347)
belforte authored Nov 7, 2024
1 parent 01d569e commit 653bafe
Showing 2 changed files with 101 additions and 135 deletions.
122 changes: 51 additions & 71 deletions src/python/CRABClient/Commands/preparelocal.py
@@ -1,21 +1,22 @@
"""
The command prepares a directory and the scripts needed to execute the jobs locally.
It can also execute a specific job if the jobid option is passed.
"""
import os
import json
import shutil
import tarfile
import tempfile

from ServerUtilities import getProxiedWebDir, getColumn, downloadFromS3
from ServerUtilities import getColumn, downloadFromS3

from CRABClient.UserUtilities import curlGetFileFromURL
from CRABClient.ClientUtilities import execute_command
from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.ClientExceptions import ClientException


class preparelocal(SubCommand):
""" The commands prepares a directory and the relative scripts to execute the jobs locally.
It can also execute a specific job if the jobid option is passed
"""
""" the preparelocal command instance """

def __init__(self, logger, cmdargs=None):
SubCommand.__init__(self, logger, cmdargs)
@@ -41,10 +42,11 @@ def __call__(self):

if self.options.jobid:
self.logger.info("Executing job %s locally" % self.options.jobid)
self.executeTestRun(inputArgs, self.options.jobid)
self.prepareDir(inputArgs, self.options.destdir)
self.executeTestRun(self.options.destdir, self.options.jobid)
self.logger.info("Job execution terminated")
else:
self.logger.info("Copying an preparing files for local execution in %s" % self.options.destdir)
self.logger.info("Copying and preparing files for local execution in %s" % self.options.destdir)
self.prepareDir(inputArgs, self.options.destdir)
self.logger.info("go to that directory IN A CLEAN SHELL and use 'sh run_job.sh NUMJOB' to execute the job")
finally:
@@ -55,7 +57,8 @@ def __call__(self):
return {'commandStatus': 'SUCCESS'}

def getInputFiles(self):
""" Get the InputFiles.tar.gz and extract the necessary files
"""
Get the InputFiles.tar.gz and extract the necessary files
"""
taskname = self.cachedinfo['RequestName']

@@ -67,74 +70,46 @@ def getInputFiles(self):
self.destination = getColumn(crabDBInfo, 'tm_asyncdest')
username = getColumn(crabDBInfo, 'tm_username')
sandboxName = getColumn(crabDBInfo, 'tm_user_sandbox')

inputsFilename = os.path.join(os.getcwd(), 'InputFiles.tar.gz')
sandboxFilename = os.path.join(os.getcwd(), 'sandbox.tar.gz')
if status not in ['UPLOADED', 'SUBMITTED']:
raise ClientException('Can only execute from tasks in status SUBMITTED or UPLOADED. Current status is %s' % status)
try:

if status not in ['UPLOADED', 'SUBMITTED']:
raise ClientException('Can only execute jobs from tasks in status SUBMITTED or UPLOADED. Current status is %s' % status)
# new way first
# following try-except can be removed and only the code in the try kept once
# there are no more tasks submitted with TW version v3.241018 or earlier
try: # this will fail with old tasks
inputsFilename = os.path.join(os.getcwd(), 'InputFiles.tar.gz')
sandboxFilename = os.path.join(os.getcwd(), 'sandbox.tar.gz')
downloadFromS3(crabserver=self.crabserver, filepath=inputsFilename,
objecttype='runtimefiles', taskname=taskname, logger=self.logger)
downloadFromS3(crabserver=self.crabserver, filepath=sandboxFilename,
objecttype='sandbox', logger=self.logger,
tarballname=sandboxName, username=username)
except Exception:
# fall back to WEB_DIR
if status == 'UPLOADED':
raise ClientException('Currently crab preparelocal only works for tasks successfully submitted')
elif status == 'SUBMITTED':
webdir = getProxiedWebDir(crabserver=self.crabserver, task=taskname,
logFunction=self.logger.debug)
if not webdir:
webdir = getColumn(crabDBInfo, 'tm_user_webdir')
self.logger.debug("Downloading 'InputFiles.tar.gz' from %s" % webdir)
httpCode = curlGetFileFromURL(webdir + '/InputFiles.tar.gz', inputsFilename, self.proxyfilename,
logger=self.logger)
if httpCode != 200:
raise ClientException("Failed to download 'InputFiles.tar.gz' from %s" % webdir)

for name in [inputsFilename, 'CMSRunAnalysis.tar.gz', 'sandbox.tar.gz']:
with tarfile.open(name) as tf:
with tarfile.open(inputsFilename) as tf:
tf.extractall()
except:
# old way for tasks submitted "some time ago". They should have bootstrapped
# by then, so webdir should be defined.
self.logger.info('Task was submitted with an old TaskWorker, falling back to WEB_DIR for tarballs')
from ServerUtilities import getProxiedWebDir
from CRABClient.UserUtilities import curlGetFileFromURL
webdir = getProxiedWebDir(crabserver=self.crabserver, task=taskname,
logFunction=self.logger.debug)
httpCode = curlGetFileFromURL(webdir + '/InputFiles.tar.gz', inputsFilename, self.proxyfilename,
logger=self.logger)
if httpCode != 200:
raise ClientException("Failed to download 'InputFiles.tar.gz' from %s" % webdir)
with tarfile.open(inputsFilename) as tf:
tf.extractall()

def executeTestRun(self, inputArgs, jobnr):
""" Execute a test run calling CMSRunAnalysis.sh
def executeTestRun(self, destDir, jobnr):
"""
os.environ.update({'CRAB3_RUNTIME_DEBUG': 'True', '_CONDOR_JOB_AD': 'Job.submit'})

optsList = [
os.path.join(os.getcwd(), 'TweakPSet.py'),
'-a %s' % inputArgs[jobnr-1]['CRAB_Archive'],
'-o %s' % inputArgs[jobnr-1]['CRAB_AdditionalOutputFiles'],
'--sourceURL=%s' % inputArgs[jobnr-1]['CRAB_ISB'],
'--location=%s' % os.getcwd(),
'--inputFile=%s' % inputArgs[jobnr-1]['inputFiles'],
'--runAndLumis=%s' % inputArgs[jobnr-1]['runAndLumiMask'],
'--firstEvent=%s' % inputArgs[jobnr-1]['firstEvent'], #jobs goes from 1 to N, inputArgs from 0 to N-1
'--lastEvent=%s' % inputArgs[jobnr-1]['lastEvent'],
'--firstLumi=%s' % inputArgs[jobnr-1]['firstLumi'],
'--firstRun=%s' % inputArgs[jobnr-1]['firstRun'],
'--seeding=%s' % inputArgs[jobnr-1]['seeding'],
'--lheInputFiles=%s' % inputArgs[jobnr-1]['lheInputFiles'],
'--oneEventMode=0',
'--eventsPerLumi=%s' % inputArgs[jobnr-1]['eventsPerLumi'],
'--maxRuntime=-1',
'--jobNumber=%s' % (jobnr-1),
'--cmsswVersion=%s' % inputArgs[jobnr-1]['CRAB_JobSW'],
'--scramArch=%s' % inputArgs[jobnr-1]['CRAB_JobArch'],
'--scriptExe=%s' % inputArgs[jobnr-1]['scriptExe'],
'--scriptArgs=%s' % inputArgs[jobnr-1]['scriptArgs'],
]
# from a python list to a string which can be used as shell command argument
opts = ''
for opt in optsList:
opts = opts + ' %s'%opt
command = 'sh CMSRunAnalysis.sh ' + opts
out, err, returncode = execute_command(command=command)
self.logger.debug(out)
self.logger.debug(err)
if returncode != 0:
raise ClientException('Failed to execute local test run:\n StdOut: %s\n StdErr: %s' % (out, err))
Execute a test run calling CMSRunAnalysis.sh
"""
os.chdir(destDir)
cmd = 'eval `scram unsetenv -sh`;'\
' bash run_job.sh %s' % str(jobnr)
execute_command(cmd, logger=self.logger, redirect=False)

def prepareDir(self, inputArgs, targetDir):
""" Prepare a directory with just the necessary files:
@@ -182,9 +157,15 @@ def prepareDir(self, inputArgs, targetDir):

for f in ["gWMS-CMSRunAnalysis.sh", "CMSRunAnalysis.sh", "cmscp.py", "CMSRunAnalysis.tar.gz",
"sandbox.tar.gz", "run_and_lumis.tar.gz", "input_files.tar.gz", "Job.submit",
"submit_env.sh"
"submit_env.sh", "splitting-summary.json"
]:
shutil.copy2(f, targetDir)
try: # for backward compatibility with TW v3.241017 where splitting-summary.json is missing
shutil.copy2(f, targetDir)
except FileNotFoundError:
pass

cmd = "cd %s; tar xf CMSRunAnalysis.tar.gz" % targetDir
execute_command(command=cmd, logger=self.logger)

# this InputArgs.txt is for backward compatibility with old TW
# but may also be useful for local submission https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRABPrepareLocal
@@ -202,8 +183,7 @@ def prepareDir(self, inputArgs, targetDir):
bashWrapper = """#!/bin/bash
. ./submit_env.sh && save_env && setup_local_env
tar xzmf CMSRunAnalysis.tar.gz
#
#
export _CONDOR_JOB_AD=Job.${1}.submit
# leading '+' signs must be removed to use JDL as classAd file
sed -e 's/^+//' Job.submit > Job.${1}.submit
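
The rewritten executeTestRun no longer rebuilds the CMSRunAnalysis.sh argument list itself; it re-runs the run_job.sh wrapper that prepareDir writes, after stripping the CMSSW environment so the job starts from a clean shell as it would on a worker node. A minimal sketch of that invocation pattern (the standalone function and its name are illustrative; the calls mirror the ones in the diff above):

import os
from CRABClient.ClientUtilities import execute_command

def runJobLocally(destDir, jobnr, logger):
    # destDir must have been populated beforehand by 'crab preparelocal'
    os.chdir(destDir)
    # 'scram unsetenv -sh' prints shell code that undoes the CMSSW environment,
    # so run_job.sh starts from a clean shell, like on a worker node
    cmd = 'eval `scram unsetenv -sh`; bash run_job.sh %s' % str(jobnr)
    execute_command(cmd, logger=logger, redirect=False)
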
114 changes: 50 additions & 64 deletions src/python/CRABClient/Commands/submit.py
@@ -23,6 +23,7 @@
setSubmitParserOptions, validateSubmitOptions, checkStatusLoop, execute_command

from ServerUtilities import MAX_MEMORY_PER_CORE, MAX_MEMORY_SINGLE_CORE, downloadFromS3, FEEDBACKMAIL
from CRABClient.Commands.preparelocal import preparelocal as crabPreparelocal

class submit(SubCommand):
"""
@@ -162,7 +163,9 @@ def __call__(self):
checkStatusLoop(self.logger, server, self.defaultApi, uniquerequestname, targetTaskStatus, self.name)

if self.options.dryrun:
self.printDryRunResults(*self.executeTestRun(filecacheurl, uniquerequestname))
self.logger.info("Dry run")
self.runPrepareLocal(projDir)
self.printDryRunResults(*self.executeTestRun(filecacheurl, uniquerequestname, projDir))

self.logger.debug("About to return")

@@ -171,6 +174,26 @@ def __call__(self):
returnDict['commandStatus'] = 'SUCCESS'

return returnDict
def runPrepareLocal(self, projDir):
"""
"""
cmdargs = []
cmdargs.append("-d")
cmdargs.append(projDir)
#cmdargs += ["--jobid", "1"]
if "instance" in self.options.__dict__.keys():
cmdargs.append("--instance")
cmdargs.append(self.options.__dict__["instance"])
if "proxy" in self.options.__dict__.keys():
cmdargs.append("--proxy")
cmdargs.append(self.options.__dict__["proxy"])

self.logger.debug("preparelocal, cmdargs: %s", cmdargs)
preparelocalCmd = crabPreparelocal(logger=self.logger, cmdargs=cmdargs)

retval = preparelocalCmd()

return retval
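
Invoking one CRAB command from another, as runPrepareLocal does above, follows a simple pattern: build the cmdargs list exactly as the command line would, instantiate the command class with a logger, and call it. A standalone sketch (the wrapper function name and keyword defaults are illustrative):

from CRABClient.Commands.preparelocal import preparelocal as crabPreparelocal

def prepareLocalDir(logger, projDir, instance=None, proxy=None):
    cmdargs = ['-d', projDir]
    if instance:
        cmdargs += ['--instance', instance]
    if proxy:
        cmdargs += ['--proxy', proxy]
    cmd = crabPreparelocal(logger=logger, cmdargs=cmdargs)
    return cmd()  # e.g. {'commandStatus': 'SUCCESS'}
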


def setOptions(self):
@@ -384,23 +407,25 @@ def _encodeRequest(self, configreq, listParams):
return str(encoded)


def executeTestRun(self, filecacheurl, uniquerequestname):
def executeTestRun(self, filecacheurl, uniquerequestname, projDir):
"""
Runs the dry run in the 'local' sub-directory of the project directory, prepared beforehand by preparelocal.
Runs a trial to obtain the performance report. Repeats the trial with successively larger numbers of input events
until a job length of maxSeconds is reached (this improves accuracy for fast-running CMSSW parameter sets).
"""
cwd = os.getcwd()
try:
tmpDir = tempfile.mkdtemp()
self.logger.info('Created temporary directory for dry run sandbox in %s' % tmpDir)
os.chdir(tmpDir)
downloadFromS3(crabserver=self.crabserver, filepath=os.path.join(tmpDir, 'dry-run-sandbox.tar.gz'),
objecttype='runtimefiles', taskname=uniquerequestname, logger=self.logger)
for name in ['dry-run-sandbox.tar.gz', 'InputFiles.tar.gz', 'CMSRunAnalysis.tar.gz', 'sandbox.tar.gz']:
tf = tarfile.open(os.path.join(tmpDir, name))
tf.extractall(tmpDir)
tf.close()
#tmpDir = tempfile.mkdtemp()
#self.logger.info('Created temporary directory for dry run sandbox in %s' % tmpDir)
self.logger.info('Execute test run in local sub-directory of %s', projDir)
os.chdir(os.path.join(projDir, 'local'))
#downloadFromS3(crabserver=self.crabserver, filepath=os.path.join(tmpDir, 'dry-run-sandbox.tar.gz'),
# objecttype='runtimefiles', taskname=uniquerequestname, logger=self.logger)
#for name in ['dry-run-sandbox.tar.gz', 'InputFiles.tar.gz', 'CMSRunAnalysis.tar.gz', 'sandbox.tar.gz']:

#tf = tarfile.open('dry-run-sandbox.tar.gz')
#tf.extractall('splitting-summary.json')
#tf.close()
os.environ.update({'_CONDOR_JOB_AD': 'Job.submit'})

with open('splitting-summary.json') as f:
@@ -417,34 +442,18 @@ def executeTestRun(self, filecacheurl, uniquerequestname):
while totalJobSeconds < maxSeconds:
if totalJobSeconds != 0:
self.logger.info("Last trial took only %.1f seconds. We are trying now with %.0f events", totalJobSeconds, events)
optsList = getCMSRunAnalysisOpts('Job.submit', 'RunJobs.dag', job=1, events=events)
# from a python list to a string which can be used as shell command argument
opts = ''
for opt in optsList:
opts = opts + ' %s' % opt
setCMSRunAnalysisOpts(events=events)
# job wrapper needs to be executed in a clean shell, as happens on the WN, not
# inside the environment where CRABClient runs (i.e. some CMSSW env. which may conflict
# with the WMCore code used in the wrapper)
undoScram = "eval `scram unsetenv -sh`; "
setEnv = """
echo $PWD && ls -lrth
if [ -f ./submit_env.sh ]; then
# (dario, 202212)
# this if/else has been introduced only for backwards compatibility of the crab client
# with the old TW with the old jobwrapper.
# once the new TW with the new jobwrapper is deployed, we can remove this
# if/else and keep only the code inside the "if" clause.
. ./submit_env.sh && save_env && setup_local_env;
else
# this code in the "else" clause can be discarded after we merge the new
# TW with the new jobwrapper.
export SCRAM_ARCH=slc6_amd64_gcc481
export CRAB_RUNTIME_TARBALL=local
export CRAB_TASKMANAGER_TARBALL=local
export CRAB3_RUNTIME_DEBUG=True
fi
. ./submit_env.sh && save_env && setup_local_env;
tar xzmf CMSRunAnalysis.tar.gz
"""
command = undoScram + setEnv + 'sh CMSRunAnalysis.sh ' + opts
#command = undoScram + setEnv + 'sh CMSRunAnalysis.sh ' + opts
command = undoScram + setEnv + 'sh CMSRunAnalysis.sh --json DryRunJobArg.json'
out, err, returncode = execute_command(command=command)
self.logger.debug(out)
if returncode != 0:
@@ -462,7 +471,7 @@ def executeTestRun(self, filecacheurl, uniquerequestname):

finally:
os.chdir(cwd)
shutil.rmtree(tmpDir)
#shutil.rmtree(tmpDir)

return splitting, report

@@ -531,38 +540,15 @@ def printDryRunResults(self, splitting, report):
self.logger.info("\nDry run requested: task paused\nTo continue processing, use 'crab proceed'\n")


def getCMSRunAnalysisOpts(ad, dag, job=1, events=10):
def setCMSRunAnalysisOpts(events=10):
"""
Prepare the JSON argument file for the dry run job, starting from the arguments prepared for job 1.
"""

set_re = re.compile(r'\+?(\w+)\s*=\s*(.*)$')

info = {}
with open(ad) as f:
for line in f:
m = set_re.match(line)
if not m:
continue
key, value = m.groups()
# Somehow, Condor likes doubled double quotes?
info[key] = value.strip("'\"").replace('""', '"')
with open(dag) as f:
for line in f:
if line.startswith('VARS Job{job}'.format(job=job)):
break
else:
raise ClientException('Dry run failed to execute parse DAG description.')
for setting in shlex.split(line):
m = set_re.match(setting)
if not m:
continue
key, value = m.groups()
info[key] = value.replace('""', '"')

info.update({'CRAB_Id': '0', 'firstEvent': '1', 'lastEvent': str(int(events) + 1)})

args = shlex.split(info['Arguments'])
def repl(match):
return info[match.group(1)]
return [re.sub(r'\$\((\w+)\)', repl, arg) for arg in args]
with open('JobArgs-1.json', 'r') as f:
args = json.load(f)
args.update({'CRAB_Id': '0', 'firstEvent': '1', 'lastEvent': str(int(events) + 1)})
with open('DryRunJobArg.json', 'w') as f:
json.dump(args, f)
return
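
The dry run now hands arguments to the job wrapper through a JSON file rather than a command line reconstructed from the job ad and DAG description. A condensed sketch of one trial, assuming the current working directory is the task's 'local' sub-directory where preparelocal unpacked JobArgs-1.json (the standalone function is illustrative; the calls mirror the ones above):

import json
from CRABClient.ClientUtilities import execute_command

def runDryTrial(events=10):
    # patch the job-1 argument file; the lastEvent arithmetic mirrors setCMSRunAnalysisOpts
    with open('JobArgs-1.json') as f:
        args = json.load(f)
    args.update({'CRAB_Id': '0', 'firstEvent': '1', 'lastEvent': str(int(events) + 1)})
    with open('DryRunJobArg.json', 'w') as f:
        json.dump(args, f)
    # execute the wrapper in a clean shell with the JSON argument file
    command = 'eval `scram unsetenv -sh`; ' \
              '. ./submit_env.sh && save_env && setup_local_env; ' \
              'tar xzmf CMSRunAnalysis.tar.gz; ' \
              'sh CMSRunAnalysis.sh --json DryRunJobArg.json'
    out, err, returncode = execute_command(command=command)
    return out, err, returncode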
