Skip to content

Commit

Permalink
Merge pull request #217 from stanfordnmbl/make-request-with-retry
Browse files Browse the repository at this point in the history
Add retry mechanism to many database requests
  • Loading branch information
carmichaelong authored Dec 5, 2024
2 parents 6b7bdeb + 57e14f4 commit 43cd355
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 65 deletions.
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,4 @@ Examples/reprocessDataServer.py
*.ini
*.stats

tests/

newsletter.py
22 changes: 14 additions & 8 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from utils import (getDataDirectory, checkTime, checkResourceUsage,
sendStatusEmail, checkForTrialsWithStatus,
getCommitHash, getHostname, postLocalClientInfo,
postProcessedDuration)
postProcessedDuration, makeRequestWithRetry)

logging.basicConfig(level=logging.INFO)

Expand Down Expand Up @@ -120,8 +120,10 @@
error_msg['error_msg'] = 'No videos uploaded. Ensure phones are connected and you have stable internet connection.'
error_msg['error_msg_dev'] = 'No videos uploaded.'

r = requests.patch(trial_url, data={"status": "error", "meta": json.dumps(error_msg)},
headers = {"Authorization": "Token {}".format(API_TOKEN)})
r = makeRequestWithRetry('PATCH',
trial_url,
data={"status": "error", "meta": json.dumps(error_msg)},
headers = {"Authorization": "Token {}".format(API_TOKEN)})
continue

# The following is now done in main, to allow reprocessing trials with missing videos
Expand Down Expand Up @@ -149,14 +151,18 @@

# note a result needs to be posted for the API to know we finished, but we are posting them
# automatically thru procesTrial now
r = requests.patch(trial_url, data={"status": "done"},
headers = {"Authorization": "Token {}".format(API_TOKEN)})
r = makeRequestWithRetry('PATCH',
trial_url,
data={"status": "done"},
headers = {"Authorization": "Token {}".format(API_TOKEN)})

logging.info('0.5s pause if need to restart.')
time.sleep(0.5)

except Exception as e:
r = requests.patch(trial_url, data={"status": "error"},
headers = {"Authorization": "Token {}".format(API_TOKEN)})
r = makeRequestWithRetry('PATCH',
trial_url, data={"status": "error"},
headers = {"Authorization": "Token {}".format(API_TOKEN)})
traceback.print_exc()

# Antoine: Removing this, it is too often causing the machines to stop. Not because
Expand All @@ -182,4 +188,4 @@
folders = glob.glob(os.path.join(getDataDirectory(isDocker=True),'Data','*'))
for f in folders:
shutil.rmtree(f)
logging.info('deleting ' + f)
logging.info('deleting ' + f)
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pingouin==0.5.2
openpyxl
ffmpeg-python
psutil
boto3
boto3
pytest
78 changes: 78 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import logging
import pytest
import requests
from unittest.mock import patch, Mock, ANY
from http.client import HTTPMessage

from utils import makeRequestWithRetry

class TestMakeRequestWithRetry:
logging.getLogger('urllib3').setLevel(logging.DEBUG)

@patch("requests.Session.request")
def test_get(self, mock_response):
status_code = 200
mock_response.return_value.status_code = status_code

response = makeRequestWithRetry('GET', 'https://test.com', retries=2)
assert response.status_code == status_code
mock_response.assert_called_once_with('GET', 'https://test.com',
headers=None,
data=None,
params=None,
files=None)

@patch("requests.Session.request")
def test_put(self, mock_response):
status_code = 201
mock_response.return_value.status_code = status_code

data = {
"key1": "value1",
"key2": "value2"
}

params = {
"param1": "value1"
}

response = makeRequestWithRetry('POST',
'https://test.com',
data=data,
headers={"Authorization": "my_token"},
params=params,
retries=2)

assert response.status_code == status_code
mock_response.assert_called_once_with('POST',
'https://test.com',
data=data,
headers={"Authorization": "my_token"},
params=params,
files=None)

@patch("urllib3.connectionpool.HTTPConnectionPool._get_conn")
def test_success_after_retries(self, mock_response):
mock_response.return_value.getresponse.side_effect = [
Mock(status=500, msg=HTTPMessage()),
Mock(status=502, msg=HTTPMessage()),
Mock(status=200, msg=HTTPMessage()),
Mock(status=429, msg=HTTPMessage()),
]

