Skip to content

Commit

Permalink
Improve retry logging and restrict dependencies to a single major ver…
Browse files Browse the repository at this point in the history
…sion (807). (#57)
  • Loading branch information
David Shiga authored Jul 2, 2018
1 parent 841d0c0 commit fb38b1b
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 38 deletions.
9 changes: 5 additions & 4 deletions adapter_pipelines/submit.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ task stage_files {
export PYTHONUNBUFFERED=TRUE

# Get the urn needed for staging files
staging_urn=$(get-staging-urn --envelope_url ${submission_url})
get-upload-urn -envelope_url ${submission_url} -output upload_urn.txt
upload_urn=$(cat upload_urn.txt)

# Select staging area
echo "hca upload select $staging_urn"
hca upload select $staging_urn
# Select upload area
echo "hca upload select $upload_urn"
hca upload select $upload_urn

# Stage the files
files=( ${sep=' ' files} )
Expand Down
4 changes: 3 additions & 1 deletion pipeline_tools/confirm_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import argparse
from tenacity import retry_if_result, RetryError
from datetime import datetime
from pipeline_tools.http_requests import HttpRequests


Expand All @@ -22,7 +23,8 @@ def wait_for_valid_status(envelope_url, http_requests):
tenacity.RetryError: if status is invalid past timeout
"""
def log_before(envelope_url):
print('Getting status for {}'.format(envelope_url))
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('{0} Getting status for {1}'.format(now, envelope_url))

def status_is_invalid(response):
envelope_js = response.json()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,30 @@

import argparse
from tenacity import retry_if_result, RetryError
from datetime import datetime
from pipeline_tools.http_requests import HttpRequests


def run(envelope_url, http_requests):
"""Check the contents of the submission envelope for the staging urn. Retry until the envelope contains a
staging urn, or if there is an error with the request.
"""Check the contents of the submission envelope for the upload urn. Retry until the envelope contains a
upload urn, or if there is an error with the request.
Args:
http_requests (HttpRequests): an HttpRequests object.
envelope_url (str): the submission envelope url
Returns:
String giving the staging urn in the format dcp:upl:aws:integration:12345:abcde
String giving the upload urn in the format dcp:upl:aws:integration:12345:abcde
Raises:
requests.HTTPError: for 4xx errors or 5xx errors beyond timeout
tenacity.RetryError: if urn is missing beyond timeout
"""
def urn_is_none(response):
envelope_js = response.json()
urn = get_staging_urn(envelope_js)
urn = get_upload_urn(envelope_js)
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('{0} Upload urn: {1}'.format(now, urn))
return urn is None

response = (
Expand All @@ -32,18 +35,18 @@ def urn_is_none(response):
retry=retry_if_result(urn_is_none)
)
)
urn = get_staging_urn(response.json())
urn = get_upload_urn(response.json())
return urn


def get_staging_urn(envelope_js):
"""Get the staging urn from the submission envelope.
def get_upload_urn(envelope_js):
"""Get the upload urn from the submission envelope.
Args:
envelope_js (dict): the submission envelope contents
Returns:
String giving the staging urn in the format dcp:upl:aws:integration:12345:abcde,
String giving the upload urn in the format s3://<bucket>/<uuid>,
or None if the envelope doesn't contain a urn
"""
details = envelope_js.get('stagingDetails')
Expand All @@ -58,14 +61,16 @@ def get_staging_urn(envelope_js):

def main():
parser = argparse.ArgumentParser()
parser.add_argument('--envelope_url', required=True)
parser.add_argument('-envelope_url', required=True)
parser.add_argument('-output', default='upload_urn.txt')
args = parser.parse_args()
try:
urn = run(args.envelope_url, HttpRequests())
except RetryError:
message = 'Timed out while trying to get urn.'
raise ValueError(message)
print(urn)
with open('upload_urn.txt', 'w') as f:
f.write(urn)


if __name__ == '__main__':
Expand Down
3 changes: 3 additions & 0 deletions pipeline_tools/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import glob
from tenacity import retry, stop_after_attempt, stop_after_delay, wait_exponential, retry_if_exception
from datetime import datetime


HTTP_RECORD_DIR = 'HTTP_RECORD_DIR'
Expand Down Expand Up @@ -140,6 +141,8 @@ def post(self, *args, **kwargs):

def _http_request_with_retry(self, *args, **kwargs):
def is_retryable(error):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('{0} {1}'.format(now, repr(error)))
def is_retryable_status_code(error):
return isinstance(error, requests.HTTPError) and not (400 <= error.response.status_code <= 499)
return is_retryable_status_code(error) or isinstance(error,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
import requests
import requests_mock
import pipeline_tools.get_staging_urn as gsu
import pipeline_tools.get_upload_urn as getter
from .http_requests_manager import HttpRequestsManager
from pipeline_tools.http_requests import HttpRequests
from tenacity import RetryError
Expand All @@ -19,36 +19,36 @@ def setUp(self):
}
}

def test_get_staging_urn_empty_js(self):
def test_get_upload_urn_empty_js(self):
js = {}
self.assertIsNone(gsu.get_staging_urn(js))
self.assertIsNone(getter.get_upload_urn(js))

def test_get_staging_urn_null_details(self):
def test_get_upload_urn_null_details(self):
js = {
'stagingDetails': None
}
self.assertIsNone(gsu.get_staging_urn(js))
self.assertIsNone(getter.get_upload_urn(js))

def test_get_staging_urn_null_location(self):
def test_get_upload_urn_null_location(self):
js = {
'stagingDetails': {
'stagingAreaLocation': None
}
}
self.assertIsNone(gsu.get_staging_urn(js))
self.assertIsNone(getter.get_upload_urn(js))

def test_get_staging_urn_null_value(self):
def test_get_upload_urn_null_value(self):
js = {
'stagingDetails': {
'stagingAreaLocation': {
'value': None
}
}
}
self.assertIsNone(gsu.get_staging_urn(js))
self.assertIsNone(getter.get_upload_urn(js))

def test_get_staging_urn_valid_value(self):
self.assertEqual(gsu.get_staging_urn(self.envelope_json), 'test_urn')
def test_get_upload_urn_valid_value(self):
self.assertEqual(getter.get_upload_urn(self.envelope_json), 'test_urn')

@requests_mock.mock()
def test_run(self, mock_request):
Expand All @@ -57,7 +57,7 @@ def _request_callback(request, context):
return self.envelope_json
mock_request.get(self.envelope_url, json=_request_callback)
with HttpRequestsManager():
response = gsu.run(self.envelope_url, HttpRequests())
response = getter.run(self.envelope_url, HttpRequests())
self.assertEqual(mock_request.call_count, 1)

@requests_mock.mock()
Expand All @@ -67,7 +67,7 @@ def _request_callback(request, context):
return {}
mock_request.get(self.envelope_url, json=_request_callback)
with self.assertRaises(RetryError), HttpRequestsManager():
gsu.run(self.envelope_url, HttpRequests())
getter.run(self.envelope_url, HttpRequests())
self.assertEqual(mock_request.call_count, 3)

@requests_mock.mock()
Expand All @@ -77,7 +77,7 @@ def _request_callback(request, context):
return {}
mock_request.get(self.envelope_url, json=_request_callback)
with self.assertRaises(requests.HTTPError), HttpRequestsManager():
gsu.run(self.envelope_url, HttpRequests())
getter.run(self.envelope_url, HttpRequests())
self.assertEqual(mock_request.call_count, 3)

@requests_mock.mock()
Expand All @@ -87,7 +87,7 @@ def _request_callback(request, context):
raise requests.ReadTimeout
mock_request.get(self.envelope_url, json=_request_callback)
with self.assertRaises(requests.ReadTimeout), HttpRequestsManager():
gsu.run(self.envelope_url, HttpRequests())
getter.run(self.envelope_url, HttpRequests())
self.assertEqual(mock_request.call_count, 3)


Expand Down
16 changes: 8 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@
packages=['pipeline_tools'],
install_requires=[
'cromwell-tools',
'google-cloud-storage>=1.8.0',
'hca>=4.0.0',
'mock>=2.0.0',
'requests>=2.18.4',
'requests-mock>=1.4.0',
'setuptools_scm==2.0.0',
'tenacity>=4.10.0',
'google-cloud-storage>=1.8.0,<2',
'hca>=4.0.0,<5',
'mock>=2.0.0,<3',
'requests>=2.18.4,<3',
'requests-mock>=1.4.0,<2',
'setuptools_scm>=2.0.0,<3',
'tenacity>=4.10.0,<5',
],
entry_points={
"console_scripts": [
'get-analysis-metadata=pipeline_tools.get_analysis_metadata:main',
'create-analysis-json=pipeline_tools.create_analysis_json:main',
'create-envelope=pipeline_tools.create_envelope:main',
'get-staging-urn=pipeline_tools.get_staging_urn:main',
'get-upload-urn=pipeline_tools.get_upload_urn:main',
'confirm-submission=pipeline_tools.confirm_submission:main'
]
},
Expand Down

0 comments on commit fb38b1b

Please sign in to comment.