From 653bafe0f37f50459b3d785eb76be361b7fc6acd Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 7 Nov 2024 09:31:35 +0100 Subject: [PATCH] make --dryrun work again with TW v3.2411xx (#5347) --- .../CRABClient/Commands/preparelocal.py | 122 ++++++++---------- src/python/CRABClient/Commands/submit.py | 114 +++++++--------- 2 files changed, 101 insertions(+), 135 deletions(-) diff --git a/src/python/CRABClient/Commands/preparelocal.py b/src/python/CRABClient/Commands/preparelocal.py index b6cac317..ac08efa0 100644 --- a/src/python/CRABClient/Commands/preparelocal.py +++ b/src/python/CRABClient/Commands/preparelocal.py @@ -1,21 +1,22 @@ +""" + The commands prepares a directory and the relative scripts to execute the jobs locally. + It can also execute a specific job if the jobid option is passed +""" import os import json import shutil import tarfile import tempfile -from ServerUtilities import getProxiedWebDir, getColumn, downloadFromS3 +from ServerUtilities import getColumn, downloadFromS3 -from CRABClient.UserUtilities import curlGetFileFromURL from CRABClient.ClientUtilities import execute_command from CRABClient.Commands.SubCommand import SubCommand from CRABClient.ClientExceptions import ClientException class preparelocal(SubCommand): - """ The commands prepares a directory and the relative scripts to execute the jobs locally. 
- It can also execute a specific job if the jobid option is passed - """ + """ the preparelocal command instance """ def __init__(self, logger, cmdargs=None): SubCommand.__init__(self, logger, cmdargs) @@ -41,10 +42,11 @@ def __call__(self): if self.options.jobid: self.logger.info("Executing job %s locally" % self.options.jobid) - self.executeTestRun(inputArgs, self.options.jobid) + self.prepareDir(inputArgs, self.options.destdir) + self.executeTestRun(self.options.destdir, self.options.jobid) self.logger.info("Job execution terminated") else: - self.logger.info("Copying an preparing files for local execution in %s" % self.options.destdir) + self.logger.info("Copying and preparing files for local execution in %s" % self.options.destdir) self.prepareDir(inputArgs, self.options.destdir) self.logger.info("go to that directory IN A CLEAN SHELL and use 'sh run_job.sh NUMJOB' to execute the job") finally: @@ -55,7 +57,8 @@ def __call__(self): return {'commandStatus': 'SUCCESS'} def getInputFiles(self): - """ Get the InputFiles.tar.gz and extract the necessary files + """ + Get the InputFiles.tar.gz and extract the necessary files """ taskname = self.cachedinfo['RequestName'] @@ -67,74 +70,46 @@ def getInputFiles(self): self.destination = getColumn(crabDBInfo, 'tm_asyncdest') username = getColumn(crabDBInfo, 'tm_username') sandboxName = getColumn(crabDBInfo, 'tm_user_sandbox') - inputsFilename = os.path.join(os.getcwd(), 'InputFiles.tar.gz') - sandboxFilename = os.path.join(os.getcwd(), 'sandbox.tar.gz') - if status not in ['UPLOADED', 'SUBMITTED']: - raise ClientException('Can only execute from tasks in status SUBMITTED or UPLOADED. Current status is %s' % status) - try: + + if not status in ['UPLOADED', 'SUBMITTED']: + raise ClientException('Can only execute jobs from tasks in status SUBMITTED or UPLOADED. 
Current status is %s' % status) + # new way first + # following try-except can be removed and only the code in the try kept once + # there are no more tasks submitted with TW version v3.241018 or earlier + try: # this will fail with old tasks + inputsFilename = os.path.join(os.getcwd(), 'InputFiles.tar.gz') + sandboxFilename = os.path.join(os.getcwd(), 'sandbox.tar.gz') downloadFromS3(crabserver=self.crabserver, filepath=inputsFilename, objecttype='runtimefiles', taskname=taskname, logger=self.logger) downloadFromS3(crabserver=self.crabserver, filepath=sandboxFilename, objecttype='sandbox', logger=self.logger, tarballname=sandboxName, username=username) - except Exception: - # fall back to WEB_DIR - if status == 'UPLOADED': - raise ClientException('Currently crab preparelocal only works for tasks successfully submitted') - elif status == 'SUBMITTED': - webdir = getProxiedWebDir(crabserver=self.crabserver, task=taskname, - logFunction=self.logger.debug) - if not webdir: - webdir = getColumn(crabDBInfo, 'tm_user_webdir') - self.logger.debug("Downloading 'InputFiles.tar.gz' from %s" % webdir) - httpCode = curlGetFileFromURL(webdir + '/InputFiles.tar.gz', inputsFilename, self.proxyfilename, - logger=self.logger) - if httpCode != 200: - raise ClientException("Failed to download 'InputFiles.tar.gz' from %s" % webdir) - - for name in [inputsFilename, 'CMSRunAnalysis.tar.gz', 'sandbox.tar.gz']: - with tarfile.open(name) as tf: + with tarfile.open(inputsFilename) as tf: + tf.extractall() + except: + # old way for tasks submitted "some time ago". They should better have bootstrapped + # so webdir should be defined. 
+ self.logger.info('Task was submitted with old TaskWorker, fall back to WEB_DIR for tarballs') + from ServerUtilities import getProxiedWebDir + from CRABClient.UserUtilities import curlGetFileFromURL + webdir = getProxiedWebDir(crabserver=self.crabserver, task=taskname, + logFunction=self.logger.debug) + httpCode = curlGetFileFromURL(webdir + '/InputFiles.tar.gz', inputsFilename, self.proxyfilename, + logger=self.logger) + if httpCode != 200: + raise ClientException("Failed to download 'InputFiles.tar.gz' from %s" % webdir) + with tarfile.open(inputsFilename) as tf: tf.extractall() - def executeTestRun(self, inputArgs, jobnr): - """ Execute a test run calling CMSRunAnalysis.sh + def executeTestRun(self, destDir, jobnr): """ - os.environ.update({'CRAB3_RUNTIME_DEBUG': 'True', '_CONDOR_JOB_AD': 'Job.submit'}) - - optsList = [ - os.path.join(os.getcwd(), 'TweakPSet.py'), - '-a %s' % inputArgs[jobnr-1]['CRAB_Archive'], - '-o %s' % inputArgs[jobnr-1]['CRAB_AdditionalOutputFiles'], - '--sourceURL=%s' % inputArgs[jobnr-1]['CRAB_ISB'], - '--location=%s' % os.getcwd(), - '--inputFile=%s' % inputArgs[jobnr-1]['inputFiles'], - '--runAndLumis=%s' % inputArgs[jobnr-1]['runAndLumiMask'], - '--firstEvent=%s' % inputArgs[jobnr-1]['firstEvent'], #jobs goes from 1 to N, inputArgs from 0 to N-1 - '--lastEvent=%s' % inputArgs[jobnr-1]['lastEvent'], - '--firstLumi=%s' % inputArgs[jobnr-1]['firstLumi'], - '--firstRun=%s' % inputArgs[jobnr-1]['firstRun'], - '--seeding=%s' % inputArgs[jobnr-1]['seeding'], - '--lheInputFiles=%s' % inputArgs[jobnr-1]['lheInputFiles'], - '--oneEventMode=0', - '--eventsPerLumi=%s' % inputArgs[jobnr-1]['eventsPerLumi'], - '--maxRuntime=-1', - '--jobNumber=%s' % (jobnr-1), - '--cmsswVersion=%s' % inputArgs[jobnr-1]['CRAB_JobSW'], - '--scramArch=%s' % inputArgs[jobnr-1]['CRAB_JobArch'], - '--scriptExe=%s' % inputArgs[jobnr-1]['scriptExe'], - '--scriptArgs=%s' % inputArgs[jobnr-1]['scriptArgs'], - ] - # from a python list to a string which can be used as shell 
command argument - opts = '' - for opt in optsList: - opts = opts + ' %s'%opt - command = 'sh CMSRunAnalysis.sh ' + opts - out, err, returncode = execute_command(command=command) - self.logger.debug(out) - self.logger.debug(err) - if returncode != 0: - raise ClientException('Failed to execute local test run:\n StdOut: %s\n StdErr: %s' % (out, err)) + Execute a test run calling CMSRunAnalysis.sh + """ + os.chdir(destDir) + cmd = 'eval `scram unsetenv -sh`;'\ + ' bash run_job.sh %s' % str(jobnr) + execute_command(cmd, logger=self.logger, redirect=False) def prepareDir(self, inputArgs, targetDir): """ Prepare a directory with just the necessary files: @@ -182,9 +157,15 @@ def prepareDir(self, inputArgs, targetDir): for f in ["gWMS-CMSRunAnalysis.sh", "CMSRunAnalysis.sh", "cmscp.py", "CMSRunAnalysis.tar.gz", "sandbox.tar.gz", "run_and_lumis.tar.gz", "input_files.tar.gz", "Job.submit", - "submit_env.sh" + "submit_env.sh", "splitting-summary.json" ]: - shutil.copy2(f, targetDir) + try: # for backward compatibility with TW v3.241017 where splitting-summary.json is missing + shutil.copy2(f, targetDir) + except FileNotFoundError: + pass + + cmd = "cd %s; tar xf CMSRunAnalysis.tar.gz" % targetDir + execute_command(command=cmd, logger=self.logger) # this InputArgs.txt is for backward compatibility with old TW # but may also be useful for local submission https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRABPrepareLocal @@ -202,8 +183,7 @@ def prepareDir(self, inputArgs, targetDir): bashWrapper = """#!/bin/bash . 
./submit_env.sh && save_env && setup_local_env -tar xzmf CMSRunAnalysis.tar.gz -# +# export _CONDOR_JOB_AD=Job.${1}.submit # leading '+' signs must be removed to use JDL as classAd file sed -e 's/^+//' Job.submit > Job.${1}.submit diff --git a/src/python/CRABClient/Commands/submit.py b/src/python/CRABClient/Commands/submit.py index cacc1ba2..acfafc6d 100644 --- a/src/python/CRABClient/Commands/submit.py +++ b/src/python/CRABClient/Commands/submit.py @@ -23,6 +23,7 @@ setSubmitParserOptions, validateSubmitOptions, checkStatusLoop, execute_command from ServerUtilities import MAX_MEMORY_PER_CORE, MAX_MEMORY_SINGLE_CORE, downloadFromS3, FEEDBACKMAIL +from CRABClient.Commands.preparelocal import preparelocal as crabPreparelocal class submit(SubCommand): """ @@ -162,7 +163,9 @@ def __call__(self): checkStatusLoop(self.logger, server, self.defaultApi, uniquerequestname, targetTaskStatus, self.name) if self.options.dryrun: - self.printDryRunResults(*self.executeTestRun(filecacheurl, uniquerequestname)) + self.logger.info("Dry run") + self.runPrepareLocal(projDir) + self.printDryRunResults(*self.executeTestRun(filecacheurl, uniquerequestname, projDir)) self.logger.debug("About to return") @@ -171,6 +174,26 @@ def __call__(self): returnDict['commandStatus'] = 'SUCCESS' return returnDict + def runPrepareLocal(self, projDir): + """ + """ + cmdargs = [] + cmdargs.append("-d") + cmdargs.append(projDir) + #cmdargs += ["--jobid", "1"] + if "instance" in self.options.__dict__.keys(): + cmdargs.append("--instance") + cmdargs.append(self.options.__dict__["instance"]) + if "proxy" in self.options.__dict__.keys(): + cmdargs.append("--proxy") + cmdargs.append(self.options.__dict__["proxy"]) + + self.logger.debug("preparelocal, cmdargs: %s", cmdargs) + preparelocalCmd = crabPreparelocal(logger=self.logger, cmdargs=cmdargs) + + retval = preparelocalCmd() + + return retval def setOptions(self): @@ -384,7 +407,7 @@ def _encodeRequest(self, configreq, listParams): return str(encoded) - def 
executeTestRun(self, filecacheurl, uniquerequestname): + def executeTestRun(self, filecacheurl, uniquerequestname, projDir): """ Downloads the dry run tarball from the User File Cache and unpacks it in a temporary directory. Runs a trial to obtain the performance report. Repeats trial with successively larger input events @@ -392,15 +415,17 @@ def executeTestRun(self, filecacheurl, uniquerequestname): """ cwd = os.getcwd() try: - tmpDir = tempfile.mkdtemp() - self.logger.info('Created temporary directory for dry run sandbox in %s' % tmpDir) - os.chdir(tmpDir) - downloadFromS3(crabserver=self.crabserver, filepath=os.path.join(tmpDir, 'dry-run-sandbox.tar.gz'), - objecttype='runtimefiles', taskname=uniquerequestname, logger=self.logger) - for name in ['dry-run-sandbox.tar.gz', 'InputFiles.tar.gz', 'CMSRunAnalysis.tar.gz', 'sandbox.tar.gz']: - tf = tarfile.open(os.path.join(tmpDir, name)) - tf.extractall(tmpDir) - tf.close() + #tmpDir = tempfile.mkdtemp() + #self.logger.info('Created temporary directory for dry run sandbox in %s' % tmpDir) + self.logger.info('Execute rest run in local sub-directory of %s', projDir) + os.chdir(os.path.join(projDir, 'local')) + #downloadFromS3(crabserver=self.crabserver, filepath=os.path.join(tmpDir, 'dry-run-sandbox.tar.gz'), + # objecttype='runtimefiles', taskname=uniquerequestname, logger=self.logger) + #for name in ['dry-run-sandbox.tar.gz', 'InputFiles.tar.gz', 'CMSRunAnalysis.tar.gz', 'sandbox.tar.gz']: + + #tf = tarfile.open('dry-run-sandbox.tar.gz') + #tf.extractall('splitting-summary.json') + #tf.close() os.environ.update({'_CONDOR_JOB_AD': 'Job.submit'}) with open('splitting-summary.json') as f: @@ -417,34 +442,18 @@ def executeTestRun(self, filecacheurl, uniquerequestname): while totalJobSeconds < maxSeconds: if totalJobSeconds != 0: self.logger.info("Last trial took only %.1f seconds. 
We are trying now with %.0f events", totalJobSeconds, events) - optsList = getCMSRunAnalysisOpts('Job.submit', 'RunJobs.dag', job=1, events=events) - # from a python list to a string which can be used as shell command argument - opts = '' - for opt in optsList: - opts = opts + ' %s' % opt + setCMSRunAnalysisOpts(events=events) # job wrapper needs to be executed in a clean shell, like it happens in the WN, not # inside the environemnt where CRABClient runs (i.e. some CMSSW env. which may conflict # with the WMCore code used in the wrapper undoScram = "eval `scram unsetenv -sh`; " setEnv = """ echo $PWD && ls -lrth -if [ -f ./submit_env.sh ]; then - # (dario, 202212) - # this if/else has been introduced only for backwards compatibility of the crab client - # with the old TW with the old jobwrapper. - # once the new TW with the new jobwrapper is deployed, we can remove this - # if/else and keep only the code inside the "if" clause. - . ./submit_env.sh && save_env && setup_local_env; -else - # this code in the "else" clause can be discarded after we merge the new - # TW with the new jobwrapper. - export SCRAM_ARCH=slc6_amd64_gcc481 - export CRAB_RUNTIME_TARBALL=local - export CRAB_TASKMANAGER_TARBALL=local - export CRAB3_RUNTIME_DEBUG=True -fi +. 
./submit_env.sh && save_env && setup_local_env; +tar xzmf CMSRunAnalysis.tar.gz """ - command = undoScram + setEnv + 'sh CMSRunAnalysis.sh ' + opts + #command = undoScram + setEnv + 'sh CMSRunAnalysis.sh ' + opts + command = undoScram + setEnv + 'sh CMSRunAnalysis.sh --json DryRunJobArg.json' out, err, returncode = execute_command(command=command) self.logger.debug(out) if returncode != 0: @@ -462,7 +471,7 @@ def executeTestRun(self, filecacheurl, uniquerequestname): finally: os.chdir(cwd) - shutil.rmtree(tmpDir) + #shutil.rmtree(tmpDir) return splitting, report @@ -531,38 +540,15 @@ def printDryRunResults(self, splitting, report): self.logger.info("\nDry run requested: task paused\nTo continue processing, use 'crab proceed'\n") -def getCMSRunAnalysisOpts(ad, dag, job=1, events=10): +def setCMSRunAnalysisOpts(events=10): """ Parse the job ad to obtain the arguments that were passed to condor. """ - set_re = re.compile(r'\+?(\w+)\s*=\s*(.*)$') - - info = {} - with open(ad) as f: - for line in f: - m = set_re.match(line) - if not m: - continue - key, value = m.groups() - # Somehow, Condor likes doubled double quotes? - info[key] = value.strip("'\"").replace('""', '"') - with open(dag) as f: - for line in f: - if line.startswith('VARS Job{job}'.format(job=job)): - break - else: - raise ClientException('Dry run failed to execute parse DAG description.') - for setting in shlex.split(line): - m = set_re.match(setting) - if not m: - continue - key, value = m.groups() - info[key] = value.replace('""', '"') - - info.update({'CRAB_Id': '0', 'firstEvent': '1', 'lastEvent': str(int(events) + 1)}) - - args = shlex.split(info['Arguments']) - def repl(match): - return info[match.group(1)] - return [re.sub(r'\$\((\w+)\)', repl, arg) for arg in args] + with open('JobArgs-1.json', 'r') as f: + args = json.load(f) + args.update({'CRAB_Id': '0', 'firstEvent': '1', 'lastEvent': str(int(events) + 1)}) + with open('DryRunJobArg.json', 'w') as f: + json.dump(args, f) + return +