diff --git a/src/python/CRABClient/Commands/getcommand.py b/src/python/CRABClient/Commands/getcommand.py
index 83a45182a..6ccc59646 100644
--- a/src/python/CRABClient/Commands/getcommand.py
+++ b/src/python/CRABClient/Commands/getcommand.py
@@ -24,7 +24,7 @@ def __call__(self, **argv):
             self.dest = os.path.join(self.requestarea, 'results')
 
         # Destination is a URL.
-        if re.match("^[a-z]+://", self.dest):
+        if re.match("^[a-z]+://", self.dest):
             if not self.dest.endswith("/"):
                 self.dest += "/"
         #Creating the destination directory if necessary
@@ -62,7 +62,7 @@ def __call__(self, **argv):
         cpresults = []
 #        for workflow in dictresult['result']: TODO re-enable this when we will have resubmissions
         workflow = dictresult['result'] #TODO assigning workflow to dictresult. for the moment we have only one wf
-        arglist = ['-d', self.dest, '-i', workflow, '-t', self.options.task, '-p', self.proxyfilename]
+        arglist = ['-d', self.dest, '-i', workflow, '-t', self.options.task, '-p', self.proxyfilename, '-l', self.options.nparallel, '-w', self.options.waittime]
         if len(workflow) > 0:
             if self.dump:
                 for fileInfo in workflow:
diff --git a/src/python/CRABClient/Commands/getlog.py b/src/python/CRABClient/Commands/getlog.py
index 65c33058a..55ac6b27c 100644
--- a/src/python/CRABClient/Commands/getlog.py
+++ b/src/python/CRABClient/Commands/getlog.py
@@ -29,6 +29,12 @@ def setOptions(self):
         self.parser.add_option( '-q', '--quantity',
                                 dest = 'quantity',
                                 help = 'The number of logs you want to retrieve (or "all"). Ignored if --jobids is used.' )
+        self.parser.add_option( '-l', '--parallel',
+                                dest = 'nparallel',
+                                help = 'Number of parallel downloads (default: 10).' )
+        self.parser.add_option( '-w', '--wait',
+                                dest = 'waittime',
+                                help = 'Seconds to add to the default sendreceive-timeout.' )
         getcommand.setOptions(self)
 
     """
diff --git a/src/python/CRABClient/Commands/getoutput.py b/src/python/CRABClient/Commands/getoutput.py
index 0b4fdff2b..337664d1e 100644
--- a/src/python/CRABClient/Commands/getoutput.py
+++ b/src/python/CRABClient/Commands/getoutput.py
@@ -25,4 +25,10 @@ def setOptions(self):
         self.parser.add_option( '-q', '--quantity',
                                 dest = 'quantity',
                                 help = 'The number of output files you want to retrieve (or "all"). Ignored if --jobids is used.' )
+        self.parser.add_option( '-l', '--parallel',
+                                dest = 'nparallel',
+                                help = 'Number of parallel downloads (default: 10).' )
+        self.parser.add_option( '-w', '--wait',
+                                dest = 'waittime',
+                                help = 'Seconds to add to the default sendreceive-timeout.' )
         getcommand.setOptions(self)
diff --git a/src/python/CRABClient/Commands/remote_copy.py b/src/python/CRABClient/Commands/remote_copy.py
index 814ab698b..1dbd57e39 100644
--- a/src/python/CRABClient/Commands/remote_copy.py
+++ b/src/python/CRABClient/Commands/remote_copy.py
@@ -1,12 +1,11 @@
 from __future__ import division
 import os
-import logging
 import subprocess
-import threading
 import multiprocessing, Queue
 import time
 import re
 from math import ceil
+from multiprocessing import Manager
 
 from WMCore.FwkJobReport.FileInfo import readAdler32, readCksum
 
@@ -40,32 +39,57 @@ def setOptions(self):
                                dest = "inputdict",
                                default = None )
 
+        self.parser.add_option("-l", "--parallel",
+                               dest = "nparallel")
+
+        self.parser.add_option("-w", "--wait",
+                               dest = "waittime")
+
     def __call__(self):
         """
         Copying locally files staged remotely.
         *Using a subprocess to encapsulate the copy command.
