From 1afc4111143e1f7d41a9b635faeff224c1654341 Mon Sep 17 00:00:00 2001
From: Tessa Walsh
Date: Thu, 9 Nov 2023 21:09:37 -0500
Subject: [PATCH] Implement retry API endpoint for failed background jobs
 (#1356)

Fixes #1328

- Adds a /retry endpoint for retrying failed background jobs.
- Returns a 400 error if the previous job is still running or has already succeeded.
- Keeps track of previous failed attempts in a previousAttempts array on the failed job.
- Also amends the similar webhook /retry endpoint to use `POST` for consistency.
- Removes the duplicate API tag for backgroundjobs.
---
 backend/btrixcloud/background_jobs.py | 256 ++++++++++++++++++++------
 backend/btrixcloud/crawlmanager.py    |  12 +-
 backend/btrixcloud/models.py          |   2 +
 backend/btrixcloud/webhooks.py        |   2 +-
 backend/test/test_webhooks.py         |   2 +-
 5 files changed, 210 insertions(+), 64 deletions(-)

diff --git a/backend/btrixcloud/background_jobs.py b/backend/btrixcloud/background_jobs.py
index 7490229a98..963e5da1ec 100644
--- a/backend/btrixcloud/background_jobs.py
+++ b/backend/btrixcloud/background_jobs.py
@@ -20,6 +20,7 @@
     DeleteReplicaJob,
     PaginatedResponse,
     AnyJob,
+    StorageRef,
 )

 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
@@ -90,9 +91,8 @@ async def handle_replica_job_finished(self, job: CreateReplicaJob) -> None:

     async def create_replica_jobs(
         self, oid: UUID, file: BaseFile, object_id: str, object_type: str
-    ) -> Dict:
-        """Create k8s background job to replicate a file to another storage location."""
-
+    ) -> Dict[str, Union[bool, List[str]]]:
+        """Create k8s background job to replicate a file to all replica storage locations."""
         org = await self.org_ops.get_org_by_id(oid)

         primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
@@ -105,23 +105,42 @@ async def create_replica_jobs(
         ids = []

         for replica_ref in self.storage_ops.get_org_replicas_storage_refs(org):
-            replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
-            replica_endpoint, bucket_suffix = self.strip_bucket(
-                replica_storage.endpoint_url
+            job_id = await self.create_replica_job(
+                org,
+                file,
+                object_id,
+                object_type,
+                replica_ref,
+                primary_file_path,
+                primary_endpoint,
             )
-            replica_file_path = bucket_suffix + file.filename
+            ids.append(job_id)
+
+        return {"added": True, "ids": ids}

-            # print(f"primary: {file.storage.get_storage_secret_name(str(oid))}")
-            # print(f"  endpoint: {primary_endpoint}")
-            # print(f"  path: {primary_file_path}")
-            # print(f"replica: {replica_ref.get_storage_secret_name(str(oid))}")
-            # print(f"  endpoint: {replica_endpoint}")
-            # print(f"  path: {replica_file_path}")
+    async def create_replica_job(
+        self,
+        org: Organization,
+        file: BaseFile,
+        object_id: str,
+        object_type: str,
+        replica_ref: StorageRef,
+        primary_file_path: str,
+        primary_endpoint: str,
+        existing_job_id: Optional[str] = None,
+    ) -> str:
+        """Create k8s background job to replicate a file to a specific replica storage location."""
+        replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
+        replica_endpoint, bucket_suffix = self.strip_bucket(
+            replica_storage.endpoint_url
+        )
+        replica_file_path = bucket_suffix + file.filename

-            job_type = BgJobType.CREATE_REPLICA.value
+        job_type = BgJobType.CREATE_REPLICA.value

+        try:
             job_id = await self.crawl_manager.run_replica_job(
-                oid=str(oid),
+                oid=str(org.id),
                 job_type=job_type,
                 primary_storage=file.storage,
                 primary_file_path=primary_file_path,
@@ -130,74 +149,122 @@ async def create_replica_jobs(
                 replica_file_path=replica_file_path,
                 replica_endpoint=replica_endpoint,
                 job_id_prefix=f"{job_type}-{object_id}",
+                existing_job_id=existing_job_id,
             )

-            replication_job = CreateReplicaJob(
-                id=job_id,
-                oid=oid,
-                started=datetime.now(),
-                file_path=file.filename,
-                object_type=object_type,
-                object_id=object_id,
-                primary=file.storage,
-                replica_storage=replica_ref,
-            )
-            # print(
-            #     f"File path written into replication_job: {file.filename}", flush=True
-            # )
+            if existing_job_id:
+                replication_job = await self.get_background_job(existing_job_id, org.id)
+                previous_attempt = {
+                    "started": replication_job.started,
+                    "finished": replication_job.finished,
+                }
+                if replication_job.previousAttempts:
+                    replication_job.previousAttempts.append(previous_attempt)
+                else:
+                    replication_job.previousAttempts = [previous_attempt]
+                replication_job.started = datetime.now()
+                replication_job.finished = None
+                replication_job.success = None
+            else:
+                replication_job = CreateReplicaJob(
+                    id=job_id,
+                    oid=org.id,
+                    started=datetime.now(),
+                    file_path=file.filename,
+                    object_type=object_type,
+                    object_id=object_id,
+                    primary=file.storage,
+                    replica_storage=replica_ref,
+                )
+
             await self.jobs.find_one_and_update(
                 {"_id": job_id}, {"$set": replication_job.to_dict()}, upsert=True
             )
-            ids.append(job_id)

-        return {"added": True, "ids": ids}
+            return job_id
+        except Exception as exc:
+            # pylint: disable=raise-missing-from
+            raise HTTPException(
+                status_code=500, detail=f"Error starting background job: {exc}"
+            )

     async def create_delete_replica_jobs(
         self, org: Organization, file: BaseFile, object_id: str, object_type: str
     ) -> Dict[str, Union[bool, List[str]]]:
         """Create a job to delete each replica for the given file"""
-
         ids = []
-        oid = str(org.id)

         for replica_ref in file.replicas or []:
-            replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
-            replica_endpoint, bucket_suffix = self.strip_bucket(
-                replica_storage.endpoint_url
+            job_id = await self.create_delete_replica_job(
+                org, file, object_id, object_type, replica_ref
             )
-            replica_file_path = bucket_suffix + file.filename
+            ids.append(job_id)
+
+        return {"added": True, "ids": ids}

-            # print(f"replica: {replica_ref.get_storage_secret_name(oid)}")
-            # print(f"  endpoint: {replica_endpoint}")
-            # print(f"  path: {replica_file_path}")
+    async def create_delete_replica_job(
+        self,
+        org: Organization,
+        file: BaseFile,
+        object_id: str,
+        object_type: str,
+        replica_ref: StorageRef,
+        existing_job_id: Optional[str] = None,
+    ) -> str:
+        """Create a job to delete one replica of a given file"""
+        replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
+        replica_endpoint, bucket_suffix = self.strip_bucket(
+            replica_storage.endpoint_url
+        )
+        replica_file_path = bucket_suffix + file.filename

-            job_type = BgJobType.DELETE_REPLICA.value
+        job_type = BgJobType.DELETE_REPLICA.value

+        try:
             job_id = await self.crawl_manager.run_replica_job(
-                oid=oid,
+                oid=str(org.id),
                 job_type=job_type,
                 replica_storage=replica_ref,
                 replica_file_path=replica_file_path,
                 replica_endpoint=replica_endpoint,
                 job_id_prefix=f"{job_type}-{object_id}",
+                existing_job_id=existing_job_id,
             )

-            delete_replica_job = DeleteReplicaJob(
-                id=job_id,
-                oid=oid,
-                started=datetime.now(),
-                file_path=file.filename,
-                object_id=object_id,
-                object_type=object_type,
-                replica_storage=replica_ref,
-            )
+            if existing_job_id:
+                delete_replica_job = await self.get_background_job(
+                    existing_job_id, org.id
+                )
+                previous_attempt = {
+                    "started": delete_replica_job.started,
+                    "finished": delete_replica_job.finished,
+                }
+                if delete_replica_job.previousAttempts:
+                    delete_replica_job.previousAttempts.append(previous_attempt)
+                else:
+                    delete_replica_job.previousAttempts = [previous_attempt]
+                delete_replica_job.started = datetime.now()
+                delete_replica_job.finished = None
+                delete_replica_job.success = None
+            else:
+                delete_replica_job = DeleteReplicaJob(
+                    id=job_id,
+                    oid=org.id,
+                    started=datetime.now(),
+                    file_path=file.filename,
+                    object_id=object_id,
+                    object_type=object_type,
+                    replica_storage=replica_ref,
+                )

             await self.jobs.find_one_and_update(
                 {"_id": job_id}, {"$set": delete_replica_job.to_dict()}, upsert=True
             )
-            ids.append(job_id)
+            return job_id

-        return {"added": True, "ids": ids}
+        except Exception as exc:
+            # pylint: disable=raise-missing-from
+            raise HTTPException(
+                status_code=400, detail=f"Error starting background job: {exc}"
+            )

     async def job_finished(
         self,
@@ -241,7 +308,9 @@ async def job_finished(
             {"$set": {"success": success, "finished": finished}},
         )

-    async def get_background_job(self, job_id: str, oid: UUID) -> BackgroundJob:
+    async def get_background_job(
+        self, job_id: str, oid: UUID
+    ) -> Union[CreateReplicaJob, DeleteReplicaJob]:
         """Get background job"""
         query: dict[str, object] = {"_id": job_id, "oid": oid}
         res = await self.jobs.find_one(query)
@@ -255,10 +324,9 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
         if data["type"] == BgJobType.CREATE_REPLICA:
             return CreateReplicaJob.from_dict(data)

-        if data["type"] == BgJobType.DELETE_REPLICA:
-            return DeleteReplicaJob.from_dict(data)
+        return DeleteReplicaJob.from_dict(data)

-        return BackgroundJob.from_dict(data)
+        # return BackgroundJob.from_dict(data)

     async def list_background_jobs(
         self,
@@ -324,6 +392,69 @@ async def list_background_jobs(

         return jobs, total

+    async def get_replica_job_file(
+        self, job: Union[CreateReplicaJob, DeleteReplicaJob], org: Organization
+    ) -> BaseFile:
+        """Return file from replica job"""
+        try:
+            if job.object_type == "profile":
+                profile = await self.profile_ops.get_profile(UUID(job.object_id), org)
+                return BaseFile(**profile.resource.dict())
+
+            item_res = await self.base_crawl_ops.get_crawl_raw(job.object_id, org)
+            matching_file = [
+                f for f in item_res.get("files", []) if f["filename"] == job.file_path
+            ][0]
+            return BaseFile(**matching_file)
+        # pylint: disable=broad-exception-caught, raise-missing-from
+        except Exception:
+            raise HTTPException(status_code=404, detail="file_not_found")
+
+    async def retry_background_job(
+        self, job_id: str, org: Organization
+    ) -> Dict[str, Union[bool, Optional[str]]]:
+        """Retry background job and return new job id"""
+        job = await self.get_background_job(job_id, org.id)
+        if not job:
+            raise HTTPException(status_code=404, detail="job_not_found")
+
+        if not job.finished:
+            raise HTTPException(status_code=400, detail="job_not_finished")
+
+        if job.success:
+            raise HTTPException(status_code=400, detail="job_already_succeeded")
+
+        file = await self.get_replica_job_file(job, org)
+
+        if job.type == BgJobType.CREATE_REPLICA:
+            primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
+            primary_endpoint, bucket_suffix = self.strip_bucket(
+                primary_storage.endpoint_url
+            )
+            primary_file_path = bucket_suffix + file.filename
+            await self.create_replica_job(
+                org,
+                file,
+                job.object_id,
+                job.object_type,
+                job.replica_storage,
+                primary_file_path,
+                primary_endpoint,
+                existing_job_id=job_id,
+            )
+
+        if job.type == BgJobType.DELETE_REPLICA:
+            await self.create_delete_replica_job(
+                org,
+                file,
+                job.object_id,
+                job.object_type,
+                job.replica_storage,
+                existing_job_id=job_id,
+            )
+
+        return {"success": True}
+

 # ============================================================================
 # pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
@@ -344,7 +475,6 @@ def init_background_jobs_api(

     @router.get(
         "/{job_id}",
-        tags=["backgroundjobs"],
         response_model=AnyJob,
     )
     async def get_background_job(
@@ -354,7 +484,17 @@ async def get_background_job(
         """Retrieve information for background job"""
         return await ops.get_background_job(job_id, org.id)

-    @router.get("", tags=["backgroundjobs"], response_model=PaginatedResponse)
+    @router.post(
+        "/{job_id}/retry",
+    )
+    async def retry_background_job(
+        job_id: str,
+        org: Organization = Depends(org_crawl_dep),
+    ):
+        """Retry background job"""
+        return await ops.retry_background_job(job_id, org)
+
+    @router.get("", response_model=PaginatedResponse)
     async def list_background_jobs(
         org: Organization = Depends(org_crawl_dep),
         pageSize: int = DEFAULT_PAGE_SIZE,
diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py
index 7294b1603a..531ed323dd 100644
--- a/backend/btrixcloud/crawlmanager.py
+++ b/backend/btrixcloud/crawlmanager.py
@@ -74,14 +74,18 @@ async def run_replica_job(
         primary_file_path: Optional[str] = None,
         primary_endpoint: Optional[str] = None,
         job_id_prefix: Optional[str] = None,
+        existing_job_id: Optional[str] = None,
     ):
         """run job to replicate file from primary storage to replica storage"""

-        if not job_id_prefix:
-            job_id_prefix = job_type
+        if existing_job_id:
+            job_id = existing_job_id
+        else:
+            if not job_id_prefix:
+                job_id_prefix = job_type

-        # ensure name is <=63 characters
-        job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}"
+            # ensure name is <=63 characters
+            job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}"

         params = {
             "id": job_id,
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index b4b8a3cece..b3c3f6a894 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -1237,6 +1237,8 @@ class BackgroundJob(BaseMongoModel):
     started: datetime
     finished: Optional[datetime] = None

+    previousAttempts: Optional[List[Dict[str, Optional[datetime]]]] = None
+

 # ============================================================================
 class CreateReplicaJob(BackgroundJob):
diff --git a/backend/btrixcloud/webhooks.py b/backend/btrixcloud/webhooks.py
index c91ab9b333..6603544c2c 100644
--- a/backend/btrixcloud/webhooks.py
+++ b/backend/btrixcloud/webhooks.py
@@ -400,7 +400,7 @@ async def get_notification(
     ):
         return await ops.get_notification(org, notificationid)

-    @router.get("/{notificationid}/retry")
+    @router.post("/{notificationid}/retry")
     async def retry_notification(
         notificationid: UUID,
         org: Organization = Depends(org_owner_dep),
diff --git a/backend/test/test_webhooks.py b/backend/test/test_webhooks.py
index 6fc2fdd95b..1f26646416 100644
--- a/backend/test/test_webhooks.py
+++ b/backend/test/test_webhooks.py
@@ -99,7 +99,7 @@ def test_get_webhook_event(admin_auth_headers, default_org_id):

 def test_retry_webhook_event(admin_auth_headers, default_org_id):
     # Expect to fail because we haven't set up URLs that accept webhooks
-    r = requests.get(
+    r = requests.post(
         f"{API_PREFIX}/orgs/{default_org_id}/webhooks/{_webhook_event_id}/retry",
         headers=admin_auth_headers,
     )
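
For reviewers trying out the new endpoint: a minimal test sketch in the style of
backend/test/test_webhooks.py above. The /orgs/{oid}/jobs mount point and the
_job_id value are assumptions for illustration; neither is defined in this patch.

    # Hypothetical sketch, not part of this patch. Assumes the background
    # jobs router is mounted under /orgs/{oid}/jobs and that _job_id holds
    # the id of a background job that has already failed.
    import requests

    from .conftest import API_PREFIX

    def test_retry_failed_background_job(admin_auth_headers, default_org_id):
        r = requests.post(
            f"{API_PREFIX}/orgs/{default_org_id}/jobs/{_job_id}/retry",
            headers=admin_auth_headers,
        )
        if r.status_code == 400:
            # rejected: the job is still running or already succeeded
            assert r.json()["detail"] in ("job_not_finished", "job_already_succeeded")
        else:
            # accepted: the failed job was resubmitted under the same id
            assert r.status_code == 200
            assert r.json()["success"]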
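
The previous-attempt bookkeeping is duplicated verbatim between
create_replica_job and create_delete_replica_job: the prior (started, finished)
pair is pushed onto previousAttempts and the job is reset to a fresh running
state. A follow-up could factor it into a shared helper; a minimal sketch of
that refactor (not part of this patch):

    from datetime import datetime

    def record_previous_attempt(job) -> None:
        """Sketch only: push the job's last (started, finished) pair onto
        previousAttempts, then reset the job before the retry."""
        previous_attempt = {"started": job.started, "finished": job.finished}
        if job.previousAttempts:
            job.previousAttempts.append(previous_attempt)
        else:
            job.previousAttempts = [previous_attempt]
        # reset to a fresh running state; success is decided when it finishes
        job.started = datetime.now()
        job.finished = None
        job.success = None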
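
The id handling in run_replica_job reduces to one rule: a retry reuses the
existing job id, so the stored job document is updated in place rather than a
new one created, while a first run generates a fresh id kept under the
63-character Kubernetes name limit. A standalone sketch of that rule
(select_job_id is an illustrative name, not a function in this patch):

    import secrets
    from typing import Optional

    def select_job_id(
        job_type: str,
        job_id_prefix: Optional[str] = None,
        existing_job_id: Optional[str] = None,
    ) -> str:
        # a retry keeps the same id so the job document is updated in place
        if existing_job_id:
            return existing_job_id
        prefix = job_id_prefix or job_type
        # 52 prefix chars + "-" + 10 hex chars = 63, the k8s name limit
        return f"{prefix[:52]}-{secrets.token_hex(5)}"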