Commit
Added automatic job retries: ManagedJobModel updated to add a retries field as an object attribute; jobHandler updated to attempt retries when an error condition is raised; imageGrabber updated to ignore previously fetched images on retry; version bump.
johncanthony committed Jan 31, 2025
1 parent f8a3a7e commit 12d95b9
Showing 5 changed files with 109 additions and 5 deletions.
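In outline, the commit makes the error path retry-aware: a failed job is requeued with an incremented retries count until a configurable budget is exhausted, after which it is marked as a terminal error. A minimal, self-contained sketch of that flow (names mirror the diff below; the simplified Job class stands in for ManagedJobModel, and this is illustrative rather than the shipped code):

# Illustrative sketch of the retry flow introduced by this commit.
# The simplified Job class stands in for ManagedJobModel.
MAX_JOB_RETRY_COUNT = 5

class Job:
    def __init__(self):
        self.retries = "0"       # the model stores the counter as a string
        self.job_status = "new"
        self.job_error = ""

def error_job(job, error_message):
    if int(job.retries) < MAX_JOB_RETRY_COUNT:
        # Retry budget not exhausted: bump the counter and requeue
        job.retries = str(int(job.retries) + 1)
        job.job_error = f'Retry Error: {error_message}'
        return "retried"
    # Budget exhausted: mark the job as a terminal error
    job.job_status = "error"
    job.job_error = error_message
    return "errored"

job = Job()
outcomes = [error_job(job, "simulated failure") for _ in range(7)]
print(job.retries, job.job_status)  # -> 5 error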
7 changes: 7 additions & 0 deletions ImageGrabber/imageGrabber/grabber.py
@@ -29,6 +29,13 @@ def download_image(image_name):
    url = request_base_url + image_name
    filename = IMAGE_DESTINATION + image_name

    # For retries, we should not pull every image again.
    # Check if the image exists on disk and return early if it does.
    # This check is potentially time-expensive; an alternative would be
    # a second field in the job object that tracks already-fetched images.
    if os.path.exists(filename):
        return

    try:
        response = requests.get(url, timeout=request_timeout)
        response.raise_for_status()
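The comment above sketches an alternative to the on-disk existence check: record fetched images on the job object itself. A hypothetical version of that design (the fetched_images field and the fetch callable are illustrative assumptions, not part of this commit):

# Hypothetical alternative to os.path.exists: track fetched images on
# the job so retries skip them without touching the filesystem.
# ManagedJobModel has no fetched_images field; it is assumed here.
def download_image_tracked(job, image_name, fetch):
    fetched = set(job.fetched_images.split(',')) if job.fetched_images else set()
    if image_name in fetched:
        return  # already pulled on an earlier attempt
    fetch(image_name)
    fetched.add(image_name)
    job.fetched_images = ','.join(sorted(fetched))

class FakeJob:
    fetched_images = ""

job = FakeJob()
download_image_tracked(job, "goes16_1200.png", fetch=lambda name: print("fetching", name))
download_image_tracked(job, "goes16_1200.png", fetch=lambda name: print("fetching", name))
# prints "fetching goes16_1200.png" once; the second call is a no-op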
33 changes: 32 additions & 1 deletion JobHandler/jobHandler.py
@@ -7,12 +7,13 @@
import os

BASE_JOB_HANDLER_URL = os.getenv('MANAGERAPI_BASE_URL', 'http://localhost:8000/')

# os.getenv returns a str when the variable is set in the environment,
# so cast to int for comparison against the integer retry counter
MAX_JOB_RETRY_COUNT = int(os.getenv('MAX_JOB_RETRY_COUNT', 5))


class JobHandler:

    stateManager = StateManager()
    baseConnectionURL = BASE_JOB_HANDLER_URL
    maxJobRetryCount = MAX_JOB_RETRY_COUNT

    def __init__(self, job_queue: str):
        self.job_queue = job_queue
@@ -50,6 +51,21 @@ def fetch_job(self):

        return ManagedJobModel(**job_query.json()[self.job_queue])

    def fetch_job_by_id(self, jobid):

        job_url = self.baseConnectionURL + "job/id/" + jobid

        try:
            job_query = requests.get(job_url)
            # requests.get does not raise HTTPError on its own; surface
            # non-2xx responses so the handler below is reachable
            job_query.raise_for_status()
        except requests.exceptions.ConnectionError:
            log.error(f'[Job Handler] Failed to fetch job {job_url}')
            return None
        except requests.exceptions.ReadTimeout:
            log.error(f'[Job Handler] Timeout fetching job {job_url}')
            return None
        except requests.exceptions.HTTPError:
            log.error(f'[Job Handler] Request failed with HTTP ERROR {job_query.status_code} for {job_url}')
            return None

        return ManagedJobModel(**job_query.json())

    def update_job(self, job):

        log.debug(f'Attempting to update job {job.job_id} to {self.baseConnectionURL + "job"}')
@@ -72,6 +88,10 @@ def update_job(self, job):

    def error_job(self, job, error_message: str):

        # While the retry budget is not exhausted, retry the job
        # instead of marking it as a terminal error
        if int(job.retries) < self.maxJobRetryCount:
            self.retry_job(job, error_message)
            return True

        job.job_error = error_message
        job.job_end_time = int(time())
        job.job_status = "error"
@@ -81,3 +101,14 @@
        log.debug(f'Error status set for job {job.job_id}')

        return True

    def retry_job(self, job, error_message: str):

        # retries is stored as a string on the model, so round-trip through int
        job.retries = str(int(job.retries) + 1)
        job.job_error = f'Retry Error: {error_message}'

        self.update_job(job)

        log.debug(f'Job: {job.job_id} set for retry: {job.retries}')

        return True
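For context, a worker might wire this up as below; process_job is an assumed stand-in for the real pipeline step, so treat this as a sketch rather than code from the repository:

# Hypothetical worker loop showing how error_job drives retries.
handler = JobHandler("new_jobs")  # queue name is an assumption

def process_job(job):
    # stand-in for the real pipeline step (download images, build gif, ...)
    raise RuntimeError("simulated downstream failure")

job = handler.fetch_job()
if job is not None:
    try:
        process_job(job)
    except Exception as exc:
        # Below the retry budget this requeues the job with an incremented
        # retries count; otherwise the job is marked with status "error".
        handler.error_job(job, str(exc))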
1 change: 1 addition & 0 deletions ManagerAPI/api/models/managedJob.py
@@ -22,6 +22,7 @@ class ManagedJobModel(BaseModel):
    publish_public: Union[str, None] = ''
    region: str
    storm_id: Union[str, None] = ''
    retries: Union[str, None] = '0'

    class Config:
        orm_mode = True
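Note that retries is typed as a string with a default of '0', matching the model's other string-typed fields; this is why the handler round-trips through int() when comparing and incrementing. For example (field values mirror the test fixtures below):

# retries is persisted as a string, so arithmetic goes through int()
job = ManagedJobModel(job_id="efb7f53b-e5a1-4c30-9f45-4b6c116a36ca", job_status="new",
                      job_start_time=1705313815, job_end_time=-1, job_error="",
                      img_date="2024-01-12", image_links="", gif_urn="", video_urn="",
                      img_resolution="1250x750", yt_video_id="", region="CONUS")
assert int(job.retries) == 0            # default is '0'
job.retries = str(int(job.retries) + 1)
assert job.retries == '1'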
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@

setup(
    name='RootsWeatherProject',
    version='1.0.36',
    version='1.0.37',
    long_description=open('README.md').read(),
    long_description_content_type='text/markdown',
    packages=find_packages(),
71 changes: 68 additions & 3 deletions tests/test_jobhandler.py
@@ -43,12 +43,14 @@ def test_fetch_job_fields(job_handler):
"img_resolution": "1250x750",
"yt_video_id": "",
"publish_public": "False",
"region": "CONUS"
"region": "CONUS",
"retries" : "0"
}
}

# Call the method under test
job_id = job_handler.fetch_job()
print(job_id)
assert type(job_id) is ManagedJobModel

# Assert the expected result
@@ -73,14 +75,77 @@ def test_update_job(job_handler):
"img_resolution": "1250x750",
"yt_video_id": "",
"publish_public": "False",
"region": "CONUS"
"region": "CONUS",
"retries" : "0"
}
}

mock_job = ManagedJobModel(job_id="efb7f53b-e5a1-4c30-9f45-4b6c116a36ca", job_status="new", job_start_time=1705313815,
job_end_time=-1, job_error="", img_date="2024-01-12", image_links="", gif_urn="", video_urn="",
img_resolution="1250x750", yt_video_id="", region="CONUS")
img_resolution="1250x750", yt_video_id="", region="CONUS",retries="0")
job_push = job_handler.update_job(mock_job)

assert job_push is True
mock_post.assert_called_once_with('http://localhost:8000/job', json=mock_job.model_dump())

# Test that retry_job is called when the ManagedJob object's retry count is below the max retry count (5)
def test_retry_on_error(job_handler):

    with mock.patch.object(JobHandler, "retry_job") as mocked_retry_method:
        mocked_retry_method.return_value = True

        mock_job_obj = {
            "job_id": "efb7f53b-e5a1-4c30-9f45-4b6c116a36ca",
            "job_status": "new",
            "job_start_time": 1705313815,
            "job_end_time": -1,
            "job_error": "",
            "img_date": "2024-01-12",
            "image_links": "",
            "gif_urn": "",
            "video_urn": "",
            "img_resolution": "1250x750",
            "yt_video_id": "",
            "publish_public": "False",
            "region": "CONUS",
            "retries": "0"
        }

        mock_managed_job = ManagedJobModel(**mock_job_obj)

        # Force an error, which routes through the retry path and
        # increments the retry count
        error_call = job_handler.error_job(mock_managed_job, "Testing Error")

        mocked_retry_method.assert_called_once()

# Test that retry_job is not called when the ManagedJob object's retry count is at or above the max retry count (5)
def test_retry_exhausted_on_error(job_handler):

    with mock.patch.object(JobHandler, "retry_job") as mocked_retry_method:
        mocked_retry_method.return_value = True

        mock_job_obj = {
            "job_id": "efb7f53b-e5a1-4c30-9f45-4b6c116a36ca",
            "job_status": "new",
            "job_start_time": 1705313815,
            "job_end_time": -1,
            "job_error": "",
            "img_date": "2024-01-12",
            "image_links": "",
            "gif_urn": "",
            "video_urn": "",
            "img_resolution": "1250x750",
            "yt_video_id": "",
            "publish_public": "False",
            "region": "CONUS",
            "retries": "5"
        }

        mock_managed_job = ManagedJobModel(**mock_job_obj)

        # Force an error; with retries exhausted, the job should be
        # marked as errored rather than retried
        error_call = job_handler.error_job(mock_managed_job, "Testing Error")

        mocked_retry_method.assert_not_called()
