Implement retry API endpoint for failed background jobs (#1356)
Fixes #1328 

- Adds a `POST /retry` endpoint for retrying failed background jobs (usage sketch below).
- Returns a 400 error if the previous job is still running or has already succeeded.
- Tracks previous failed attempts in a `previousAttempts` array on the failed job.
- Amends the similar webhook `/retry` endpoint to use `POST` for consistency.
- Removes the duplicate `backgroundjobs` API tag.
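A minimal usage sketch, mirroring the style of the webhook retry test changed below. The `/orgs/{org_id}/jobs` prefix, host, and all values are assumptions, not taken from this commit; only the `/{job_id}/retry` route itself is:

```python
import requests

# Illustrative values only -- none of these come from the commit.
API_PREFIX = "http://localhost:30870/api"
org_id = "<org-uuid>"
job_id = "create-replica-<object-id>-<suffix>"
auth_headers = {"Authorization": "Bearer <token>"}

r = requests.post(
    f"{API_PREFIX}/orgs/{org_id}/jobs/{job_id}/retry",
    headers=auth_headers,
)

# Per the endpoint logic in this commit:
#   200 {"success": true}        -> failed job was requeued
#   400 "job_not_finished"       -> previous attempt is still running
#   400 "job_already_succeeded"  -> nothing to retry
#   404 "job_not_found"          -> unknown job id
print(r.status_code, r.json())
```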
tw4l authored Nov 10, 2023

1 parent 82a5d1e commit 1afc411
Showing 5 changed files with 210 additions and 64 deletions.
256 changes: 198 additions & 58 deletions backend/btrixcloud/background_jobs.py
@@ -20,6 +20,7 @@
DeleteReplicaJob,
PaginatedResponse,
AnyJob,
StorageRef,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format

@@ -90,9 +91,8 @@ async def handle_replica_job_finished(self, job: CreateReplicaJob) -> None:

async def create_replica_jobs(
self, oid: UUID, file: BaseFile, object_id: str, object_type: str
) -> Dict:
"""Create k8s background job to replicate a file to another storage location."""

) -> Dict[str, Union[bool, List[str]]]:
"""Create k8s background job to replicate a file to all replica storage locations."""
org = await self.org_ops.get_org_by_id(oid)

primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
@@ -105,23 +105,42 @@ async def create_replica_jobs(
ids = []

for replica_ref in self.storage_ops.get_org_replicas_storage_refs(org):
replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
replica_endpoint, bucket_suffix = self.strip_bucket(
replica_storage.endpoint_url
job_id = await self.create_replica_job(
org,
file,
object_id,
object_type,
replica_ref,
primary_file_path,
primary_endpoint,
)
replica_file_path = bucket_suffix + file.filename
ids.append(job_id)

return {"added": True, "ids": ids}

# print(f"primary: {file.storage.get_storage_secret_name(str(oid))}")
# print(f" endpoint: {primary_endpoint}")
# print(f" path: {primary_file_path}")
# print(f"replica: {replica_ref.get_storage_secret_name(str(oid))}")
# print(f" endpoint: {replica_endpoint}")
# print(f" path: {replica_file_path}")
async def create_replica_job(
self,
org: Organization,
file: BaseFile,
object_id: str,
object_type: str,
replica_ref: StorageRef,
primary_file_path: str,
primary_endpoint: str,
existing_job_id: Optional[str] = None,
) -> str:
"""Create k8s background job to replicate a file to a specific replica storage location."""
replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
replica_endpoint, bucket_suffix = self.strip_bucket(
replica_storage.endpoint_url
)
replica_file_path = bucket_suffix + file.filename

job_type = BgJobType.CREATE_REPLICA.value
job_type = BgJobType.CREATE_REPLICA.value

try:
job_id = await self.crawl_manager.run_replica_job(
oid=str(oid),
oid=str(org.id),
job_type=job_type,
primary_storage=file.storage,
primary_file_path=primary_file_path,
@@ -130,74 +149,122 @@ async def create_replica_jobs(
replica_file_path=replica_file_path,
replica_endpoint=replica_endpoint,
job_id_prefix=f"{job_type}-{object_id}",
existing_job_id=existing_job_id,
)
replication_job = CreateReplicaJob(
id=job_id,
oid=oid,
started=datetime.now(),
file_path=file.filename,
object_type=object_type,
object_id=object_id,
primary=file.storage,
replica_storage=replica_ref,
)
# print(
# f"File path written into replication_job: {file.filename}", flush=True
# )
if existing_job_id:
replication_job = await self.get_background_job(existing_job_id, org.id)
previous_attempt = {
"started": replication_job.started,
"finished": replication_job.finished,
}
if replication_job.previousAttempts:
replication_job.previousAttempts.append(previous_attempt)
else:
replication_job.previousAttempts = [previous_attempt]
replication_job.started = datetime.now()
replication_job.finished = None
replication_job.success = None
else:
replication_job = CreateReplicaJob(
id=job_id,
oid=org.id,
started=datetime.now(),
file_path=file.filename,
object_type=object_type,
object_id=object_id,
primary=file.storage,
replica_storage=replica_ref,
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": replication_job.to_dict()}, upsert=True
)
ids.append(job_id)

return {"added": True, "ids": ids}
return job_id
except Exception as exc:
# pylint: disable=raise-missing-from
raise HTTPException(status_code=500, detail=f"Error starting crawl: {exc}")

async def create_delete_replica_jobs(
self, org: Organization, file: BaseFile, object_id: str, object_type: str
) -> Dict[str, Union[bool, List[str]]]:
"""Create a job to delete each replica for the given file"""

ids = []
oid = str(org.id)

for replica_ref in file.replicas or []:
replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
replica_endpoint, bucket_suffix = self.strip_bucket(
replica_storage.endpoint_url
job_id = await self.create_delete_replica_job(
org, file, object_id, object_type, replica_ref
)
replica_file_path = bucket_suffix + file.filename
ids.append(job_id)

return {"added": True, "ids": ids}

# print(f"replica: {replica_ref.get_storage_secret_name(oid)}")
# print(f" endpoint: {replica_endpoint}")
# print(f" path: {replica_file_path}")
async def create_delete_replica_job(
self,
org: Organization,
file: BaseFile,
object_id: str,
object_type: str,
replica_ref: StorageRef,
existing_job_id: Optional[str] = None,
) -> str:
"""Create a job to delete one replica of a given file"""
replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
replica_endpoint, bucket_suffix = self.strip_bucket(
replica_storage.endpoint_url
)
replica_file_path = bucket_suffix + file.filename

job_type = BgJobType.DELETE_REPLICA.value
job_type = BgJobType.DELETE_REPLICA.value

try:
job_id = await self.crawl_manager.run_replica_job(
oid=oid,
oid=str(org.id),
job_type=job_type,
replica_storage=replica_ref,
replica_file_path=replica_file_path,
replica_endpoint=replica_endpoint,
job_id_prefix=f"{job_type}-{object_id}",
existing_job_id=existing_job_id,
)

delete_replica_job = DeleteReplicaJob(
id=job_id,
oid=oid,
started=datetime.now(),
file_path=file.filename,
object_id=object_id,
object_type=object_type,
replica_storage=replica_ref,
)
if existing_job_id:
delete_replica_job = await self.get_background_job(
existing_job_id, org.id
)
previous_attempt = {
"started": delete_replica_job.started,
"finished": delete_replica_job.finished,
}
if delete_replica_job.previousAttempts:
delete_replica_job.previousAttempts.append(previous_attempt)
else:
delete_replica_job.previousAttempts = [previous_attempt]
delete_replica_job.started = datetime.now()
delete_replica_job.finished = None
delete_replica_job.success = None
else:
delete_replica_job = DeleteReplicaJob(
id=job_id,
oid=org.id,
started=datetime.now(),
file_path=file.filename,
object_id=object_id,
object_type=object_type,
replica_storage=replica_ref,
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": delete_replica_job.to_dict()}, upsert=True
)

ids.append(job_id)
return job_id

return {"added": True, "ids": ids}
except Exception as exc:
# pylint: disable=raise-missing-from
raise HTTPException(
status_code=400, detail=f"Error starting background job: {exc}"
)

async def job_finished(
self,
@@ -241,7 +308,9 @@ async def job_finished(
{"$set": {"success": success, "finished": finished}},
)

async def get_background_job(self, job_id: str, oid: UUID) -> BackgroundJob:
async def get_background_job(
self, job_id: str, oid: UUID
) -> Union[CreateReplicaJob, DeleteReplicaJob]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id, "oid": oid}
res = await self.jobs.find_one(query)
@@ -255,10 +324,9 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
if data["type"] == BgJobType.CREATE_REPLICA:
return CreateReplicaJob.from_dict(data)

if data["type"] == BgJobType.DELETE_REPLICA:
return DeleteReplicaJob.from_dict(data)
return DeleteReplicaJob.from_dict(data)

return BackgroundJob.from_dict(data)
# return BackgroundJob.from_dict(data)

async def list_background_jobs(
self,
@@ -324,6 +392,69 @@ async def list_background_jobs(

return jobs, total

async def get_replica_job_file(
self, job: Union[CreateReplicaJob, DeleteReplicaJob], org: Organization
) -> BaseFile:
"""Return file from replica job"""
try:
if job.object_type == "profile":
profile = await self.profile_ops.get_profile(UUID(job.object_id), org)
return BaseFile(**profile.resource.dict())

item_res = await self.base_crawl_ops.get_crawl_raw(job.object_id, org)
matching_file = [
f for f in item_res.get("files", []) if f["filename"] == job.file_path
][0]
return BaseFile(**matching_file)
# pylint: disable=broad-exception-caught, raise-missing-from
except Exception:
raise HTTPException(status_code=404, detail="file_not_found")

async def retry_background_job(
self, job_id: str, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
"""Retry background job and return new job id"""
job = await self.get_background_job(job_id, org.id)
if not job:
raise HTTPException(status_code=404, detail="job_not_found")

if not job.finished:
raise HTTPException(status_code=400, detail="job_not_finished")

if job.success:
raise HTTPException(status_code=400, detail="job_already_succeeded")

file = await self.get_replica_job_file(job, org)

if job.type == BgJobType.CREATE_REPLICA:
primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
primary_endpoint, bucket_suffix = self.strip_bucket(
primary_storage.endpoint_url
)
primary_file_path = bucket_suffix + file.filename
await self.create_replica_job(
org,
file,
job.object_id,
job.object_type,
job.replica_storage,
primary_file_path,
primary_endpoint,
existing_job_id=job_id,
)

if job.type == BgJobType.DELETE_REPLICA:
await self.create_delete_replica_job(
org,
file,
job.object_id,
job.object_type,
job.replica_storage,
existing_job_id=job_id,
)

return {"success": True}


# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
@@ -344,7 +475,6 @@ def init_background_jobs_api(

@router.get(
"/{job_id}",
tags=["backgroundjobs"],
response_model=AnyJob,
)
async def get_background_job(
@@ -354,7 +484,17 @@ async def get_background_job(
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

@router.get("", tags=["backgroundjobs"], response_model=PaginatedResponse)
@router.post(
"/{job_id}/retry",
)
async def retry_background_job(
job_id: str,
org: Organization = Depends(org_crawl_dep),
):
"""Retry background job"""
return await ops.retry_background_job(job_id, org)

@router.get("", response_model=PaginatedResponse)
async def list_background_jobs(
org: Organization = Depends(org_crawl_dep),
pageSize: int = DEFAULT_PAGE_SIZE,
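The attempt-rollover bookkeeping above is duplicated in `create_replica_job` and `create_delete_replica_job`. Distilled into one place, it amounts to the sketch below; the `roll_over_attempt` helper is illustrative and not part of the commit:

```python
from datetime import datetime

def roll_over_attempt(job) -> None:
    """Archive the finished attempt, then reset the job for a new run.

    `job` is a BackgroundJob-like model with `started`, `finished`,
    `success`, and the `previousAttempts` list added in this commit.
    """
    previous_attempt = {
        "started": job.started,
        "finished": job.finished,
    }
    if job.previousAttempts:
        job.previousAttempts.append(previous_attempt)
    else:
        job.previousAttempts = [previous_attempt]
    # the retried run starts fresh: new start time, outcome unknown
    job.started = datetime.now()
    job.finished = None
    job.success = None
```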
12 changes: 8 additions & 4 deletions backend/btrixcloud/crawlmanager.py
@@ -74,14 +74,18 @@ async def run_replica_job(
primary_file_path: Optional[str] = None,
primary_endpoint: Optional[str] = None,
job_id_prefix: Optional[str] = None,
existing_job_id: Optional[str] = None,
):
"""run job to replicate file from primary storage to replica storage"""

if not job_id_prefix:
job_id_prefix = job_type
if existing_job_id:
job_id = existing_job_id
else:
if not job_id_prefix:
job_id_prefix = job_type

# ensure name is <=63 characters
job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}"
# ensure name is <=63 characters
job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}"

params = {
"id": job_id,
2 changes: 2 additions & 0 deletions backend/btrixcloud/models.py
@@ -1237,6 +1237,8 @@ class BackgroundJob(BaseMongoModel):
started: datetime
finished: Optional[datetime] = None

previousAttempts: Optional[List[Dict[str, Optional[datetime]]]] = None


# ============================================================================
class CreateReplicaJob(BackgroundJob):
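Given the new field, a job document that has been retried once might look roughly like this (all values illustrative):

```python
{
    "type": "create-replica",
    "started": "2023-11-10T17:03:21",  # current attempt
    "finished": None,                  # still running
    "success": None,
    "previousAttempts": [
        {"started": "2023-11-10T16:40:02", "finished": "2023-11-10T16:41:55"}
    ],
}
```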
2 changes: 1 addition & 1 deletion backend/btrixcloud/webhooks.py
@@ -400,7 +400,7 @@ async def get_notification(
):
return await ops.get_notification(org, notificationid)

@router.get("/{notificationid}/retry")
@router.post("/{notificationid}/retry")
async def retry_notification(
notificationid: UUID,
org: Organization = Depends(org_owner_dep),
2 changes: 1 addition & 1 deletion backend/test/test_webhooks.py
@@ -99,7 +99,7 @@ def test_get_webhook_event(admin_auth_headers, default_org_id):

def test_retry_webhook_event(admin_auth_headers, default_org_id):
# Expect to fail because we haven't set up URLs that accept webhooks
r = requests.get(
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/webhooks/{_webhook_event_id}/retry",
headers=admin_auth_headers,
)