-        *Using a timeout to avoid wiating too long
-           - srm timeout based on file size
-           - first transfer assumes a relatively slow bandiwth `downspeed`
-           - next transfers depends on previous speed
-           - srm timeout cannot be less then `minsrmtimeout`
-           - if size is unknown default srm timeout is `srmtimeout`
+        *At most `nsubprocess` files (10 by default) are downloaded in parallel.
+        *The default lcg-cp --sendreceive-timeout is 1800 s; it can be extended with --wait.
         """
+        globalExitcode = -1
         dicttocopy = self.options.inputdict
-        lcgCmd = 'lcg-cp --connect-timeout 20 --sendreceive-timeout 240 --verbose -b -D srmv2'
-        lcgtimeout = 20 + 240 + 60 #giving 1 extra minute: 5min20"
+        # Take the number of parallel downloads from the user; the default is 10.
+        if self.options.nparallel == None:
+            nsubprocess = 10
+        else:
+            nsubprocess = int(self.options.nparallel)
+
+        if nsubprocess <= 0 or nsubprocess > 20:
+            self.logger.info("Inappropriate number of parallel downloads: must be between 1 and 20")
+            return CommandResult(-1, '')
+
+        #lcgCmd = 'lcg-cp --connect-timeout 20 --sendreceive-timeout 240 --verbose -b -D srmv2'
+        lcgCmd = 'lcg-cp --connect-timeout 20 --verbose -b -D srmv2'
+
+        # Increase the client timeout: 1800 s by default, plus the seconds given with --wait.
+        if self.options.waittime == None:
+            lcgCmd = lcgCmd + " --sendreceive-timeout 1800"
+        else:
+            sendreceivetimeout = 1800 + int(self.options.waittime)
+            lcgCmd = lcgCmd + " --sendreceive-timeout " + str(sendreceivetimeout)
+
+        #lcgtimeout = 20 + 240 + 60 #giving 1 extra minute: 5min20"
         srmtimeout = 900 #default transfer timeout in case the file size is unknown: 15min
         minsrmtimeout = 60 #timeout cannot be less then 1min
         downspeed = float(250*1024) #default speed assumes a download of 250KB/s
         mindownspeed = 20*1024.
 
-        finalresults = {}
+        # Dictionaries shared with the worker processes to collect per-file results.
+        manager = Manager()
+        successfiles = manager.dict()
+        failedfiles = manager.dict()
 
-        #this can be parallelized starting more processes in startchildproc
-        input, result, proc = self.startchildproc(processWorker)
+        self.logger.debug("Starting %s child processes" % nsubprocess)
+        inputq, processarray = self.startchildproc(self.processWorker, nsubprocess, successfiles, failedfiles)
 
         for myfile in dicttocopy:
             if downspeed < mindownspeed:
@@ -104,109 +128,137 @@ def __call__(self):
             else:
                 cmd = cmd % ("file://%s" % localFilename)
 
-            self.logger.info("Retrieving file '%s' " % fileid)
-            self.logger.debug("Executing '%s' " % cmd)
-            input.put((fileid, cmd))
-            starttime = time.time()
-            endtime = 0
-            res = None
-            stdout = ''
-            stderr = ''
-            exitcode = -1
-            try:
-                res = result.get(block = True, timeout = lcgtimeout+localsrmtimeout)
-                self.logger.debug("Command finished")
-                endtime = time.time()
-                stdout = res['stdout']
-                stderr = res['stderr']
-                exitcode = res['exit']
-            except Queue.Empty:
-                self.logger.debug("Command timed out")
-                stderr = "Timeout retrieving result after %i seconds" % (lcgtimeout+localsrmtimeout)
-                stdout = ''
-                exitcode = -1
-                downspeed -= downspeed*0.5 #if fails for timeout, reducing download bandwidth of 50%
-
-            checkout = simpleOutputCheck(stdout)
-            checkerr = simpleOutputCheck(stderr)
-            checksumOK = False
-            if not url_input and hasattr(myfile, 'checksum'):
-                self.logger.debug("Checksum '%s'" %str(myfile['checksum']))
-                checksumOK = checksumChecker(localFilename, myfile['checksum'])
-            else:
-                checksumOK = True # No checksums provided
+            self.logger.info("Placing file '%s' in the retrieval queue" % fileid)
+            inputq.put((myfile, cmd))
+
+        self.logger.info("Please wait")
+
+        self.stopchildproc(inputq, processarray, nsubprocess)
 
-            if exitcode is not 0 or (len(checkout) + len(checkerr)) > 0:
-                ## check to track srmv1 issues, probably this is strong enough to find all of them
-                ## REMOVE this check as soon as sites will have switched to srmv2
-                if ('srmv1' in myfile['pfn'] or 'managerv1' in myfile['pfn']) and len( filter(lambda elem: elem.find('communication error on send')!=-1, checkerr) ) > 0:
-                    msgFail = '\n\tThe site storage is using srmv1, which is deprecated and not anymore supported.\n'
-                    msgFail += '\tPlease report this issue with the PFN provided here below.\n\tPFN: "%s".' % str(myfile['pfn'])
-                    finalresults[fileid] = {'exit': False, 'error': msgFail, 'dest': None}
-                else:
-                    if 'timeout' in stdout or 'timeout' in stderr or 'timed out' in stdout or 'timed out' in stderr:
-                        downspeed -= downspeed*0.5 #if fails for timeout, reducing download bandwidth of 50%
-                    finalresults[fileid] = {'exit': False, 'output': checkout, 'error' : checkerr, 'dest': None}
-                    self.logger.info(colors.RED + "Failed retrieving file %s" % fileid + colors.NORMAL)
-                    if len(finalresults[fileid]['output']) > 0:
-                        self.logger.info("Output:")
-                        [self.logger.info("\t %s" % x) for x in finalresults[fileid]['output']]
-                    if len(finalresults[fileid]['error']) > 0:
-                        self.logger.info("Error:")
-                        [self.logger.info("\t %s" % x) for x in finalresults[fileid]['error']]
-            elif not checksumOK:
-                msg = "Checksum failed for job " + str(fileid)
-                finalresults[fileid] = {'exit': False, 'error': msg, 'dest': None}
-                self.logger.info( msg )
-            else:
-                finalresults[fileid] = {'exit': True, 'dest': os.path.join(dirpath, str(fileid)), 'error': None}
-                self.logger.info(colors.GREEN + "Successfully retrived file %s" % fileid + colors.NORMAL)
-                tottime = endtime - starttime
-                if myfile['size']:
-                    downspeed = myfile['size']/tottime #calculating average of download bandwidth during last copy
-                self.logger.debug("Transfer took %.1f sec. and average speed of %.1f KB/s" % (tottime, downspeed/1024))
-
-        self.stopchildproc(input, proc)
-
-        for fileid in finalresults:
-            if finalresults[fileid]['exit']:
-                self.logger.info("File %s has been placed in %s" %(fileid, finalresults[fileid]['dest']))
-            else:
-                self.logger.debug(str(finalresults[fileid]))
-                self.logger.debug("File %s: transfer problem %s" %(fileid, str(finalresults[fileid]['error'])))
-                globalExitcode = 1
-        if len(finalresults.keys()) is 0:
-            self.logger.info("Nothing has been retrieved.")
+        # Derive the global exit code from the shared result dictionaries.
+        if len(successfiles) == 0:
+            self.logger.info("No file retrieved")
+            globalExitcode = -1
+        elif len(failedfiles) != 0:
+            self.logger.info(colors.GREEN + "Files successfully retrieved: %s" % len(successfiles) + colors.NORMAL)
+            self.logger.info(colors.RED + "Files failed to retrieve: %s" % len(failedfiles) + colors.NORMAL)
+            #self.logger.debug("List of failed files and reasons: %s" % failedfiles)
+            globalExitcode = -1
         else:
-            self.logger.info("Retrieval completed")
+            self.logger.info(colors.GREEN + "All files successfully retrieved" + colors.NORMAL)
+            globalExitcode = 0
 
-        if globalExitcode == -1:
-            globalExitcode = 0
         return CommandResult(globalExitcode, '')
 
-    def startchildproc(self, childprocess):
+    def startchildproc(self, childprocess, nsubprocess, successfiles, failedfiles):
         """
         starting sub process and creating the queue
         """
-        input = multiprocessing.Queue()
-        result = multiprocessing.Queue()
-        p = multiprocessing.Process(target = childprocess, args = (input, result))
-        p.start()
-        return input, result, p
+        inputq = multiprocessing.Queue()
+        subprocessarray = []
+        for i in xrange(nsubprocess):
+            p = multiprocessing.Process(target = childprocess, args = (inputq, successfiles, failedfiles))
+            subprocessarray.append(p)
+            p.start()
+        return inputq, subprocessarray
 
-    def stopchildproc(self, inqueue, childprocess):
+    def stopchildproc(self, inputq, processarray, nsubprocess):
         """
         simply sending a STOP message to the sub process
         """
+        self.logger.debug("stopchildproc() method has been called")
         try:
-            inqueue.put( ('-1', 'STOP', 'control') )
-        except Exception, ex:
-            pass
+            # Send one STOP sentinel for each worker process.
+            for i in xrange(nsubprocess):
+                inputq.put(('-1', 'STOP'))
         finally:
             # giving the time to the sub-process to exit
-            childprocess.terminate()
-            time.sleep(1)
+            for process in processarray:
+                process.join()
+
+    def processWorker(self, inputq, successfiles, failedfiles):
+        """
+        _processWorker_
+
+        Runs the copy command in a subprocess, consuming work from the
+        input queue until a STOP sentinel is received.
+        """
+        while True:
+            try:
+                myfile, work = inputq.get()
+            except (EOFError, IOError):
+                crashMessage = "Hit EOF/IO in getting new work\n"
+                crashMessage += "Assuming this is a graceful break attempt."
+                self.logger.info(crashMessage)
+                break
+
+            if work == 'STOP':
+                break
+
+            fileid = myfile['pfn'].split('/')[-1]
+            dirpath = os.path.join(self.options.destination, myfile['suffix'] if 'suffix' in myfile else '')
+            if not os.path.isdir(dirpath):
+                os.makedirs(dirpath)
+            url_input = bool(re.match("^[a-z]+://", dirpath))
+            localFilename = os.path.join(dirpath, str(fileid))
+            command = work
+
+            self.logger.info("Retrieving %s" % fileid)
+            self.logger.debug("Executing %s" % command)
+            pipe = subprocess.Popen(command, stdout = subprocess.PIPE,
+                                    stderr = subprocess.PIPE, shell = True)
+            stdout, stderr = pipe.communicate()
+            error = simpleOutputCheck(stderr)
+
+            self.logger.debug("Finished executing command for file %s" % fileid)
+
+            if pipe.returncode != 0 or len(error) > 0:
+                self.logger.info(colors.RED + "Failed retrieving %s" % fileid + colors.NORMAL)
+                for x in error:
+                    self.logger.debug(colors.RED + "\t %s" % x + colors.NORMAL)
+                failedfiles[fileid] = str(error)
+
+                if "timed out" in stderr or "timed out" in stdout:
+                    self.logger.info(colors.RED + "Failed due to connection timeout" + colors.NORMAL)
+                    self.logger.info("Please use the '-w' option to increase the connection timeout")
+
+                # Remove a partially downloaded file, then move on to the next one.
+                if os.path.isfile(localFilename) and os.path.getsize(localFilename) != myfile['size']:
+                    self.logger.debug("File %s has the wrong size, deleting it" % fileid)
+                    try:
+                        os.remove(localFilename)
+                    except Exception, ex:
+                        self.logger.debug("Cannot remove the file because of: %s" % ex)
+            else:
+                self.logger.info(colors.GREEN + "Success in retrieving %s" % fileid + colors.NORMAL)
+                if not url_input and hasattr(myfile, 'checksum'):
+                    self.logger.debug("Checksum '%s'" % str(myfile['checksum']))
+                    checksumOK = checksumChecker(localFilename, myfile['checksum'])
+                else:
+                    checksumOK = True # No checksums provided
+
+                if not checksumOK:
+                    failedfiles[fileid] = "Checksum failed"
+                else:
+                    successfiles[fileid] = 'Successfully retrieved'
+        return
 
 def simpleOutputCheck(outlines):
@@ -250,53 +302,6 @@ def simpleOutputCheck(outlines):
     return set(problems)
 
-import time, fcntl, select,signal
-from subprocess import Popen, PIPE, STDOUT
-
-from os import kill
-from signal import alarm, signal, SIGALRM, SIGKILL
-from subprocess import PIPE, Popen
-
-
-def processWorker(input, results):
-    """
-    _processWorker_
-
-    Runs a subprocessed command.
- """ - - # Get this started - t1 = None - jsout = None - - while True: - workid = None - try: - pfn, work = input.get() - t1 = time.time() - except (EOFError, IOError): - crashMessage = "Hit EOF/IO in getting new work\n" - crashMessage += "Assuming this is a graceful break attempt." - print crashMessage - break - - if work == 'STOP': - break - - command = work - pipe = subprocess.Popen(command, stdout = subprocess.PIPE, - stderr = subprocess.PIPE, shell = True) - stdout, stderr = pipe.communicate() - - results.put( { - 'pfn': pfn, - 'stdout': stdout, - 'stderr': stderr, - 'exit': pipe.returncode - }) - - return 0 - def checksumChecker(localFilename, checksums): """ Check given checksums vs. what's on disk