response = makeRequestWithRetry('GET',
'https://test.com',
retries=5,
backoff_factor=0.1)

assert response.status_code == 200
assert mock_response.call_count == 3

# comment out test since httpbin can be unstable and we don't want to rely
# on it for tests. uncomment and see debug log to see retry attempts
'''def test_httpbin(self):
response = makeRequestWithRetry('GET',
'https://httpbin.org/status/500',
retries=4,
backoff_factor=0.1)
'''
137 changes: 108 additions & 29 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import numpy as np
import pandas as pd
from scipy import signal
from urllib3.util.retry import Retry

from utilsAuth import getToken
from utilsAPI import getAPIURL
Expand Down Expand Up @@ -102,13 +103,17 @@ def download_file(url, file_name):
shutil.copyfileobj(response, out_file)

def getTrialJson(trial_id):
trialJson = requests.get(API_URL + "trials/{}/".format(trial_id),
headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
response = makeRequestWithRetry('GET',
API_URL + "trials/{}/".format(trial_id),
headers = {"Authorization": "Token {}".format(API_TOKEN)})
trialJson = response.json()
return trialJson

def getSessionJson(session_id):
sessionJson = requests.get(API_URL + "sessions/{}/".format(session_id),
headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
response = makeRequestWithRetry('GET',
API_URL + "sessions/{}/".format(session_id),
headers = {"Authorization": "Token {}".format(API_TOKEN)})
sessionJson = response.json()

# sort trials by time recorded
def getCreatedAt(trial):
Expand All @@ -118,8 +123,10 @@ def getCreatedAt(trial):
return sessionJson

def getSubjectJson(subject_id):
subjectJson = requests.get(API_URL + "subjects/{}/".format(subject_id),
headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
response = makeRequestWithRetry('GET',
API_URL + "subjects/{}/".format(subject_id),
headers = {"Authorization": "Token {}".format(API_TOKEN)})
subjectJson = response.json()
return subjectJson

def getTrialName(trial_id):
Expand Down Expand Up @@ -182,8 +189,10 @@ def postCalibrationOptions(session_path,session_id,overwrite=False):
"meta":json.dumps({'calibration':calibOptionsJson})
}
trial_url = "{}{}{}/".format(API_URL, "trials/", calibration_id)
r= requests.patch(trial_url, data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})
r = makeRequestWithRetry('PATCH',
trial_url,
data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})

if r.status_code == 200:
print('Wrote calibration selections to metadata.')
Expand Down Expand Up @@ -456,8 +465,9 @@ def deleteResult(trial_id, tag=None,resultNum=None):
resultNums = [r['id'] for r in trial['results']]

for rNum in resultNums:
requests.delete(API_URL + "results/{}/".format(rNum),
headers = {"Authorization": "Token {}".format(API_TOKEN)})
makeRequestWithRetry('DELETE',
API_URL + "results/{}/".format(rNum),
headers = {"Authorization": "Token {}".format(API_TOKEN)})

def deleteAllResults(session_id):

Expand Down Expand Up @@ -685,8 +695,10 @@ def changeSessionMetadata(session_ids,newMetaDict):

data = {"meta":json.dumps(existingMeta)}

r= requests.patch(session_url, data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})
r = makeRequestWithRetry('PATCH',
session_url,
data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})

if r.status_code !=200:
print('Changing metadata failed.')
Expand Down Expand Up @@ -733,9 +745,11 @@ def makeSessionPublic(session_id,publicStatus=True):
data = {
"public":publicStatus
}

r= requests.patch(session_url, data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})

r = makeRequestWithRetry('PATCH',
session_url,
data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})

if r.status_code == 200:
print('Successfully made ' + session_id + ' public.')
Expand Down Expand Up @@ -859,11 +873,17 @@ def postFileToTrial(filePath,trial_id,tag,device_id):

# get S3 link
data = {'fileName':os.path.split(filePath)[1]}
r = requests.get(API_URL + "sessions/null/get_presigned_url/",data=data).json()
response = makeRequestWithRetry('GET',
API_URL + "sessions/null/get_presigned_url/",
data=data)
r = response.json()

