From 12d95b92c2f8afe64d0848d12331ecdb461dcffa Mon Sep 17 00:00:00 2001
From: johncanthony
Date: Fri, 31 Jan 2025 22:37:56 +0000
Subject: [PATCH] Add automatic job retries: add a retries attribute to
 ManagedJobModel, have jobHandler retry a job when an error is raised, have
 imageGrabber skip previously fetched images on retry, and bump the version

---
 ImageGrabber/imageGrabber/grabber.py |  7 +++
 JobHandler/jobHandler.py             | 33 ++++++++++++-
 ManagerAPI/api/models/managedJob.py  |  1 +
 setup.py                             |  2 +-
 tests/test_jobhandler.py             | 71 ++++++++++++++++++++++++++--
 5 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/ImageGrabber/imageGrabber/grabber.py b/ImageGrabber/imageGrabber/grabber.py
index a9c7cd7..94464db 100644
--- a/ImageGrabber/imageGrabber/grabber.py
+++ b/ImageGrabber/imageGrabber/grabber.py
@@ -29,6 +29,13 @@ def download_image(image_name):
     url = request_base_url + image_name
     filename = IMAGE_DESTINATION + image_name
 
+    # For retries we should not pull every image again: check whether the
+    # image already exists on disk and return early if it does.
+    # This check can be time-expensive; an alternative would be to track
+    # already-fetched images in a second field on the job object.
+    if os.path.exists(filename):
+        return
+
     try:
         response = requests.get(url, timeout=request_timeout)
         response.raise_for_status()
diff --git a/JobHandler/jobHandler.py b/JobHandler/jobHandler.py
index 77d3aea..58f434a 100644
--- a/JobHandler/jobHandler.py
+++ b/JobHandler/jobHandler.py
@@ -7,12 +7,13 @@ import os
 
 
 BASE_JOB_HANDLER_URL = os.getenv('MANAGERAPI_BASE_URL', 'http://localhost:8000/')
-
+MAX_JOB_RETRY_COUNT = int(os.getenv('MAX_JOB_RETRY_COUNT', 5))
 
 class JobHandler:
 
     stateManager = StateManager()
     baseConnectionURL = BASE_JOB_HANDLER_URL
+    maxJobRetryCount = MAX_JOB_RETRY_COUNT
 
     def __init__(self, job_queue: str):
         self.job_queue = job_queue
@@ -50,6 +51,21 @@ def fetch_job(self):
 
         return ManagedJobModel(**job_query.json()[self.job_queue])
 
+    def fetch_job_by_id(self, jobid):
+
+        try:
+            job_query = requests.get(self.baseConnectionURL + "job/id/" + jobid)
+        except requests.exceptions.ConnectionError:
+            log.error(f'[Job Handler] Failed to fetch job {self.baseConnectionURL + "job/id/" + jobid}')
+            return None
+        except requests.exceptions.ReadTimeout:
+            log.error(f'[Job Handler] Timeout fetching job {self.baseConnectionURL + "job/id/" + jobid}')
+            return None
+        except requests.exceptions.HTTPError:
+            log.error(f'[Job Handler] Request failed with HTTP ERROR {job_query.status_code} for {self.baseConnectionURL + "job/id/" + jobid}')
+
+        return ManagedJobModel(**job_query.json())
+
     def update_job(self, job):
 
         log.debug(f'Attempting to update job {job.job_id} to {self.baseConnectionURL + "job"}')
@@ -72,6 +88,10 @@ def update_job(self, job):
 
     def error_job(self, job, error_message: str):
 
+        if int(job.retries) < self.maxJobRetryCount:
+            self.retry_job(job, error_message)
+            return True
+
         job.job_error = error_message
         job.job_end_time = int(time())
         job.job_status = "error"
@@ -81,3 +101,14 @@ def error_job(self, job, error_message: str):
         log.debug(f'Error status set for job {job.job_id}')
 
         return True
+
+    def retry_job(self, job, error_message: str):
+
+        job.retries = str(int(job.retries) + 1)
+        job.job_error = f'Retry Error: {error_message}'
+
+        self.update_job(job)
+
+        log.debug(f'Job {job.job_id} set for retry: {job.retries}')
+
+        return True
diff --git a/ManagerAPI/api/models/managedJob.py b/ManagerAPI/api/models/managedJob.py
index 0299ea4..5db56f6 100644
--- a/ManagerAPI/api/models/managedJob.py
+++ b/ManagerAPI/api/models/managedJob.py
@@ -22,6 +22,7 @@ class ManagedJobModel(BaseModel):
     publish_public: Union[str, None] = ''
     region: str
     storm_id : Union[str,None] = ''
+    retries : Union[str,None] = '0'
 
     class Config:
         orm_mode = True
diff --git a/setup.py b/setup.py
index 27f4461..8e7b278 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 setup(
     name='RootsWeatherProject',
-    version='1.0.36',
+    version='1.0.37',
     long_description=open('README.md').read(),
     long_description_content_type='text/markdown',
     packages=find_packages(),
diff --git a/tests/test_jobhandler.py b/tests/test_jobhandler.py
index 56fc2c0..1cf014d 100644
--- a/tests/test_jobhandler.py
+++ b/tests/test_jobhandler.py
@@ -43,12 +43,14 @@ def test_fetch_job_fields(job_handler):
             "img_resolution": "1250x750",
             "yt_video_id": "",
             "publish_public": "False",
-            "region": "CONUS"
+            "region": "CONUS",
+            "retries": "0"
         }
     }
 
     # Call the method under test
     job_id = job_handler.fetch_job()
+    print(job_id)
     assert type(job_id) is ManagedJobModel
 
     # Assert the expected result
@@ -73,14 +75,77 @@ def test_update_job(job_handler):
             "img_resolution": "1250x750",
             "yt_video_id": "",
             "publish_public": "False",
-            "region": "CONUS"
+            "region": "CONUS",
+            "retries": "0"
         }
     }
 
     mock_job = ManagedJobModel(job_id="efb7f53b-e5a1-4c30-9f45-4b6c116a36ca", job_status="new",
                                job_start_time=1705313815, job_end_time=-1, job_error="", img_date="2024-01-12", image_links="", gif_urn="", video_urn="",
-                               img_resolution="1250x750", yt_video_id="", region="CONUS")
+                               img_resolution="1250x750", yt_video_id="", region="CONUS", retries="0")
 
     job_push = job_handler.update_job(mock_job)
     assert job_push is True
     mock_post.assert_called_once_with('http://localhost:8000/job', json=mock_job.model_dump())
+
+# Test that retry_job is called when the ManagedJob retry count is below the max retry count (default 5)
+def test_retry_on_error(job_handler):
+
+    with mock.patch.object(JobHandler, "retry_job") as mocked_retry_method:
+        mocked_retry_method.return_value = True
+
+        mock_job_obj = {
+            "job_id": "efb7f53b-e5a1-4c30-9f45-4b6c116a36ca",
+            "job_status": "new",
+            "job_start_time": 1705313815,
+            "job_end_time": -1,
+            "job_error": "",
+            "img_date": "2024-01-12",
+            "image_links": "",
+            "gif_urn": "",
+            "video_urn": "",
+            "img_resolution": "1250x750",
+            "yt_video_id": "",
+            "publish_public": "False",
+            "region": "CONUS",
+            "retries": "0"
+        }
+
+        mock_managed_job = ManagedJobModel(**mock_job_obj)
+
+
+        # Force an error, which should trigger a retry and update the retry count
+        error_call = job_handler.error_job(mock_managed_job, "Testing Error")
+
+        mocked_retry_method.assert_called_once()
+
+# Test that retry_job is not called when the ManagedJob retry count is at or above the max retry count (default 5)
+def test_retry_exhausted_on_error(job_handler):
+
+    with mock.patch.object(JobHandler, "retry_job") as mocked_retry_method:
+        mocked_retry_method.return_value = True
+
+        mock_job_obj = {
+            "job_id": "efb7f53b-e5a1-4c30-9f45-4b6c116a36ca",
+            "job_status": "new",
+            "job_start_time": 1705313815,
+            "job_end_time": -1,
+            "job_error": "",
+            "img_date": "2024-01-12",
+            "image_links": "",
+            "gif_urn": "",
+            "video_urn": "",
+            "img_resolution": "1250x750",
+            "yt_video_id": "",
+            "publish_public": "False",
+            "region": "CONUS",
+            "retries": "5"
+        }
+
+        mock_managed_job = ManagedJobModel(**mock_job_obj)
+
+
+        # Force an error; retries are exhausted so no retry should be attempted
+        error_call = job_handler.error_job(mock_managed_job, "Testing Error")
+
+        mocked_retry_method.assert_not_called()
\ No newline at end of file
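
Reviewer note, not part of the patch to apply: the snippet below is a minimal, self-contained sketch of the retry flow this change introduces. The RetryFlowSketch class and the plain-dict job are illustrative stand-ins for JobHandler and ManagedJobModel (the ManagerAPI HTTP calls are omitted); only the counting logic of error_job()/retry_job() and the MAX_JOB_RETRY_COUNT default of 5 are taken from the diff above.

# Standalone sketch: mirrors the retry flow of error_job()/retry_job(),
# with the ManagerAPI update calls stubbed out. Illustrative only.
import os


class RetryFlowSketch:
    # Same env-driven default as the patch; the int() cast matters because
    # os.getenv returns a string whenever the variable is set.
    max_job_retry_count = int(os.getenv('MAX_JOB_RETRY_COUNT', 5))

    def error_job(self, job: dict, error_message: str) -> bool:
        # Below the limit: bump the counter and requeue instead of failing.
        if int(job['retries']) < self.max_job_retry_count:
            return self.retry_job(job, error_message)
        # Limit reached: mark the job as permanently errored.
        job['job_status'] = 'error'
        job['job_error'] = error_message
        return True

    def retry_job(self, job: dict, error_message: str) -> bool:
        # retries is stored as a string on the model, hence str(int(...) + 1).
        job['retries'] = str(int(job['retries']) + 1)
        job['job_error'] = f'Retry Error: {error_message}'
        return True


if __name__ == '__main__':
    handler = RetryFlowSketch()
    job = {'retries': '0', 'job_status': 'new', 'job_error': ''}
    # Six consecutive failures: five retries, then the job ends up in "error".
    for _ in range(6):
        handler.error_job(job, 'simulated failure')
    print(job)  # {'retries': '5', 'job_status': 'error', 'job_error': 'simulated failure'}

Run directly, it prints a job that has been retried five times and then marked as errored, which matches the behavior asserted by test_retry_on_error and test_retry_exhausted_on_error.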