diff --git a/bin/crab.py b/bin/crab.py index b67a8757..790a6804 100755 --- a/bin/crab.py +++ b/bin/crab.py @@ -132,8 +132,13 @@ def log_exception(exc_type, exc_value, tback): print("'" + str(args[0]) + "' is not a valid command.") self.parser.print_help() sys.exit(-1) - self.cmd = sub_cmd(self.logger, args[1:]) + self.cmd = sub_cmd(self.logger, args[1:]) # the crab command to be executed + # Every command returns a dictionary which MUST contain the "commandStatus" key. + # Any value other then "SUCCESS" for this key indicates command failure + # and will result in 'crab' terminating with non-zero exit code + # Additional keys may be present in the dictionary, depending on the specific command, + # which are used to pass information to caller when CRABAPI is used returnDict = self.cmd() if returnDict['commandStatus'] != 'SUCCESS': raise CommandFailedException("Command %s failed" % str(args[0])) @@ -172,14 +177,14 @@ def log_exception(exc_type, exc_value, tback): errorId = re.search(r'(?<=X-Error-Id:\s)[^\n]*', err).group(0) client.logger.info('Error Id: %s', errorId) logging.getLogger('CRAB3').exception('Caught RESTInterfaceException exception') - except ClientException as ce: - client.logger.error(ce) - logging.getLogger('CRAB3').exception('Caught ClientException exception') - exitcode = ce.exitcode except CommandFailedException as ce: client.logger.warning(ce) logging.getLogger('CRAB3').exception('Caught CommandFailedException exception') exitcode = 1 + except ClientException as ce: + client.logger.error(ce) + logging.getLogger('CRAB3').exception('Caught ClientException exception') + exitcode = ce.exitcode except KeyboardInterrupt: client.logger.error('Keyboard Interrupted') exitcode = 1 diff --git a/src/python/CRABClient/Commands/recover.py b/src/python/CRABClient/Commands/recover.py index ebeb1ec3..22d76cfc 100644 --- a/src/python/CRABClient/Commands/recover.py +++ b/src/python/CRABClient/Commands/recover.py @@ -28,7 +28,6 @@ # step submit from CRABClient.Commands.submit import submit from CRABClient.UserUtilities import getColumn -from CRABClient.ClientUtilities import colors from ServerUtilities import SERVICE_INSTANCES SPLITTING_RECOVER_LUMIBASED = set(("LumiBased", "Automatic", "EventAwareLumiBased")) @@ -44,6 +43,17 @@ class recover(SubCommand): shortnames = ["rec"] def __call__(self): + """ + Code is organized as a series of steps, if any step fails, command exits + Each step returns a "retval" dictionary which always contains keys: 'commandStatus' and 'step' + 'step' value is the name of the step + 'commandStatus' can be: SUCCESS, FAILED, NothingToDo + Only the first two can be returned by this method to crab.py, the latter "NothingToDo" + is used as a "break" to exit the chain of steps early and will be converted to SUCCES before + this method exits + Other keys may be present as present in the return dict of subcommands used in here + if a 'msg' key is present, stepExit will log that message + """ retval = self.stepInit() if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval) @@ -62,6 +72,7 @@ def __call__(self): self.logger.debug("no need to run crab remake - self.restHostCommonname %s", self.restHostCommonname) self.crabProjDir = self.requestarea + retval = self.stepValidate() if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval) @@ -83,7 +94,7 @@ def __call__(self): if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval) if "recoverLumimaskPath" not in retval: - return retval + return self.stepExit(retval) retval = self.stepSubmitLumiBased(retval["recoverLumimaskPath"]) if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval) @@ -110,6 +121,11 @@ def stepExit(self, retval): > retval = self.stepYYY() > if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval) """ + if 'msg' in retval: + self.logger.info("recover process prematurely exited during %s step", retval['step']) + self.logger.info(retval['msg']) + if retval['commandStatus'] == 'NothingToDo': + retval['commandStatus'] = "SUCCESS" # tell crab.py to exit cleanly with no error return retval def stepInit(self): @@ -132,7 +148,7 @@ def stepInit(self): self.failingTaskInfo = {} self.failedJobs = [] - return {"commandStatus": "SUCCESS", "init": None } + return {"commandStatus": "SUCCESS", "step": "init"} def stepRemake(self): """ @@ -169,6 +185,9 @@ def stepRemake(self): retval = remakeCmd.remakecache(self.failingTaskName) self.logger.debug("stepRemakeAndValidate() - remake, retval: %s", retval) self.logger.debug("stepRemakeAndValidate() - remake, after, self.configuration: %s", self.configuration) + retval['step'] = "remake" + if retval['commandStatus'] != "SUCCESS": + retval['msg'] = "Could not remake the task project directory" return retval def stepValidate(self): @@ -202,6 +221,7 @@ def stepValidate(self): splitalgo = getColumn(self.failingCrabDBInfo, 'tm_split_algo') if not splitalgo in SPLITTING_RECOVER_LUMIBASED.union(SPLITTING_RECOVER_FILEBASED): msg = 'crab recover supports only tasks with LumiBased and FileBased splitting, you have {}'.format(splitalgo) + self.logger.info(msg) return {"commandStatus": "FAILED", "step": "RemakeAndValidate" , "msg": msg } self.failingTaskInfo["splitalgo"] = splitalgo @@ -210,7 +230,7 @@ def stepValidate(self): self.logger.debug("stepRemakeAndValidate() - failingtaskinfo - %s", self.failingTaskInfo) - return {"commandStatus": "SUCCESS", "validate": None } + return {"commandStatus": "SUCCESS", "step": "validate"} def stepStatus(self): """ @@ -261,6 +281,9 @@ def stepStatus(self): # [1, 2, 4] self.failedJobs = [job[1] for job in retval["jobList"] if job[0] == "failed"] self.logger.debug("stepStatus() - status, failedJobs: %s", self.failedJobs) + retval['step'] = "status" + if retval['commandStatus'] != "SUCCESS": + retval['msg'] = "Could not retrieve task status" return retval @@ -270,22 +293,23 @@ def stepKill(self): - kills the original failing task """ ## step2: kill + retval = {"step": "kill"} # if the task is already killed or about to be killed, do not kill again if self.failingTaskStatus["dbStatus"] == "KILLED" or \ (self.failingTaskStatus["dbStatus"] in ("NEW", "QUEUED") and self.failingTaskStatus["command"] == "KILL"): - returnDict = {'kill' : 'already killed', 'commandStatus': 'SUCCESS'} - self.logger.info("step kill - task already killed") - return returnDict + retval['commandStatus'] = "SUCCESS" + self.logger.debug("step kill - task already killed") + return retval # avoid that crab operators kill users tasks by mistake. # if the user who is running crab recover differs from the one who submitted the original task, # then kill the task only if the option "--forcekill" is used. username = getUsername(self.proxyfilename, logger=self.logger) if self.failingTaskInfo["username"] != username and not self.options.__dict__["forceKill"]: - returnDict = {'kill' : 'do not kill task submitted by another user', 'commandStatus': 'FAILED'} - self.logger.info("step kill - task submitted by another user, will not kill it") - return returnDict + retval['commandStatus'] = "FAILED" + retval['msg'] = "task submitted by another user, will not kill it" + return retval cmdargs = [] cmdargs.append("-d") @@ -301,7 +325,7 @@ def stepKill(self): self.logger.debug("stepKill() - cmdargs: %s", cmdargs) killCmd = kill(logger=self.logger, cmdargs=cmdargs) with SubcommandExecution(self.logger, "kill") as _: - retval = killCmd() + retval.update(killCmd()) self.logger.debug("stepKill() - retval: %s", retval) self.logger.debug("stepKill() - after, self.configuration: %s", self.configuration) @@ -369,23 +393,28 @@ def stepCheckKill(self): self.logger.debug("stepCheckKill() - dagStatus %s", self.failingTaskStatus["dagStatus"]) self.logger.debug("stepCheckKill() - dbStatus %s", self.failingTaskStatus["dbStatus"]) + retval = {'step': "checkKill"} + # check the task status. # it does not make sense to recover a task in COMPLETED if not self.failingTaskStatus["status"] in ("SUBMITTED", "FAILED", "FAILED (KILLED)"): - msg = "In order to recover a task, the combined status of the task needs can not be {}".format(self.failingTaskStatus["status"]) - return {"commandStatus": "FAILED", "step": "checkKill" , "msg": msg } + msg = "Tasks in status {} can not be recovered".format(self.failingTaskStatus["status"]) + retval.update({"commandStatus": "NothingToDo", "msg": msg}) + return retval # the status on the db should be submitted or killed. or about to be killed if self.failingTaskStatus["dbStatus"] in ("NEW", "QUEUED"): if not self.failingTaskStatus["command"] in ("KILL"): msg = "In order to recover a task, when the status of the task in the oracle DB is {}, the task command can not be {}"\ .format(self.failingTaskStatus["dbStatus"], self.failingTaskStatus["command"]) - return {"commandStatus": "FAILED", "step": "checkKill" , "msg": msg } + retval.update({"commandStatus": "FAILED", "msg": msg}) + return retval else: if not self.failingTaskStatus["dbStatus"] in ("SUBMITTED", "KILLED"): msg = "In order to recover a task, the status of the task in the oracle DB can not be {}"\ .format(self.failingTaskStatus["dbStatus"]) - return {"commandStatus": "FAILED", "step": "checkKill" , "msg": msg } + retval.update({"commandStatus": "FAILED", "msg": msg}) + return retval # make sure that the jobs ad publications are in a final state. # - [x] make sure that there are no ongoing transfers @@ -396,7 +425,8 @@ def stepCheckKill(self): if not set(self.failingTaskStatus["jobsPerStatus"].keys()).issubset(terminalStates): msg = "In order to recover a task, all the jobs need to be in a terminal state ({}). You have {}"\ .format(terminalStates, self.failingTaskStatus["jobsPerStatus"].keys()) - return {"commandStatus": "FAILED", "step": "checkKill" , "msg": msg } + retval.update({"commandStatus": "FAILED", "msg": msg}) + return retval # - [x] make sure that there are no ongoing publications self.logger.debug("stepCheckKill - publication %s", self.failingTaskStatus["publication"] ) @@ -404,15 +434,18 @@ def stepCheckKill(self): if not set(self.failingTaskStatus["publication"].keys()).issubset(terminalStatesPub): msg = "In order to recover a task, publication for all the jobs need to be in a terminal state ({}). You have {}"\ .format(terminalStatesPub, self.failingTaskStatus["publication"].keys()) - return {"commandStatus": "FAILED", "step": "checkKill" , "msg": msg } + retval.update({"commandStatus": "FAILED", "msg": msg}) + return retval # - [x] if all jobs failed, then exit. it is better to submit again the task than using crab recover :) # check that "failed" is the only key of the jobsPerStatus dictionary if set(self.failingTaskStatus["jobsPerStatus"].keys()) == set(("failed",)): - msg = "All the jobs of the original task failed. better submitting it again from scratch than recovering it." - return {"commandStatus": "FAILED", "step": "checkKill" , "msg": msg } + msg = "All the jobs of the original task failed. Better investigate and submit it again than recover." + retval.update({"commandStatus": "FAILED", "msg": msg}) + return retval - return {"commandStatus": "SUCCESS", "checkkill": "task can be recovered"} + retval.update({"commandStatus": "SUCCESS", "msg": "task can be recovered"}) + return retval def stepReport(self): """ @@ -425,6 +458,8 @@ def stepReport(self): with the output of crab report """ + retval = {"step": "report"} + failingTaskPublish = getColumn(self.failingCrabDBInfo, 'tm_publication') self.logger.debug("stepReport() - tm_publication: %s %s", type(failingTaskPublish), failingTaskPublish) # - if the user specified --strategy=notPublished but the original failing task @@ -459,7 +494,7 @@ def stepReport(self): reportCmd = report(logger=self.logger, cmdargs=cmdargs) with SubcommandExecution(self.logger, "report") as _: # FIXME - stays noisy because interference with getMutedStatusInfo() - retval = reportCmd() + retval.update(reportCmd()) self.logger.debug("stepReport() - report, after, self.configuration: %s", self.configuration) self.logger.debug("stepReport() - report, retval: %s", retval) @@ -486,17 +521,20 @@ def stepReport(self): # we will likely never reach this if, because in this case the status on the schedd # should be COMPLETED, which is not accepted by stepCheckKill self.logger.info("stepReport() - all lumis have been processed by original task. crab recover will exit") + retval.update({'commandStatus' : 'SUCCESS'}) + return retval self.logger.debug("crab report - recovery task will process lumis contained in file %s", recoverLumimaskPath) if os.path.exists(recoverLumimaskPath): - returnDict = {'commandStatus' : 'SUCCESS', 'recoverLumimaskPath': recoverLumimaskPath} + retval.update({'commandStatus' : 'SUCCESS', 'recoverLumimaskPath': recoverLumimaskPath}) else: - msg = 'the file {} does not exist. crab report could not produce it, the task can not be recovered'.format(recoverLumimaskPath) - returnDict = {'commandStatus' : 'FAILED', 'msg': msg} + msg = 'File {} does not exist. crab report could not produce it, the task can not be recovered'.format(recoverLumimaskPath) + self.logger.info(msg) + retval.update({'commandStatus' : 'FAILED', 'msg': msg}) - return returnDict + return retval def stepGetsandbox(self): """ @@ -504,6 +542,8 @@ def stepGetsandbox(self): - download the user_ and debug_sandbox from s3 or from the schedd """ + retval = {"step": "getSandbox"} + cmdargs = [] cmdargs.append("-d") cmdargs.append(str(self.crabProjDir)) @@ -516,7 +556,7 @@ def stepGetsandbox(self): self.logger.debug("stepGetsandbox() - cmdargs: %s", cmdargs) getsandboxCmd = getsandbox(logger=self.logger, cmdargs=cmdargs) with SubcommandExecution(self.logger, "getsandbox") as _: - retval = getsandboxCmd() + retval.update(getsandboxCmd()) self.logger.debug("stepGetsandbox() - retval: %s", retval) return retval @@ -528,6 +568,9 @@ def stepExtractSandbox(self, sandbox_paths): - extracts the user_ and debug_sandbox, so that the files that they contain can be used by crab submit at a later step """ + + retval = {"step": "extractSandbox"} + debug_sandbox = tarfile.open(sandbox_paths[0]) debug_sandbox.extractall(path=os.path.join(self.crabProjDir, "user_sandbox")) debug_sandbox.close() @@ -539,7 +582,8 @@ def stepExtractSandbox(self, sandbox_paths): self.recoverconfig = os.path.join(self.crabProjDir, "debug_sandbox", "debug" , "crabConfig.py") - return {"commandStatus": "SUCCESS", } + retval.update({"commandStatus": "SUCCESS"}) + return retval def stepSubmitLumiBased(self, notFinishedJsonPath): """ @@ -551,6 +595,8 @@ def stepSubmitLumiBased(self, notFinishedJsonPath): - submits a new task """ + retval = {"step": "submitLumiBased"} + cmdargs = [] cmdargs.append("-c") cmdargs.append(self.recoverconfig) @@ -588,7 +634,7 @@ def stepSubmitLumiBased(self, notFinishedJsonPath): submitCmd = submit(logger=self.logger, cmdargs=cmdargs) # with SubcommandExecution(self.logger, "submit") as _: - retval = submitCmd() + retval.update(submitCmd()) self.logger.debug("stepSubmit() - retval %s", retval) return retval @@ -602,8 +648,11 @@ def stepSubmitFileBased(self): - [ ] if the input is from DBS, then write info to runs_and_lumis.tar.gz """ + retval = {"step": "submitFileBased"} + # TODO # I will need to implement this! + raise NotImplementedError return {'commandStatus': 'FAILED', 'error': 'not implemented yet'} def setOptions(self):