diff --git a/README.md b/README.md
index c43e42e1..59af1c8b 100644
--- a/README.md
+++ b/README.md
@@ -4,9 +4,11 @@
 [![Documentation Status](https://readthedocs.org/projects/pipeline-tools/badge/?version=latest)](http://pipeline-tools.readthedocs.io/en/latest/?badge=latest)
 
-This repo contains Python code and pipelines for interacting with the Human Cell Atlas Data Coordination Platform. They are used by the Secondary Analysis Service.
+This repository contains Python code and pipelines for interacting with the Human Cell Atlas Data Coordination Platform (DCP). They are used by the Secondary Analysis Service.
 
-The pipelines wrap analysis pipelines from the Skylab repo and provide some glue to interface with the DCP. The adapter pipelines take bundle ids as inputs, query the Data Storage Service to find the input files needed by the analysis pipelines, then run the analysis pipelines and submit the results to the Ingest Service. This helps us keep the analysis pipelines themselves free of dependencies on the DCP.
+The pipelines wrap analysis pipelines from the Skylab repository and provide some glue to interface with the DCP. The adapter pipelines take bundle ids as inputs, query the Data Storage Service to find the input files needed by the analysis pipelines, then run the analysis pipelines and submit the results to the Ingest Service. This helps us keep the analysis pipelines themselves free of dependencies on the DCP.
+
+Note: The adapter pipelines can only run in Cromwell instances that use SAM for Identity and Access Management (IAM), such as Cromwell-as-a-Service.
 
 ## Run tests
 
diff --git a/adapter_pipelines/Optimus/adapter.wdl b/adapter_pipelines/Optimus/adapter.wdl
index f334d912..dceb6305 100644
--- a/adapter_pipelines/Optimus/adapter.wdl
+++ b/adapter_pipelines/Optimus/adapter.wdl
@@ -118,7 +118,6 @@ workflow AdapterOptimus {
     Int? retry_timeout
     Int? individual_request_timeout
     String reference_bundle
-    Boolean use_caas
     # Set runtime environment such as "dev" or "staging" or "prod" so submit task could choose proper docker image to use
     String runtime_environment
 
@@ -127,7 +126,7 @@ workflow AdapterOptimus {
     Int max_cromwell_retries = 0
     Boolean add_md5s = false
 
-    String pipeline_tools_version = "v0.45.0"
+    String pipeline_tools_version = "v0.46.0"
 
     call GetInputs as prep {
         input:
@@ -214,7 +213,6 @@ workflow AdapterOptimus {
            retry_timeout = retry_timeout,
            individual_request_timeout = individual_request_timeout,
            runtime_environment = runtime_environment,
-            use_caas = use_caas,
            record_http = record_http,
            pipeline_tools_version = pipeline_tools_version,
            add_md5s = add_md5s,
diff --git a/adapter_pipelines/cellranger/adapter.wdl b/adapter_pipelines/cellranger/adapter.wdl
index 380b966f..fbd244a0 100644
--- a/adapter_pipelines/cellranger/adapter.wdl
+++ b/adapter_pipelines/cellranger/adapter.wdl
@@ -142,7 +142,6 @@ workflow Adapter10xCount {
     Int? retry_timeout
     Int? individual_request_timeout
     String reference_bundle
-    Boolean use_caas
     # Set runtime environment such as "dev" or "staging" or "prod" so submit task could choose proper docker image to use
     String runtime_environment
 
@@ -151,7 +150,7 @@ workflow Adapter10xCount {
     Int max_cromwell_retries = 0
     Boolean add_md5s = false
 
-    String pipeline_tools_version = "v0.45.0"
+    String pipeline_tools_version = "v0.46.0"
 
     call GetInputs {
         input:
@@ -252,7 +251,6 @@ workflow Adapter10xCount {
            retry_timeout = retry_timeout,
            individual_request_timeout = individual_request_timeout,
            runtime_environment = runtime_environment,
-            use_caas = use_caas,
            record_http = record_http,
            pipeline_tools_version = pipeline_tools_version,
            add_md5s = add_md5s,
diff --git a/adapter_pipelines/ss2_single_sample/adapter.wdl b/adapter_pipelines/ss2_single_sample/adapter.wdl
index 67c0c959..27fb3e24 100644
--- a/adapter_pipelines/ss2_single_sample/adapter.wdl
+++ b/adapter_pipelines/ss2_single_sample/adapter.wdl
@@ -74,7 +74,6 @@ workflow AdapterSmartSeq2SingleCell{
     Int? retry_timeout
     Int? individual_request_timeout
     String reference_bundle
-    Boolean use_caas
     # Set runtime environment such as "dev" or "staging" or "prod" so submit task could choose proper docker image to use
     String runtime_environment
 
@@ -83,7 +82,7 @@ workflow AdapterSmartSeq2SingleCell{
     Int max_cromwell_retries = 0
     Boolean add_md5s = false
 
-    String pipeline_tools_version = "v0.45.0"
+    String pipeline_tools_version = "v0.46.0"
 
     call GetInputs as prep {
         input:
@@ -208,7 +207,6 @@ workflow AdapterSmartSeq2SingleCell{
            retry_timeout = retry_timeout,
            individual_request_timeout = individual_request_timeout,
            runtime_environment = runtime_environment,
-            use_caas = use_caas,
            record_http = record_http,
            pipeline_tools_version = pipeline_tools_version,
            add_md5s = add_md5s,
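All three adapters drop the `use_caas` input, so callers must also drop that key from their inputs JSON. A hypothetical fragment of an AdapterOptimus inputs file, written as a Python dict for illustration (the keys follow the declarations above; every value is a placeholder):

```python
# Hypothetical AdapterOptimus inputs fragment after this change. There is no
# "AdapterOptimus.use_caas" key anymore, and pipeline_tools_version now
# defaults to v0.46.0 unless explicitly overridden.
adapter_inputs = {
    "AdapterOptimus.reference_bundle": "placeholder-reference-bundle-id",
    "AdapterOptimus.runtime_environment": "dev",
    "AdapterOptimus.record_http": False,
    "AdapterOptimus.max_cromwell_retries": 0,
    "AdapterOptimus.add_md5s": False,
}
```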
diff --git a/adapter_pipelines/submit.wdl b/adapter_pipelines/submit.wdl
index 1244325a..1ab14097 100644
--- a/adapter_pipelines/submit.wdl
+++ b/adapter_pipelines/submit.wdl
@@ -7,8 +7,8 @@ task get_metadata {
     Float? retry_multiplier
     Int? retry_timeout
     Int? individual_request_timeout
-    Boolean use_caas
     Boolean record_http
+    String pipeline_tools_version
     Int max_retries = 0
 
     command <<<
@@ -25,11 +25,10 @@ task get_metadata {
 
         get-analysis-workflow-metadata \
            --analysis_output_path ${analysis_output_path} \
-            --cromwell_url ${cromwell_url} \
-            --use_caas ${use_caas}
+            --cromwell_url ${cromwell_url}
     >>>
     runtime {
-        docker: (if runtime_environment == "prod" then "gcr.io/hca-dcp-pipelines-prod/cromwell-metadata:" else "gcr.io/broad-dsde-mint-${runtime_environment}/cromwell-metadata:") + "v1.2.0"
+        docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:" + pipeline_tools_version
         maxRetries: max_retries
     }
     output {
@@ -251,7 +250,6 @@ workflow submit {
     Float? retry_multiplier
     Int? retry_timeout
     Int? individual_request_timeout
-    Boolean use_caas
     # By default, don't record http requests
     Boolean record_http = false
     String pipeline_tools_version
@@ -267,12 +265,12 @@ workflow submit {
            analysis_output_path = outputs[0],
            runtime_environment = runtime_environment,
            cromwell_url = cromwell_url,
-            use_caas=use_caas,
            record_http = record_http,
            retry_timeout = retry_timeout,
            individual_request_timeout = individual_request_timeout,
            retry_multiplier = retry_multiplier,
            retry_max_interval = retry_max_interval,
+            pipeline_tools_version = pipeline_tools_version,
            max_retries = max_retries
     }
 
diff --git a/adapter_pipelines/submit_stub/submit.wdl b/adapter_pipelines/submit_stub/submit.wdl
index 237f4470..b920f684 100644
--- a/adapter_pipelines/submit_stub/submit.wdl
+++ b/adapter_pipelines/submit_stub/submit.wdl
@@ -23,7 +23,6 @@ workflow submit {
     String analysis_file_version
     String method
     String runtime_environment
-    Boolean use_caas
     Int? retry_max_interval
     Float? retry_multiplier
     Int? retry_timeout
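With `use_caas` gone, the `get_metadata` task always invokes the console script with just the two remaining flags, inside the pipeline-tools image selected by `pipeline_tools_version`. A hypothetical local invocation of the same command (both argument values are made-up placeholders):

```python
# Sketch of the command the get_metadata task runs; the path and URL are made up.
import subprocess

subprocess.run(
    [
        "get-analysis-workflow-metadata",
        "--analysis_output_path", "gs://bucket/workflow-id/call-x/output.loom",  # placeholder
        "--cromwell_url", "https://cromwell.example.org/api/workflows/v1",  # placeholder
    ],
    check=True,  # raise CalledProcessError on a non-zero exit code
)
```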
diff --git a/pipeline_tools/get_analysis_workflow_metadata.py b/pipeline_tools/get_analysis_workflow_metadata.py
index 684c1f55..701c0612 100644
--- a/pipeline_tools/get_analysis_workflow_metadata.py
+++ b/pipeline_tools/get_analysis_workflow_metadata.py
@@ -1,8 +1,7 @@
 import argparse
 import json
 
-from cromwell_tools import cromwell_tools
-from requests.auth import HTTPBasicAuth
-
+import google.auth
+import google.auth.transport.requests
 from pipeline_tools.http_requests import HttpRequests
 
@@ -25,34 +24,33 @@ def get_analysis_workflow_id(analysis_output_path):
     return workflow_id
 
 
-def get_auth(credentials_file=None):
-    """Parse cromwell username and password from credentials file.
-
-    Args:
-        credentials_file (str): Path to the file containing cromwell authentication credentials.
+def get_auth_headers():
+    """Get a bearer token from the default google account credentials on the machine that executes
+    this function. The credentials must have the scopes "https://www.googleapis.com/auth/userinfo.email"
+    and "https://www.googleapis.com/auth/userinfo.profile", which Cromwell will add automatically if
+    it is configured to use the Pipelines API v2 backend.
 
     Returns:
-        requests.auth.HTTPBasicAuth: An object to be used for cromwell requests.
+        headers (dict): authorization header containing bearer token {'Authorization': 'bearer 12345'}
     """
-    credentials_file = credentials_file or '/cromwell-metadata/cromwell_credentials.txt'
-    with open(credentials_file) as f:
-        credentials = f.read().split()
-    user = credentials[0]
-    password = credentials[1]
-    return HTTPBasicAuth(user, password)
+    credentials, project = google.auth.default()
+    if not credentials.valid:
+        credentials.refresh(google.auth.transport.requests.Request())
+    headers = {}
+    credentials.apply(headers)
+    return headers
 
 
-def get_metadata(cromwell_url, workflow_id, http_requests, use_caas=False, caas_key_file=None):
-    """Get metadata for analysis workflow from Cromwell and write it to a JSON file. Retry the request with
-    exponentially increasing wait times if there is an error.
+def get_metadata(cromwell_url, workflow_id, http_requests):
+    """Get metadata for analysis workflow from Cromwell and write it to a JSON file. This is only
+    compatible with instances of Cromwell that use SAM for Identity Access Management (IAM), such
+    as Cromwell-as-a-Service.
 
     Args:
         cromwell_url (str): Url to the cromwell environment the workflow was run in.
         workflow_id (str): The analysis workflow id.
         http_requests: `http_requests.HttpRequests` instance, a wrapper around requests provides better retry and logging.
-        use_caas (bool): Whether or not to use Cromwell-as-a-Service.
-        caas_key_file (str): Path to CaaS service account JSON key file.
 
     Raises:
         requests.HTTPError: For 4xx errors or 5xx errors beyond the timeout
@@ -61,17 +59,9 @@ def get_metadata(cromwell_url, workflow_id, http_requests, use_caas=False, caas_
     def log_before(workflow_id):
         print('Getting metadata for workflow {}'.format(workflow_id))
 
-    cromwell_url = cromwell_url
-
-    if use_caas:
-        json_credentials = caas_key_file or "/cromwell-metadata/caas_key.json"
-        headers = cromwell_tools.generate_auth_header_from_key_file(json_credentials)
-        auth = None
-    else:
-        headers = None
-        auth = get_auth()
+    headers = get_auth_headers()
     url = '{0}/{1}/metadata?expandSubWorkflows=true'.format(cromwell_url, workflow_id)
-    response = http_requests.get(url, auth=auth, headers=headers, before=log_before(workflow_id))
+    response = http_requests.get(url, headers=headers, before=log_before(workflow_id))
 
     with open('metadata.json', 'w') as f:
         json.dump(response.json(), f, indent=2)
@@ -80,21 +70,15 @@ def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('--analysis_output_path', required=True)
     parser.add_argument('--cromwell_url', required=True)
-    parser.add_argument('--use_caas', required=True)
-    parser.add_argument('--caas_key_file', required=False, default=None)
     args = parser.parse_args()
 
-    use_caas = True if args.use_caas.lower() == 'true' else False
-
     print('Using analysis output path: {0}'.format(args.analysis_output_path))
 
     # Get the workflow id and metadata, write them to files
     workflow_id = get_analysis_workflow_id(analysis_output_path=args.analysis_output_path)
     get_metadata(cromwell_url=args.cromwell_url,
                  workflow_id=workflow_id,
-                 http_requests=HttpRequests(),
-                 use_caas=use_caas,
-                 caas_key_file=args.caas_key_file)
+                 http_requests=HttpRequests())
 
 
 if __name__ == '__main__':
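The replacement auth flow leans entirely on google-auth application default credentials. A minimal standalone sketch of the same mechanism, assuming the machine has default credentials carrying the userinfo scopes; it uses plain `requests` instead of the repo's `HttpRequests` wrapper, so there is no retry behavior, and the Cromwell URL and workflow id are placeholders:

```python
import google.auth
import google.auth.transport.requests
import requests

# Load and refresh application default credentials to obtain a bearer token.
credentials, project = google.auth.default()
if not credentials.valid:
    credentials.refresh(google.auth.transport.requests.Request())

# apply() inserts the token as an Authorization header on the given dict.
headers = {}
credentials.apply(headers)

cromwell_url = "https://cromwell.example.org/api/workflows/v1"  # placeholder
workflow_id = "00000000-0000-0000-0000-000000000000"  # placeholder

url = "{0}/{1}/metadata?expandSubWorkflows=true".format(cromwell_url, workflow_id)
response = requests.get(url, headers=headers)
response.raise_for_status()
print(response.json().get("workflowName"))
```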
diff --git a/pipeline_tools/tests/test_get_analysis_workflow_metadata.py b/pipeline_tools/tests/test_get_analysis_workflow_metadata.py
index e97e063f..0db8fe1f 100644
--- a/pipeline_tools/tests/test_get_analysis_workflow_metadata.py
+++ b/pipeline_tools/tests/test_get_analysis_workflow_metadata.py
@@ -1,7 +1,6 @@
 import os
 import pytest
 import requests
-from requests.auth import HTTPBasicAuth
 from unittest.mock import patch
 
 from pipeline_tools import get_analysis_workflow_metadata
@@ -52,11 +51,7 @@ class Data:
     return Data
 
 
-def mocked_get_auth():
-    return HTTPBasicAuth('user', 'password')
-
-
-def mocked_generate_auth_header_from_key_file(foo_credentials):
+def mocked_get_auth_headers():
     return {'Authorization': 'bearer 12345'}
 
 
@@ -72,32 +67,6 @@ def test_get_analysis_workflow_id(self, test_data, tmpdir):
         assert result == expected
         assert current_file_path.read() == 'analysis_subworkflow_id'
 
-    def test_get_auth(self):
-        credentials_file = '{0}test_credentials.txt'.format(data_dir)
-        auth = get_analysis_workflow_metadata.get_auth(credentials_file)
-        expected_auth = HTTPBasicAuth('fake-user', 'fake-password')
-        assert auth == expected_auth
-
-    def test_get_metadata_success(self, requests_mock, test_data, tmpdir):
-        current_file_path = tmpdir.join('metadata.json')
-
-        def _request_callback(request, context):
-            context.status_code = 200
-            return {
-                'workflowName': 'TestWorkflow'
-            }
-
-        requests_mock.get(test_data.cromwell_metadata_url, json=_request_callback)
-        with patch('pipeline_tools.get_analysis_workflow_metadata.get_auth',
-                   side_effect=mocked_get_auth), \
-                tmpdir.as_cwd(), \
-                HttpRequestsManager():
-            get_analysis_workflow_metadata.get_metadata(test_data.base_url,
-                                                        test_data.workflow_id,
-                                                        HttpRequests(),
-                                                        use_caas=False)
-        assert requests_mock.call_count == 1
-        assert current_file_path.read() is not None
-
     def test_get_metadata_using_caas(self, requests_mock, test_data, tmpdir):
         current_file_path = tmpdir.join('metadata.json')
 
@@ -108,12 +77,11 @@ def _request_callback(request, context):
             }
 
         requests_mock.get(test_data.caas_metadata_url, json=_request_callback)
-        with patch('pipeline_tools.get_analysis_workflow_metadata.cromwell_tools.generate_auth_header_from_key_file',
-                   side_effect=mocked_generate_auth_header_from_key_file), tmpdir.as_cwd(), HttpRequestsManager():
+        with patch('pipeline_tools.get_analysis_workflow_metadata.get_auth_headers',
+                   side_effect=mocked_get_auth_headers), tmpdir.as_cwd(), HttpRequestsManager():
             get_analysis_workflow_metadata.get_metadata(test_data.caas_base_url,
                                                         test_data.workflow_id,
-                                                        HttpRequests(),
-                                                        use_caas=True)
+                                                        HttpRequests())
         assert requests_mock.call_count == 1
         assert current_file_path.read() is not None
 
@@ -123,10 +91,9 @@ def _request_callback(request, context):
             return {'status': 'error', 'message': 'Internal Server Error'}
 
         requests_mock.get(test_data.cromwell_metadata_url, json=_request_callback)
-        with patch('pipeline_tools.get_analysis_workflow_metadata.get_auth', side_effect=mocked_get_auth), \
-                pytest.raises(requests.HTTPError), HttpRequestsManager():
+        with patch('pipeline_tools.get_analysis_workflow_metadata.get_auth_headers',
+                   side_effect=mocked_get_auth_headers), pytest.raises(requests.HTTPError), HttpRequestsManager():
             get_analysis_workflow_metadata.get_metadata(test_data.base_url,
                                                         test_data.workflow_id,
-                                                        HttpRequests(),
-                                                        use_caas=False)
+                                                        HttpRequests())
         assert requests_mock.call_count == 3
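The updated tests patch `get_auth_headers` at the module level, which keeps them independent of google-auth. To exercise `get_auth_headers` itself, one could instead stub `google.auth.default`; a sketch (the test name and all fake values are invented):

```python
from unittest.mock import MagicMock, patch

from pipeline_tools import get_analysis_workflow_metadata


def test_get_auth_headers_applies_bearer_token():
    fake_credentials = MagicMock()
    fake_credentials.valid = True  # skip the refresh branch
    fake_credentials.apply.side_effect = (
        lambda headers: headers.update({"Authorization": "bearer 12345"})
    )
    with patch("google.auth.default", return_value=(fake_credentials, "fake-project")):
        headers = get_analysis_workflow_metadata.get_auth_headers()
    assert headers == {"Authorization": "bearer 12345"}
```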
diff --git a/requirements.txt b/requirements.txt
index 8667c5df..0761dd2e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
 requests==2.20.0
+google-auth>=1.6.1,<2
 google-cloud-storage==1.8.0
 tenacity==4.10.0
 PyJWT==1.6.4
-git+git://github.com/broadinstitute/cromwell-tools.git@v0.5.0
 git+git://github.com/HumanCellAtlas/metadata-api@release/1.0b4#egg=hca-metadata-api[dss]
diff --git a/setup.py b/setup.py
index 70b1504c..e1c6235a 100644
--- a/setup.py
+++ b/setup.py
@@ -15,7 +15,7 @@
     license='BSD 3-clause "New" or "Revised" License',
     packages=['pipeline_tools'],
     install_requires=[
-        'cromwell-tools',
+        'google-auth>=1.6.1,<2',
         'google-cloud-storage>=1.8.0,<2',
         'hca>=4.5.0,<5',
         'hca-metadata-api',
@@ -39,7 +39,6 @@
     },
     # FIXME: DEPRECATION: Dependency Links processing has been deprecated and will be removed in a future release.
     dependency_links=[
-        'git+git://github.com/broadinstitute/cromwell-tools.git@v0.5.0#egg=cromwell-tools-1.0.1',
         # FIXME: install hca-metadata-api from PyPI once it is available (shortly)
         # Pin to a specific commit of the hca-metadata-api so we won't be broken by changes to that repo before it's
         # available on PyPI