Skip to content

Commit

Permalink
Merge pull request #19 from ga4gh/elixir-wes-client-issue-11
Browse files Browse the repository at this point in the history
Add ELIXIR WES client [#11]
Additional notebook using DNAStack WES
Added the return value from FASPRunner.runQuery that was identified above as needed.
  • Loading branch information
ianfore authored Jan 22, 2021
2 parents 8b207a5 + 63f1030 commit c0bd053
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 99 deletions.
10 changes: 8 additions & 2 deletions fasp/runner/fasp_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ def runQuery(self, query, note):
query_job = self.searchClient.runQuery(query) # Send the query
creditor.creditClass(self.searchClient)
# repeat steps 2 and 3 for each row of the query
runIds = []
for row in query_job:

print("subject={}, drsID={}".format(row[0], row[1]))
# To do - get the subject/sample name from the query
runKey = 'subject'
print("{}={}, drsID={}".format(runKey, row[0], row[1]))


# Step 2 - Use DRS to get the URL
objInfo = self.drsClient.getObject(row[1])
Expand All @@ -72,6 +76,7 @@ def runQuery(self, query, note):
if self.live:
pipeline_id = self.workClient.runWorkflow(url, outfile)
print('workflow submitted, run:{}'.format(pipeline_id))
runIds.append({runKey:row[0], 'run_id':pipeline_id})
creditor.creditClass(self.workClient)
via = 'sh'
#pipeline_id = 'paste here'
Expand All @@ -80,4 +85,5 @@ def runQuery(self, query, note):
if self.live:
self.pipelineLogger.logRun(time, via, note, pipeline_id, outfile, str(fileSize),
self.searchClient, self.drsClient, self.workClient)


return runIds
99 changes: 31 additions & 68 deletions fasp/workflow/dnastack_wesclient.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
'''
Client for DNAStack WES Service
'''

import requests
import os
import json
import os
import tempfile

import pandas as pd
import sys
import requests

from fasp.workflow import WESClient


class DNAStackWESClient(WESClient):
"""
Client for DNAStack WES Service
"""

def __init__(self, client_credentials_path, debug=False):
super(DNAStackWESClient, self).__init__('https://workspaces-wes.prod.dnastack.com/ga4gh/wes/v1/runs')
self.tokenUrl = 'https://wallet.prod.dnastack.com/oauth/token'
full_credentials_path = os.path.expanduser(client_credentials_path)
with open(full_credentials_path) as f:
self.credentials = json.load(f)
self.__updateAccessToken__()
self.headers = {'Authorization': 'Bearer {}'.format(self.accessToken)}
self.__updateAccessToken()
self.headers['Authorization'] = 'Bearer {}'.format(self.accessToken)
self.debug = debug
self.modulePath = os.path.dirname(os.path.abspath(__file__))
self.wdlPath = self.modulePath + '/wes/gwas'

def __updateAccessToken__(self):
def __updateAccessToken(self):
if 'id' not in self.credentials or 'secret' not in self.credentials:
raise RuntimeError('Credentials file must have "id" and "secret" values')
payload = [
Expand All @@ -48,50 +48,23 @@ def __updateAccessToken__(self):

def runWorkflow(self, fileurl, outfile):
# use a temporary file to write out the input file
inputJson = {"md5Sum.inputFile":fileurl}
with tempfile.TemporaryFile() as fp:
fp.write(json.dumps(inputJson).encode('utf-8'))
fp.seek(0)
payload = {'workflow_url': 'checksum.wdl'}
files = {
'workflow_params': ('inputs.json', fp, 'application/json'),
'workflow_attachment': ('checksum.wdl', open(self.modulePath+'/wes/checksum.wdl', 'rb'), 'text/plain')
}
inputs = {"md5Sum.inputFile": fileurl}

return self.runGenericWorkflow(
workflow_url='checksum.wdl',
workflow_params=json.dumps(inputs),
workflow_attachment=('checksum.wdl', open(self.modulePath+'/wes/checksum.wdl', 'rb'), 'text/plain')
)


response = requests.request("POST", self.api_url_base, headers=self.headers, data = payload, files = files)
if self.debug:
print(response)
if response.status_code == 200:
return response.json()['run_id']
elif response.status_code == 401:
print("WES server authentication failed")
sys.exit(1)
else:
print("WES run submission failed. Response status:{}".format(response.status_code))
sys.exit(1)



def runGWASWorkflowTest(self):
''' run the GWAS workflow by submitting local files '''
payload = {'workflow_url': 'gwas.wdl'}
files = {
'workflow_params': ('inputs.gwas.json', open(self.wdlPath+'/inputs.gwas.json', 'rb'), 'application/json'),
'workflow_attachment': ('gwas.wdl', open(self.wdlPath+'/gwas.wdl', 'rb'), 'text/plain')
}

response = requests.request("POST", self.api_url_base, headers=self.headers, data = payload, files = files)
if response.status_code == 200:
return response.json()['run_id']
elif response.status_code == 401:
print("WES server authentication failed")
sys.exit(1)
else:
print("WES run submission failed. Response status:{}".format(response.status_code))
sys.exit(1)
return self.runGenericWorkflow(
workflow_url='gwas.wdl',
workflow_params=open(self.wdlPath+'/inputs.gwas.json', 'rb'),
workflow_attachment=('gwas.wdl', open(self.wdlPath+'/gwas.wdl', 'rb'), 'text/plain')
)

return response.json()['run_id']

def runGWASWorkflow(self, vcfFileurl, csvfileurl):
''' run the GWAS workflow by using files accessed by DRS'''
Expand All @@ -100,27 +73,18 @@ def runGWASWorkflow(self, vcfFileurl, csvfileurl):
with tempfile.TemporaryFile() as fp:
fp.write(json.dumps(inputJson).encode('utf-8'))
fp.seek(0)
payload = {'workflow_url': 'gwas.wdl'}
files = {
payload = {
'workflow_url': 'gwas.wdl',
'workflow_params': ('inputs.gwas.json', fp, 'application/json'),
'workflow_attachment': ('gwas.wdl', open(self.wdlPath+'/gwas.wdl', 'rb'), 'text/plain')
}


response = requests.request("POST", self.api_url_base, headers=self.headers, data = payload, files = files)
if self.debug:
print(response)
if response.status_code == 200:
return response.json()['run_id']
elif response.status_code == 401:
print("WES server authentication failed")
sys.exit(1)
else:
print("WES run submission failed. Response status:{}".format(response.status_code))
sys.exit(1)
return self.runGenericWorkflow(
workflow_url='gwas.wdl',
workflow_params=fp,
workflow_attachment=('gwas.wdl', open(self.wdlPath+'/gwas.wdl', 'rb'), 'text/plain')
)

return response.json()['run_id']


def addRun(self, run_id, runsdf):
runURL = "{}/{}".format(self.api_url_base, run_id)
Expand Down Expand Up @@ -167,8 +131,7 @@ def getRuns(self):
if __name__ == "__main__":
myClient = DNAStackWESClient('~/.keys/dnastack_wes_credentials.json')

res = myClient.runGWASWorkflowTest()
#res = myClient.runWorkflow('gs://dnastack-public-bucket/thousand_genomes_meta.csv', '')

# res = myClient.runGWASWorkflowTest()
res = myClient.runWorkflow('gs://dnastack-public-bucket/thousand_genomes_meta.csv', '')

print(res)
47 changes: 47 additions & 0 deletions fasp/workflow/elixir_wesclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import json
import os

from fasp.workflow import WESClient


class ElixirWESClient(WESClient):
    """
    Client for ELIXIR WES Service

    Reads a bearer token from a JSON credentials file and submits a CWL
    workflow through ``runGenericWorkflow``.
    """

    # Example workflow submitted by runWorkflow, and the input file used
    # when the caller does not supply one.
    WORKFLOW_URL = 'https://github.com/uniqueg/cwl-example-workflows/blob/master/hashsplitter-workflow.cwl'
    DEFAULT_INPUT_URL = 'http://62.217.82.57/test.txt'

    def __init__(self, client_credentials_path, debug=False):
        """
        Load credentials and prepare the Authorization header.

        :param client_credentials_path: path to a JSON credentials file
            (a leading ``~`` is expanded); it must define "access_token"
        :param debug: stored on the instance as ``self.debug``
        :raises RuntimeError: if "access_token" is missing from the file
        """
        super(ElixirWESClient, self).__init__('https://wes-eu.egci-endpoints.imsi.athenarc.gr/ga4gh/wes/v1/runs')
        full_credentials_path = os.path.expanduser(client_credentials_path)
        with open(full_credentials_path) as f:
            self.credentials = json.load(f)
        if 'access_token' not in self.credentials:
            raise RuntimeError('Must define "access_token" in credentials file')
        # self.headers is expected to have been created by the base class
        # initialiser; only the Authorization entry is set here.
        self.headers['Authorization'] = 'Bearer {}'.format(self.credentials['access_token'])
        self.debug = debug
        self.modulePath = os.path.dirname(os.path.abspath(__file__))
        self.wdlPath = self.modulePath + '/wes/gwas'

    def runWorkflow(self, fileurl=None, outfile=None):
        """
        Submit the example hashsplitter CWL workflow.

        The optional parameters mirror the ``runWorkflow(fileurl, outfile)``
        call shape used by FASPRunner and the DNAStack client, so this
        client can be invoked the same way. Called with no arguments it
        submits the built-in test input, exactly as before.

        :param fileurl: URL of the input file; DEFAULT_INPUT_URL when None
        :param outfile: accepted for call compatibility only; not used
        :return: the value returned by ``runGenericWorkflow``
        """
        input_url = self.DEFAULT_INPUT_URL if fileurl is None else fileurl
        params = {
            'input': {
                'class': 'File',
                'path': input_url
            }
        }

        return self.runGenericWorkflow(
            workflow_url=self.WORKFLOW_URL,
            workflow_params=json.dumps(params),
            workflow_type='CWL',
            workflow_type_version='v1.0'
        )


if __name__ == "__main__":
    # Manual smoke test: build a client from the local credentials file,
    # submit the example workflow and print whatever the submission returns.
    elixir_client = ElixirWESClient('~/.keys/elixir_wes_credentials.json')
    submission_result = elixir_client.runWorkflow()
    print(submission_result)
54 changes: 37 additions & 17 deletions fasp/workflow/wesclient.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import requests
import json
import sys
from typing import Dict, Optional, Union, IO

import requests

''' base class for a WES Client'''
class WESClient:
api_url_base: str
headers: Dict[str, str]


def __init__(self, api_url_base):
self.api_url_base = api_url_base
self.headers = {}


def getTaskStatus(self, run_id, verbose=False):
Expand All @@ -32,25 +36,41 @@ def GetRunLog(self, run_id, verbose=False):
if runResp.status_code == 400:
return 'task not found'
print(runResp)

def runGenericWorkflow(self, body, verbose=False):
if verbose: print("sending to {}".format( self.api_url_base))

response = requests.request('POST', self.api_url_base, headers=self.headers, files=body)

def runGenericWorkflow(
self,
workflow_url: str,
workflow_params: Union[str, IO[bytes], None] = None,
workflow_engine_params: Union[str, IO[bytes], None] = None,
workflow_type: Optional[str] = None,
workflow_type_version: Optional[str] = None,
tags: Union[str, IO[bytes], None] = None,
workflow_attachment=None,
verbose=False
):
if verbose:
print("sending to {}".format( self.api_url_base))

attachments = {
'workflow_url': (None, workflow_url,'text/plain'),
'workflow_params': (None, workflow_params, 'application/json'),
'workflow_engine_params': (None, workflow_engine_params, 'application/json'),
'workflow_type': (None, workflow_type, 'text/plain'),
'workflow_type_version': (None, workflow_type_version, 'text/plain'),
'tags': (None, tags, 'text/plain'),
'workflow_attachment': workflow_attachment,
}

response = requests.request('POST', self.api_url_base, headers=self.headers, files=attachments)
if verbose:
print(response.request.body)
print(response.text)
print(response)
if response.status_code == 200:
return response.json()['run_id']
elif response.status_code == 401:
print("WES server authentication failed")
sys.exit(1)
raise RuntimeError("WES server authentication failed")
else:
print("Full response status:\n{}".format(response))
print("WES run submission failed. Response status:{}".format(response.status_code))

sys.exit(1)




print("Full response content:\n{}".format(response.content))
raise RuntimeError("WES run submission failed. Response status:{}".format(response.status_code))
Loading

0 comments on commit c0bd053

Please sign in to comment.