diff --git a/.gitignore b/.gitignore index 2e1618d..1e0f5da 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ parts bin var sdist +/scripts diff --git a/README.md b/README.md index 40c7b12..08d055f 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ INSTALL Version 0.1.1 of BaseSpacePy can be checked out here: - git clone git@github.com:basespace/basespace-python-sdk.git + git clone https://github.com/basespace/basespace-python-sdk.git To install 'BaseSpacePy' run the 'setup.py' script in the main directory (for a global install you will need to run this command with root privileges): diff --git a/src/BaseSpacePy/api/APIClient.py b/src/BaseSpacePy/api/APIClient.py index f112eee..6a76a42 100644 --- a/src/BaseSpacePy/api/APIClient.py +++ b/src/BaseSpacePy/api/APIClient.py @@ -45,13 +45,11 @@ def __forcePostCall__(self,resourcePath,postData,headers,data=None): :param qParams:Query oaramter string :param resourcePath: The url ''' -# print "Forcing post" -# print resourcePath postData = [(p,postData[p]) for p in postData] headerPrep = [k + ':' + headers[k] for k in headers.keys()] post = urllib.urlencode(postData) -# print headerPrep -# print post +# print "header prep " + str(headerPrep) +# print "post " + str(post) response = cStringIO.StringIO() c = pycurl.Curl() c.setopt(pycurl.URL,resourcePath + '?' 
+ post) @@ -97,8 +95,10 @@ def callAPI(self, resourcePath, method, queryParams, postData, if headerParams: for param, value in headerParams.iteritems(): headers[param] = value - - if not headers.has_key('Content-Type') and not method=='PUT': headers['Content-Type'] = 'application/json' + + # specify the content type + if not headers.has_key('Content-Type') and not method=='PUT' and not forcePost: headers['Content-Type'] = 'application/json' + headers['Authorization'] = 'Bearer ' + self.apiKey data = None diff --git a/src/BaseSpacePy/api/BaseSpaceAPI.py b/src/BaseSpacePy/api/BaseSpaceAPI.py index 40c8ad6..02b0367 100644 --- a/src/BaseSpacePy/api/BaseSpaceAPI.py +++ b/src/BaseSpacePy/api/BaseSpaceAPI.py @@ -192,8 +192,8 @@ def getAppSession(self,Id=''): if (not self.appSessionId) and (not Id): raise Exception("This BaseSpaceAPI instance has no appSessionId set and no alternative id was supplied for method getAppSession") -# if (not id) and (not self.key): -# raise Exception("This BaseSpaceAPI instance has no client_secret (key) set and no alternative id was supplied for method getAppSession") + if (not Id) and (not self.key): + raise Exception("This BaseSpaceAPI instance has no client_secret (key) set and no alternative id was supplied for method getAppSession") resourcePath = self.apiServer + '/appsessions/{AppSessionId}' if not Id: @@ -260,7 +260,7 @@ def getWebVerificationCode(self,scope,redirectURL,state=''): data = {'client_id':self.key,'redirect_uri':redirectURL,'scope':scope,'response_type':'code',"state":state} return self.weburl + webAuthorize + '?' + urllib.urlencode(data) - def obtainAccessToken(self,deviceCode): + def obtainAccessToken(self,deviceCode,grantType='device'): ''' Returns a user specific access token. 
@@ -268,12 +268,12 @@ def obtainAccessToken(self,deviceCode): ''' if (not self.key) or (not self.secret): raise Exception("This BaseSpaceAPI instance has either no client_secret or no client_id set and no alternative id was supplied for method getVerificationCode") - data = [('client_id',self.key),('client_secret', self.secret),('code',deviceCode),('grant_type','device'),('redirect_uri','google.com')] + data = [('client_id',self.key),('client_secret', self.secret),('code',deviceCode),('grant_type',grantType),('redirect_uri','google.com')] dict = self.__makeCurlRequest__(data,self.apiServer + tokenURL) return dict['access_token'] - def updatePrivileges(self,code): - token = self.obtainAccessToken(code) + def updatePrivileges(self,code,grantType='device'): + token = self.obtainAccessToken(code,grantType=grantType) self.setAccessToken(token) def getAccessToken(self): @@ -618,9 +618,9 @@ def createAppResult(self,Id,name,desc,samples=[],appSessionId=None): :param desc: A describtion of the AppResult :param samples: (Optional) The samples :param appSessionId: (Optional) If no appSessionId is given, the id used to initialize the BaseSpaceAPI instance - will be used. If appSessionId is set equal to an empty string, a new appsession will be created for the + will be used. 
If appSessionId is set equal to an empty string, a new appsession will be created for the appresult object ''' - if (not self.appSessionId) and (not appSessionId): + if (not self.appSessionId) and (appSessionId==None): raise Exception("This BaseSpaceAPI instance has no appSessionId set and no alternative id was supplied for method createAppResult") resourcePath = '/projects/{ProjectId}/appresults' @@ -671,6 +671,7 @@ def appResultFileUpload(self, Id, localPath, fileName, directory, contentType, m queryParams['directory'] = directory headerParams = {} headerParams['Content-Type'] = contentType + # three cases, two for multipart, starting if multipart==1: queryParams['multipart'] = 'true' @@ -678,12 +679,6 @@ def appResultFileUpload(self, Id, localPath, fileName, directory, contentType, m # Set force post as this need to use POST though no data is being streamed return self.__singleRequest__(FileResponse.FileResponse,resourcePath, method,\ queryParams, headerParams,postData=postData,verbose=0,forcePost=1) - elif multipart==2: - queryParams = {'uploadstatus':'complete'} - postData = None - # Set force post as this need to use POST though no data is being streamed - return self.__singleRequest__(FileResponse.FileResponse,resourcePath, method,\ - queryParams, headerParams,postData=postData,verbose=0,forcePost=1) else: postData = open(localPath).read() return self.__singleRequest__(FileResponse.FileResponse,resourcePath, method,\ @@ -740,10 +735,6 @@ def __uploadMultipartUnit__(self,Id,partNumber,md5,data): headerParams = {'Content-MD5':md5.strip()} out = self.apiClient.callAPI(resourcePath, method, queryParams, data, headerParams=headerParams,forcePost=0) return out -# curl -v -H "x-access-token: {access token}" \ -# -H "Content-MD5: 9mvo6qaA+FL1sbsIn1tnTg==" \ -# -T reportarchive.zipaa \ -# -X PUT https://api.cloud-endor.illumina.com/v1pre2/files/7094087/parts/1 # def largeFileDownload(self): # ''' @@ -752,31 +743,46 @@ def 
__uploadMultipartUnit__(self,Id,partNumber,md5,data): # raise Exception('Not yet implemented') -# def multipartFileUpload(self,Id, localPath, fileName, directory, contentType, tempdir='',cpuCount=2,partSize=25,verbose=0): -# ''' -# Method for multi-threaded file-upload for parallel transfer of very large files (currently only runs on unix systems) -# -# -# :param Id: The AppResult ID -# :param localPath: The local path of the file to be uploaded -# :param fileName: The desired filename on the server -# :param directory: The server directory to place the file in (empty string will place it in the root directory) -# :param contentType: The content type of the file -# :param tempdir: Temp directory to use, if blank the directory for 'localPath' will be used -# :param cpuCount: The number of CPUs to be used -# :param partSize: The size of individual upload parts (must be between 5 and 25mb) -# :param verbose: Write process output to stdout as upload progresses -# ''' -# -# # Create file object on server -# myFile = self.AppResultFileUpload(Id, localPath, fileName, directory, contentType,multipart=1) -# -# # prepare multi-par upload objects -# myMpu = mpu(self,Id,localPath,myFile,cpuCount,partSize,tempdir=tempdir,verbose=verbose) -# return myMpu -# -# def markFileState(self,Id): -# pass + def multipartFileUpload(self,Id, localPath, fileName, directory, contentType, tempdir='',cpuCount=2,partSize=25,startChunk=1,verbose=0): + ''' + Method for multi-threaded file-upload for parallel transfer of very large files (currently only runs on unix systems) + The call returns a tuple: the multipart upload object and the file object created on the server + + :param Id: The AppResult ID + :param localPath: The local path of the file to be uploaded + :param fileName: The desired filename on the server + :param directory: The server directory to place the file in (empty string will place it in the root directory) + :param contentType: The content type of the file + :param tempdir: Temp directory to use, if blank the directory for 'localPath' will be used + 
:param cpuCount: The number of CPUs to be used + :param partSize: The size of individual upload parts (must be between 5 and 25mb) + :param verbose: Write process output to stdout as upload progresses + ''' + + # Create file object on server + myFile = self.appResultFileUpload(Id, localPath, fileName, directory, contentType,multipart=1) + + # prepare multi-part upload objects + myMpu = mpu(self,Id,localPath,myFile,cpuCount,partSize,tempdir=tempdir,startChunk=startChunk,verbose=verbose) + return myMpu,myFile + + def markFileState(self,Id): + ''' + Marks a multi-part upload file as complete + :param Id: file id. + ''' + resourcePath = '/files/{Id}' + resourcePath = resourcePath.replace('{format}', 'json') + method = 'POST' + resourcePath = resourcePath.replace('{Id}', Id) + headerParams = {} + queryParams = {'uploadstatus':'complete'} + postData = None + + # Set force post as this need to use POST though no data is being streamed + return self.__singleRequest__(FileResponse.FileResponse,resourcePath, method,\ + queryParams, headerParams,postData=postData,verbose=0,forcePost=1) + def setAppSessionState(self,Id,Status,Summary): ''' @@ -800,4 +806,4 @@ def setAppSessionState(self,Id,Status,Summary): postData['status'] = Status.lower() postData['statussummary'] = Summary return self.__singleRequest__(AppSessionResponse.AppSessionResponse,resourcePath, method,\ - queryParams, headerParams,postData=postData,verbose=0) \ No newline at end of file + queryParams, headerParams,postData=postData,verbose=0) diff --git a/src/BaseSpacePy/model/File.py b/src/BaseSpacePy/model/File.py index faea948..3335612 100644 --- a/src/BaseSpacePy/model/File.py +++ b/src/BaseSpacePy/model/File.py @@ -39,7 +39,7 @@ def __init__(self): def __str__(self): s = self.Name try: - s += "- status: " + self.UploadStatus + s += " - status: " + self.UploadStatus except: e=1 return s @@ -142,6 +142,16 @@ def getVariantMeta(self, api): Id = self.HrefVariants.split('/')[-1] return 
api.getVariantMetadata(Id,'txt') + def markAsComplete(self,api): + ''' + Mark a file object created as part of a multipart upload as complete + :param api: An instance of BaseSpaceAPI + ''' + api.markFileState(self.Id) + self.UploadStatus ='Complete' + + + self.Name = None # str # If set, provides the relative Uri to fetch the mean coverage statistics for data stored in the file diff --git a/src/BaseSpacePy/model/MultipartUpload.py b/src/BaseSpacePy/model/MultipartUpload.py index 8aadb54..82d92b3 100644 --- a/src/BaseSpacePy/model/MultipartUpload.py +++ b/src/BaseSpacePy/model/MultipartUpload.py @@ -76,14 +76,14 @@ def run(self): if next_task is None or self.halt.is_set(): # check if we are out of jobs or have been halted # Poison pill means shutdown - print '%s: Exiting' % proc_name +# print '%s: Exiting' % proc_name self.task_queue.task_done() break elif self.pause.is_set(): # if we have been paused, sleep for a bit then check back - print '%s: Paused' % proc_name +# print '%s: Paused' % proc_name time.sleep(3) else: # do some work - print '%s: %s' % (proc_name, next_task) +# print '%s: %s' % (proc_name, next_task) answer = next_task() self.task_queue.task_done() if answer.state == 1: # case everything went well @@ -96,10 +96,10 @@ def run(self): return class MultipartUpload: - def __init__(self,api,aId,localFile,fileObject,cpuCount,partSize,tempdir,verbose): + def __init__(self,api,aId,localFile,fileObject,cpuCount,partSize,tempdir,startChunk=1,verbose=0): self.api = api self.analysisId = aId - self.localFile = localFile + self.localFile = localFile # File object self.remoteFile = fileObject self.partSize = partSize self.cpuCount = cpuCount @@ -107,6 +107,7 @@ def __init__(self,api,aId,localFile,fileObject,cpuCount,partSize,tempdir,verbose self.tempDir = tempdir # self.Status = 'Initialized' self.StartTime = -1 + self.startChunk = startChunk # self.repeatCount = 0 # number of chunks we uploaded multiple times self.setup() @@ -129,6 +130,7 @@ def run(self): def 
setup(self): # determine the +# print self.localFile totalSize = os.path.getsize(self.localFile) fileCount = int(math.ceil(totalSize/(self.partSize*1024.0*1000))) @@ -137,11 +139,12 @@ def setup(self): print "Using split size " + str(self.partSize) +"Mb" print "Filecount " + str(fileCount) print "CPUs " + str(self.cpuCount) + print "startChunk " + str(self.startChunk) # Establish communication queues self.tasks = multiprocessing.JoinableQueue() self.completedPool = multiprocessing.Queue() - for i in xrange(1,fileCount+1): # set up the task queue + for i in xrange(self.startChunk,fileCount+1): # set up the task queue t = uploadTask(self.api,self.remoteFile.Id,i, fileCount, self.localFile, 0) self.tasks.put(t) self.totalTask = self.tasks.qsize() @@ -180,18 +183,20 @@ def startUpload(self,returnOnFinish=0,testInterval=5): self.finalize() return 1 else: + self.finalize() return 1 - def finalize(self): if self.getRunningThreadCount(): raise Exception('Cannot finalize a transfer with running threads.') if self.Status=='Running': - # code here for + time.sleep(1) # sleep one to make sure + print self.remoteFile.Id + self.remoteFile = self.api.markFileState(self.remoteFile.Id) - self.Status=='Completed' + self.Status='Completed' else: raise Exception('To finalize the status of the transfer must be "Running."') - +# def hasFinished(self): if self.Status == 'Initialized': return 0 return not self.getRunningThreadCount()>0 @@ -214,7 +219,8 @@ def getRunningThreadCount(self): return sum([c.is_alive() for c in self.consumers]) def getTransRate(self): - # tasks completed size of file-parts + # tasks completed size of file-parts + if not self.getRunningTime()>0: return '0 mb/s' return str((self.totalTask - self.tasks.qsize())*self.partSize/self.getRunningTime())[:6] + ' mb/s' def getRunningTime(self): @@ -223,11 +229,13 @@ def getRunningTime(self): def getTotalTransfered(self): ''' - Returns the total data amoun transfered in Gb + Returns the total data amount transferred in Gb ''' return 
float((self.totalTask - self.tasks.qsize())*self.partSize)/1000.0 def getProgressRatio(self): - currentQ = float(self.tasks.qsize() - len(self.consumers)) - return str(float(self.totalTask - currentQ)/self.totalTask)[:6] \ No newline at end of file + currentQ = float(self.tasks.qsize()) + res = float(self.totalTask - currentQ)/self.totalTask + if res>1.0: res=1.0 + return str(res)[:5] \ No newline at end of file diff --git a/src/BaseSpacePy/model/Project.py b/src/BaseSpacePy/model/Project.py index 175d653..16cfa50 100644 --- a/src/BaseSpacePy/model/Project.py +++ b/src/BaseSpacePy/model/Project.py @@ -80,7 +80,7 @@ def createAppResult(self,api,name,desc,appSessionId=None,samples=[]): :param api: An instance of BaseSpaceAPI :param name: The name of the app result - :param desc: A describtion of the app result + :param desc: A description of the app result ''' self.isInit() return api.createAppResult(self.Id,name,desc,appSessionId=appSessionId,samples=samples)