Remove cromwell-metadata docker (#105)
* Remove cromwell-metadata docker

Use the default application credentials
of the VM, which are the same credentials
provided in workflow options, when requesting
metadata from Cromwell, instead of storing the
JSON key in a private Docker image.

* Use PAPIv2

* Remove cromwell-tools requirement

* Remove use_caas parameter

The parameter was previously used to
determine which credentials to use when
sending an API request to Cromwell to
retrieve analysis workflow metadata. Now
the submit WDL will only run in CaaS, so
the parameter is no longer needed.

* Update documentation

* Update pipeline-tools docker image version
samanehsan authored Feb 1, 2019
1 parent 03f681d commit a2ab84a
Showing 10 changed files with 41 additions and 98 deletions.
6 changes: 4 additions & 2 deletions README.md
@@ -4,9 +4,11 @@
[![Documentation Status](https://readthedocs.org/projects/pipeline-tools/badge/?version=latest)](http://pipeline-tools.readthedocs.io/en/latest/?badge=latest)


This repo contains Python code and pipelines for interacting with the Human Cell Atlas Data Coordination Platform. They are used by the Secondary Analysis Service.
This repository contains Python code and pipelines for interacting with the Human Cell Atlas Data Coordination Platform (DCP). They are used by the Secondary Analysis Service.

The pipelines wrap analysis pipelines from the Skylab repo and provide some glue to interface with the DCP. The adapter pipelines take bundle ids as inputs, query the Data Storage Service to find the input files needed by the analysis pipelines, then run the analysis pipelines and submit the results to the Ingest Service. This helps us keep the analysis pipelines themselves free of dependencies on the DCP.
The pipelines wrap analysis pipelines from the Skylab repository and provide some glue to interface with the DCP. The adapter pipelines take bundle ids as inputs, query the Data Storage Service to find the input files needed by the analysis pipelines, then run the analysis pipelines and submit the results to the Ingest Service. This helps us keep the analysis pipelines themselves free of dependencies on the DCP.

Note: The adapter pipelines can only run in Cromwell instances that use SAM for Identity and Access Management (IAM), such as Cromwell-as-a-Service.

## Run tests

4 changes: 1 addition & 3 deletions adapter_pipelines/Optimus/adapter.wdl
@@ -118,7 +118,6 @@ workflow AdapterOptimus {
Int? retry_timeout
Int? individual_request_timeout
String reference_bundle
Boolean use_caas
# Set runtime environment such as "dev" or "staging" or "prod" so submit task could choose proper docker image to use
String runtime_environment
@@ -127,7 +126,7 @@
Int max_cromwell_retries = 0
Boolean add_md5s = false
String pipeline_tools_version = "v0.45.0"
String pipeline_tools_version = "v0.46.0"
call GetInputs as prep {
input:
@@ -214,7 +213,6 @@ workflow AdapterOptimus {
retry_timeout = retry_timeout,
individual_request_timeout = individual_request_timeout,
runtime_environment = runtime_environment,
use_caas = use_caas,
record_http = record_http,
pipeline_tools_version = pipeline_tools_version,
add_md5s = add_md5s,
4 changes: 1 addition & 3 deletions adapter_pipelines/cellranger/adapter.wdl
@@ -142,7 +142,6 @@ workflow Adapter10xCount {
Int? retry_timeout
Int? individual_request_timeout
String reference_bundle
Boolean use_caas
# Set runtime environment such as "dev" or "staging" or "prod" so submit task could choose proper docker image to use
String runtime_environment
@@ -151,7 +150,7 @@
Int max_cromwell_retries = 0
Boolean add_md5s = false
String pipeline_tools_version = "v0.45.0"
String pipeline_tools_version = "v0.46.0"
call GetInputs {
input:
@@ -252,7 +251,6 @@ workflow Adapter10xCount {
retry_timeout = retry_timeout,
individual_request_timeout = individual_request_timeout,
runtime_environment = runtime_environment,
use_caas = use_caas,
record_http = record_http,
pipeline_tools_version = pipeline_tools_version,
add_md5s = add_md5s,
4 changes: 1 addition & 3 deletions adapter_pipelines/ss2_single_sample/adapter.wdl
@@ -74,7 +74,6 @@ workflow AdapterSmartSeq2SingleCell{
Int? retry_timeout
Int? individual_request_timeout
String reference_bundle
Boolean use_caas
# Set runtime environment such as "dev" or "staging" or "prod" so submit task could choose proper docker image to use
String runtime_environment
@@ -83,7 +82,7 @@
Int max_cromwell_retries = 0
Boolean add_md5s = false
String pipeline_tools_version = "v0.45.0"
String pipeline_tools_version = "v0.46.0"
call GetInputs as prep {
input:
@@ -208,7 +207,6 @@ workflow AdapterSmartSeq2SingleCell{
retry_timeout = retry_timeout,
individual_request_timeout = individual_request_timeout,
runtime_environment = runtime_environment,
use_caas = use_caas,
record_http = record_http,
pipeline_tools_version = pipeline_tools_version,
add_md5s = add_md5s,
10 changes: 4 additions & 6 deletions adapter_pipelines/submit.wdl
@@ -7,8 +7,8 @@ task get_metadata {
Float? retry_multiplier
Int? retry_timeout
Int? individual_request_timeout
Boolean use_caas
Boolean record_http
String pipeline_tools_version
Int max_retries = 0
command <<<
@@ -25,11 +25,10 @@
get-analysis-workflow-metadata \
--analysis_output_path ${analysis_output_path} \
--cromwell_url ${cromwell_url} \
--use_caas ${use_caas}
--cromwell_url ${cromwell_url}
>>>
runtime {
docker: (if runtime_environment == "prod" then "gcr.io/hca-dcp-pipelines-prod/cromwell-metadata:" else "gcr.io/broad-dsde-mint-${runtime_environment}/cromwell-metadata:") + "v1.2.0"
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:" + pipeline_tools_version
maxRetries: max_retries
}
output {
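The docker line change above replaces the environment-conditional `cromwell-metadata` image (which carried the JSON key) with the single public pipeline-tools image. Restated as a Python sketch for clarity, with image strings taken verbatim from the diff:

```python
def old_metadata_image(runtime_environment):
    """Pre-change: environment-specific cromwell-metadata image (held a JSON key)."""
    if runtime_environment == "prod":
        base = "gcr.io/hca-dcp-pipelines-prod/cromwell-metadata:"
    else:
        base = "gcr.io/broad-dsde-mint-{}/cromwell-metadata:".format(runtime_environment)
    return base + "v1.2.0"


def new_metadata_image(pipeline_tools_version):
    """Post-change: one public pipeline-tools image, no embedded credentials."""
    return "quay.io/humancellatlas/secondary-analysis-pipeline-tools:" + pipeline_tools_version


print(new_metadata_image("v0.46.0"))
# quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.46.0
```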
@@ -251,7 +250,6 @@ workflow submit {
Float? retry_multiplier
Int? retry_timeout
Int? individual_request_timeout
Boolean use_caas
# By default, don't record http requests
Boolean record_http = false
String pipeline_tools_version
@@ -267,12 +265,12 @@
analysis_output_path = outputs[0],
runtime_environment = runtime_environment,
cromwell_url = cromwell_url,
use_caas=use_caas,
record_http = record_http,
retry_timeout = retry_timeout,
individual_request_timeout = individual_request_timeout,
retry_multiplier = retry_multiplier,
retry_max_interval = retry_max_interval,
pipeline_tools_version = pipeline_tools_version,
max_retries = max_retries
}

1 change: 0 additions & 1 deletion adapter_pipelines/submit_stub/submit.wdl
@@ -23,7 +23,6 @@ workflow submit {
String analysis_file_version
String method
String runtime_environment
Boolean use_caas
Int? retry_max_interval
Float? retry_multiplier
Int? retry_timeout
58 changes: 21 additions & 37 deletions pipeline_tools/get_analysis_workflow_metadata.py
@@ -1,8 +1,7 @@
import argparse
import json
from cromwell_tools import cromwell_tools
from requests.auth import HTTPBasicAuth

import google.auth
import google.auth.transport.requests
from pipeline_tools.http_requests import HttpRequests


@@ -25,34 +24,33 @@ def get_analysis_workflow_id(analysis_output_path):
return workflow_id


def get_auth(credentials_file=None):
"""Parse cromwell username and password from credentials file.
Args:
credentials_file (str): Path to the file containing cromwell authentication credentials.
def get_auth_headers():
""" Get a bearer token from the default google account credentials on the machine that executes
this function. The credentials must have the scopes "https://www.googleapis.com/auth/userinfo.email"
and "https://www.googleapis.com/auth/userinfo.profile", which Cromwell will add automatically if
it is configured to use the Pipelines API v2 backend.
Returns:
requests.auth.HTTPBasicAuth: An object to be used for cromwell requests.
headers (dict): authorization header containing bearer token {'Authorization': 'bearer 12345'}
"""
credentials_file = credentials_file or '/cromwell-metadata/cromwell_credentials.txt'
with open(credentials_file) as f:
credentials = f.read().split()
user = credentials[0]
password = credentials[1]
return HTTPBasicAuth(user, password)
credentials, project = google.auth.default()
if not credentials.valid:
credentials.refresh(google.auth.transport.requests.Request())
headers = {}
credentials.apply(headers)
return headers


def get_metadata(cromwell_url, workflow_id, http_requests, use_caas=False, caas_key_file=None):
"""Get metadata for analysis workflow from Cromwell and write it to a JSON file. Retry the request with
exponentially increasing wait times if there is an error.
def get_metadata(cromwell_url, workflow_id, http_requests):
"""Get metadata for analysis workflow from Cromwell and write it to a JSON file. This is only
compatible with instances of Cromwell that use SAM for Identity Access Management (IAM), such
as Cromwell-as-a-Service.
Args:
cromwell_url (str): Url to the cromwell environment the workflow was run in.
workflow_id (str): The analysis workflow id.
http_requests: `http_requests.HttpRequests` instance, a wrapper around requests provides better retry and
logging.
use_caas (bool): Whether or not to use Cromwell-as-a-Service.
caas_key_file (str): Path to CaaS service account JSON key file.
Raises:
requests.HTTPError: For 4xx errors or 5xx errors beyond the timeout
@@ -61,17 +59,9 @@ def get_metadata(cromwell_url, workflow_id, http_requests, use_caas=False, caas_
def log_before(workflow_id):
print('Getting metadata for workflow {}'.format(workflow_id))

cromwell_url = cromwell_url

if use_caas:
json_credentials = caas_key_file or "/cromwell-metadata/caas_key.json"
headers = cromwell_tools.generate_auth_header_from_key_file(json_credentials)
auth = None
else:
headers = None
auth = get_auth()
headers = get_auth_headers()
url = '{0}/{1}/metadata?expandSubWorkflows=true'.format(cromwell_url, workflow_id)
response = http_requests.get(url, auth=auth, headers=headers, before=log_before(workflow_id))
response = http_requests.get(url, headers=headers, before=log_before(workflow_id))
with open('metadata.json', 'w') as f:
json.dump(response.json(), f, indent=2)
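The endpoint `get_metadata` hits is Cromwell's per-workflow metadata route with subworkflow expansion. A minimal sketch of the URL construction (the base URL below is a made-up example):

```python
def metadata_url(cromwell_url, workflow_id):
    """Build the Cromwell metadata endpoint URL, expanding subworkflows."""
    return "{0}/{1}/metadata?expandSubWorkflows=true".format(cromwell_url, workflow_id)


print(metadata_url("https://cromwell.example.org/api/workflows/v1", "1234-abcd"))
# https://cromwell.example.org/api/workflows/v1/1234-abcd/metadata?expandSubWorkflows=true
```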

@@ -80,21 +70,15 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument('--analysis_output_path', required=True)
parser.add_argument('--cromwell_url', required=True)
parser.add_argument('--use_caas', required=True)
parser.add_argument('--caas_key_file', required=False, default=None)
args = parser.parse_args()

use_caas = True if args.use_caas.lower() == 'true' else False

print('Using analysis output path: {0}'.format(args.analysis_output_path))

# Get the workflow id and metadata, write them to files
workflow_id = get_analysis_workflow_id(analysis_output_path=args.analysis_output_path)
get_metadata(cromwell_url=args.cromwell_url,
workflow_id=workflow_id,
http_requests=HttpRequests(),
use_caas=use_caas,
caas_key_file=args.caas_key_file)
http_requests=HttpRequests())


if __name__ == '__main__':
47 changes: 7 additions & 40 deletions pipeline_tools/tests/test_get_analysis_workflow_metadata.py
@@ -1,7 +1,6 @@
import os
import pytest
import requests
from requests.auth import HTTPBasicAuth
from unittest.mock import patch

from pipeline_tools import get_analysis_workflow_metadata
@@ -52,11 +51,7 @@ class Data:
return Data


def mocked_get_auth():
return HTTPBasicAuth('user', 'password')


def mocked_generate_auth_header_from_key_file(foo_credentials):
def mocked_get_auth_headers():
return {'Authorization': 'bearer 12345'}


@@ -72,32 +67,6 @@ def test_get_analysis_workflow_id(self, test_data, tmpdir):
assert result == expected
assert current_file_path.read() == 'analysis_subworkflow_id'

def test_get_auth(self):
credentials_file = '{0}test_credentials.txt'.format(data_dir)
auth = get_analysis_workflow_metadata.get_auth(credentials_file)
expected_auth = HTTPBasicAuth('fake-user', 'fake-password')
assert auth == expected_auth

def test_get_metadata_success(self, requests_mock, test_data, tmpdir):
current_file_path = tmpdir.join('metadata.json')

def _request_callback(request, context):
context.status_code = 200
return {
'workflowName': 'TestWorkflow'
}

requests_mock.get(test_data.cromwell_metadata_url, json=_request_callback)
with patch('pipeline_tools.get_analysis_workflow_metadata.get_auth', side_effect=mocked_get_auth), \
tmpdir.as_cwd(), \
HttpRequestsManager():
get_analysis_workflow_metadata.get_metadata(test_data.base_url,
test_data.workflow_id,
HttpRequests(),
use_caas=False)
assert requests_mock.call_count == 1
assert current_file_path.read() is not None

def test_get_metadata_using_caas(self, requests_mock, test_data, tmpdir):
current_file_path = tmpdir.join('metadata.json')

Expand All @@ -108,12 +77,11 @@ def _request_callback(request, context):
}

requests_mock.get(test_data.caas_metadata_url, json=_request_callback)
with patch('pipeline_tools.get_analysis_workflow_metadata.cromwell_tools.generate_auth_header_from_key_file',
side_effect=mocked_generate_auth_header_from_key_file), tmpdir.as_cwd(), HttpRequestsManager():
with patch('pipeline_tools.get_analysis_workflow_metadata.get_auth_headers',
side_effect=mocked_get_auth_headers), tmpdir.as_cwd(), HttpRequestsManager():
get_analysis_workflow_metadata.get_metadata(test_data.caas_base_url,
test_data.workflow_id,
HttpRequests(),
use_caas=True)
HttpRequests())
assert requests_mock.call_count == 1
assert current_file_path.read() is not None

@@ -123,10 +91,9 @@ def _request_callback(request, context):
return {'status': 'error', 'message': 'Internal Server Error'}

requests_mock.get(test_data.cromwell_metadata_url, json=_request_callback)
with patch('pipeline_tools.get_analysis_workflow_metadata.get_auth', side_effect=mocked_get_auth), \
pytest.raises(requests.HTTPError), HttpRequestsManager():
with patch('pipeline_tools.get_analysis_workflow_metadata.get_auth_headers',
side_effect=mocked_get_auth_headers), pytest.raises(requests.HTTPError), HttpRequestsManager():
get_analysis_workflow_metadata.get_metadata(test_data.base_url,
test_data.workflow_id,
HttpRequests(),
use_caas=False)
HttpRequests())
assert requests_mock.call_count == 3
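The failure test above expects `requests_mock.call_count == 3` because `HttpRequests` retries server errors before re-raising. A simplified stdlib sketch of that behavior (the real wrapper also applies exponential backoff via `retry_multiplier` and `retry_max_interval`; `max_attempts=3` is assumed here only to match the test):

```python
def get_with_retries(fetch, max_attempts=3):
    """Call fetch() until it succeeds or max_attempts server errors occur."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except RuntimeError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller


calls = []


def failing_fetch():
    calls.append(1)
    raise RuntimeError("500 Internal Server Error")


try:
    get_with_retries(failing_fetch)
except RuntimeError:
    pass

print(len(calls))  # 3
```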
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,6 +1,6 @@
requests==2.20.0
google-auth>=1.6.1,<2
google-cloud-storage==1.8.0
tenacity==4.10.0
PyJWT==1.6.4
git+git://github.com/broadinstitute/[email protected]
git+git://github.com/HumanCellAtlas/metadata-api@release/1.0b4#egg=hca-metadata-api[dss]
3 changes: 1 addition & 2 deletions setup.py
@@ -15,7 +15,7 @@
license='BSD 3-clause "New" or "Revised" License',
packages=['pipeline_tools'],
install_requires=[
'cromwell-tools',
'google-auth>=1.6.1,<2',
'google-cloud-storage>=1.8.0,<2',
'hca>=4.5.0,<5',
'hca-metadata-api',
@@ -39,7 +39,6 @@
},
# FIXME: DEPRECATION: Dependency Links processing has been deprecated and will be removed in a future release.
dependency_links=[
'git+git://github.com/broadinstitute/[email protected]#egg=cromwell-tools-1.0.1',
# FIXME: install hca-metadata-api from PyPI once it is available (shortly)
# Pin to a specific commit of the hca-metadata-api so we won't be broken by changes to that repo before it's
# available on PyPI