# upload to S3
files = {'file': open(filePath, 'rb')}
requests.post(r['url'], data=r['fields'],files=files)
makeRequestWithRetry('POST',
r['url'],
data=r['fields'],
files=files)
files["file"].close()

# post link to and data to results
Expand All @@ -874,8 +894,10 @@ def postFileToTrial(filePath,trial_id,tag,device_id):
"media_url" : r['fields']['key']
}

rResult = requests.post(API_URL + "results/", data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})
rResult = makeRequestWithRetry('POST',
API_URL + "results/",
data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})

if rResult.status_code != 201:
print('server response was + ' + str(r.status_code))
Expand Down Expand Up @@ -1482,8 +1504,11 @@ def checkForTrialsWithStatus(status,hours=9999999,relativeTime='newer'):
'justNumber':1,
'relativeTime':relativeTime}

r = requests.get(API_URL+"trials/get_trials_with_status/",params=params,
headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
response = makeRequestWithRetry('GET',
API_URL+"trials/get_trials_with_status/",
params=params,
headers = {"Authorization": "Token {}".format(API_TOKEN)})
r = response.json()

return r['nTrials']

Expand Down Expand Up @@ -1563,8 +1588,10 @@ def checkCudaTF():
# %% Some functions for loading subject data

def getSubjectNumber(subjectName):
subjects = requests.get(API_URL + "subjects/",
headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
response = makeRequestWithRetry('GET',
API_URL + "subjects/",
headers = {"Authorization": "Token {}".format(API_TOKEN)})
subjects = response.json()
sNum = [s['id'] for s in subjects if s['name'] == subjectName]
if len(sNum)>1:
print(len(sNum) + ' subjects with the name ' + subjectName + '. Will use the first one.')
Expand All @@ -1574,8 +1601,10 @@ def getSubjectNumber(subjectName):
return sNum[0]

def getUserSessions():
sessionJson = requests.get(API_URL + "sessions/valid/",
headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()
response = makeRequestWithRetry('GET',
API_URL + "sessions/valid/",
headers = {"Authorization": "Token {}".format(API_TOKEN)})
sessionJson = response.json()
return sessionJson

def getSubjectSessions(subjectName):
Expand Down Expand Up @@ -1647,8 +1676,10 @@ def postLocalClientInfo(trial_url):
"git_commit": getCommitHash(),
"hostname": getHostname()
}
r = requests.patch(trial_url, data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})
r = makeRequestWithRetry('PATCH',
trial_url,
data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})

return r

Expand All @@ -1659,7 +1690,55 @@ def postProcessedDuration(trial_url, duration):
data = {
"processed_duration": duration
}
r = requests.patch(trial_url, data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})
r = makeRequestWithRetry('PATCH',
trial_url,
data=data,
headers = {"Authorization": "Token {}".format(API_TOKEN)})

return r

# utils for common HTTP requests
def makeRequestWithRetry(method, url,
headers=None, data=None, params=None, files=None,
retries=5, backoff_factor=1):
"""
Makes an HTTP request with retry logic and returns the Response object.
Args:
method (str): HTTP method (e.g., 'GET', 'POST', 'PUT', etc.) as used in
requests.Session().request()
url (str): The endpoint URL.
headers (dict): Headers to include in the request.
data (dict): Data to send in the request body.
params (dict): URL query parameters.
retries (int): Number of retry attempts.
backoff_factor (float): Backoff factor for exponential delays.
Returns:
requests.Response: The response object for further processing.
"""
try:
retry_strategy = Retry(
total=retries,
backoff_factor=backoff_factor,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods={'DELETE', 'GET', 'POST', 'PUT', 'PATCH'}
)

adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
with requests.Session() as session:
session.mount("https://", adapter)
response = session.request(method,
url,
headers=headers,
data=data,
params=params,
files=files)
response.raise_for_status()
return response

except requests.exceptions.HTTPError as e:
raise Exception(f"HTTP error occurred: {e}")

except Exception as e:
raise Exception(f"An error occurred: {e}")
Loading

0 comments on commit 43cd355

Please sign in to comment.