Redo background_jobs typing, add superuser non-org list endpoint
tw4l committed Feb 18, 2025
1 parent 25d49fb commit 87f22dd
Showing 1 changed file with 67 additions and 29 deletions.
backend/btrixcloud/background_jobs.py (96 changes: 67 additions & 29 deletions)
@@ -558,7 +558,7 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):

     async def list_background_jobs(
         self,
-        org: Organization,
+        org: Optional[Organization] = None,
         page_size: int = DEFAULT_PAGE_SIZE,
         page: int = 1,
         success: Optional[bool] = None,
@@ -572,7 +572,10 @@ async def list_background_jobs(
         page = page - 1
         skip = page_size * page

-        query: dict[str, object] = {"oid": org.id}
+        query: dict[str, object] = {}
+
+        if org:
+            query["oid"] = org.id

         if success in (True, False):
             query["success"] = success
@@ -640,8 +643,8 @@ async def get_replica_job_file(
         raise HTTPException(status_code=404, detail="file_not_found")

     async def retry_background_job(
-        self, job_id: str, org: Organization
-    ) -> Dict[str, Union[bool, Optional[str]]]:
+        self, job_id: str, org: Optional[Organization] = None
+    ):
         """Retry background job"""
         job = await self.get_background_job(job_id)
         if not job:
@@ -653,6 +656,22 @@ async def retry_background_job(
         if job.success:
             raise HTTPException(status_code=400, detail="job_already_succeeded")

+        if org:
+            return await self.retry_org_background_job(job, org)
+
+        if job.type == BgJobType.OPTIMIZE_PAGES:
+            await self.create_optimize_crawl_pages_job(
+                job.crawl_type,
+                existing_job_id=job_id,
+            )
+            return {"success": True}
+
+        return {"success": False}
+
+    async def retry_org_background_job(
+        self, job: BackgroundJob, org: Organization
+    ) -> Dict[str, Union[bool, Optional[str]]]:
+        """Retry background job specific to one org"""
         if job.type == BgJobType.CREATE_REPLICA:
             file = await self.get_replica_job_file(job, org)
             primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
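
The net effect of this hunk is a two-level dispatch: `retry_background_job` now handles the org-less case itself (currently only OPTIMIZE_PAGES jobs) and hands every org-scoped job to the new `retry_org_background_job`, removing the need for the old default-org workaround. A toy model of the control flow, with stand-in names and types rather than the real API:

    import asyncio
    from dataclasses import dataclass

    @dataclass
    class Job:
        type: str

    async def retry_org_job(job: Job, org: str) -> dict:
        return {"success": True}  # stand-in for the per-type org logic

    async def retry_job(job: Job, org: str | None = None) -> dict:
        if org:
            return await retry_org_job(job, org)  # all org-scoped job types
        if job.type == "optimize-pages":  # the only org-less type retried today
            return {"success": True}
        return {"success": False}  # unrecognized org-less type: not retried

    print(asyncio.run(retry_job(Job("optimize-pages"))))      # {'success': True}
    print(asyncio.run(retry_job(Job("delete-org"), "org1")))  # {'success': True}
    print(asyncio.run(retry_job(Job("delete-org"))))          # {'success': False}
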
@@ -670,6 +689,7 @@ async def retry_background_job(
                 primary_endpoint,
                 existing_job_id=job_id,
             )
+            return {"success": True}

         if job.type == BgJobType.DELETE_REPLICA:
             file = await self.get_replica_job_file(job, org)
@@ -682,35 +702,33 @@ async def retry_background_job(
                 force_start_immediately=True,
                 existing_job_id=job_id,
             )
+            return {"success": True}

         if job.type == BgJobType.DELETE_ORG:
             await self.create_delete_org_job(
                 org,
                 existing_job_id=job_id,
             )
+            return {"success": True}

         if job.type == BgJobType.RECALCULATE_ORG_STATS:
             await self.create_recalculate_org_stats_job(
                 org,
                 existing_job_id=job_id,
             )
+            return {"success": True}

         if job.type == BgJobType.READD_ORG_PAGES:
             await self.create_re_add_org_pages_job(
                 org.id,
                 job.crawl_type,
                 existing_job_id=job_id,
             )
+            return {"success": True}

-        if job.type == BgJobType.OPTIMIZE_PAGES:
-            await self.create_optimize_crawl_pages_job(
-                job.crawl_type,
-                existing_job_id=job_id,
-            )
-
-        return {"success": True}
+        return {"success": False}

-    async def retry_failed_background_jobs(
+    async def retry_failed_org_background_jobs(
         self, org: Organization
     ) -> Dict[str, Union[bool, Optional[str]]]:
         """Retry all failed background jobs in an org
@@ -735,11 +753,9 @@ async def retry_all_failed_background_jobs(
         """
         bg_tasks = set()
         async for job in self.jobs.find({"success": False}):
+            org = None
             if job["oid"]:
                 org = await self.org_ops.get_org_by_id(job["oid"])
-            else:
-                # Hacky workaround until we rework retry_background_job
-                org = await self.org_ops.get_default_org()
             task = asyncio.create_task(self.retry_background_job(job["_id"], org))
             bg_tasks.add(task)
             task.add_done_callback(bg_tasks.discard)
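
The surrounding `bg_tasks` bookkeeping is the standard asyncio idiom for fire-and-forget work: the event loop keeps only weak references to tasks, so holding each task in a set prevents it from being garbage-collected mid-flight, while `add_done_callback(bg_tasks.discard)` drops it from the set once it completes. A self-contained illustration:

    import asyncio

    async def retry(job_id: int) -> None:
        await asyncio.sleep(0.01)  # pretend to retry a job

    async def main() -> None:
        bg_tasks: set[asyncio.Task] = set()
        for job_id in range(3):
            task = asyncio.create_task(retry(job_id))
            bg_tasks.add(task)  # strong reference until the task is done
            task.add_done_callback(bg_tasks.discard)  # self-cleaning on completion
        await asyncio.sleep(0.05)
        assert not bg_tasks  # every task finished and removed itself

    asyncio.run(main())
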
@@ -767,7 +783,7 @@ def init_background_jobs_api(
         "/{job_id}",
         response_model=AnyJob,
     )
-    async def get_background_job(
+    async def get_org_background_job(
         job_id: str,
         org: Organization = Depends(org_crawl_dep),
     ):
@@ -786,25 +802,20 @@ async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep
         "/orgs/all/jobs/{job_id}/retry", response_model=SuccessResponse, tags=["jobs"]
     )
     async def retry_background_job_no_org(job_id: str, user: User = Depends(user_dep)):
-        """Retry backgound migration job"""
+        """Retry background job that doesn't belong to an org, e.g. a migration job"""
         if not user.is_superuser:
             raise HTTPException(status_code=403, detail="Not Allowed")

         job = await ops.get_background_job(job_id)

         org = None
         if job.oid:
             org = await ops.org_ops.get_org_by_id(job.oid)
-        # Use default org for org-less jobs without oid for now, until we
-        # can rework retry_background_job
-        elif job.type == BgJobType.OPTIMIZE_PAGES:
-            org = await ops.org_ops.get_default_org()
-        else:
-            return HTTPException(status_code=404, detail="job_not_found")

         return await ops.retry_background_job(job_id, org)

     @router.post("/{job_id}/retry", response_model=SuccessResponse, tags=["jobs"])
-    async def retry_background_job(
+    async def retry_org_background_job(
         job_id: str,
         org: Organization = Depends(org_crawl_dep),
     ):
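
With the workaround gone, the superuser retry endpoint resolves the org when the job has an `oid` and otherwise passes `org=None` straight through to the reworked `retry_background_job`. A hypothetical client call; the base URL, `/api` prefix, token, and job ID are placeholders, not values from this commit:

    import requests

    resp = requests.post(
        "https://btrix.example.com/api/orgs/all/jobs/<job_id>/retry",
        headers={"Authorization": "Bearer <superuser-token>"},
    )
    resp.raise_for_status()
    print(resp.json())  # expected shape per SuccessResponse: {"success": true}
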
@@ -821,14 +832,41 @@ async def retry_all_failed_background_jobs(user: User = Depends(user_dep)):

         return await ops.retry_all_failed_background_jobs()

-    @router.post("/retryFailed", response_model=SuccessResponse)
-    async def retry_failed_background_jobs(
+    @router.post("/retryFailed", response_model=SuccessResponse, tags=["jobs"])
+    async def retry_failed_org_background_jobs(
         org: Organization = Depends(org_crawl_dep),
     ):
         """Retry failed background jobs"""
-        return await ops.retry_failed_background_jobs(org)
+        return await ops.retry_failed_org_background_jobs(org)

+    @app.get(
+        "/orgs/all/jobs", response_model=PaginatedBackgroundJobResponse, tags=["jobs"]
+    )
+    async def list_all_background_jobs(
+        pageSize: int = DEFAULT_PAGE_SIZE,
+        page: int = 1,
+        success: Optional[bool] = None,
+        jobType: Optional[str] = None,
+        sortBy: Optional[str] = None,
+        sortDirection: Optional[int] = -1,
+        user: User = Depends(user_dep),
+    ):
+        """Retrieve paginated list of background jobs across all orgs (superuser only)"""
+        if not user.is_superuser:
+            raise HTTPException(status_code=403, detail="Not Allowed")
+
+        jobs, total = await ops.list_background_jobs(
+            org=None,
+            page_size=pageSize,
+            page=page,
+            success=success,
+            job_type=jobType,
+            sort_by=sortBy,
+            sort_direction=sortDirection,
+        )
+        return paginated_format(jobs, total, page, pageSize)
+
-    @router.get("", response_model=PaginatedBackgroundJobResponse)
+    @router.get("", response_model=PaginatedBackgroundJobResponse, tags=["jobs"])
     async def list_background_jobs(
         org: Organization = Depends(org_crawl_dep),
         pageSize: int = DEFAULT_PAGE_SIZE,
@@ -840,7 +878,7 @@ async def list_background_jobs(
     ):
         """Retrieve paginated list of background jobs"""
         jobs, total = await ops.list_background_jobs(
-            org,
+            org=org,
             page_size=pageSize,
             page=page,
             success=success,
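
Taken together, the two listing routes now share one implementation: the new superuser-only `GET /orgs/all/jobs` calls `list_background_jobs` with `org=None`, while the org-scoped route passes the resolved org. A hypothetical request against the new endpoint; the base URL, token, and the `items` key of the paginated response are assumptions rather than facts from this diff:

    import requests

    resp = requests.get(
        "https://btrix.example.com/api/orgs/all/jobs",
        headers={"Authorization": "Bearer <superuser-token>"},
        params={"pageSize": 50, "page": 1, "success": False},  # failed jobs only
    )
    resp.raise_for_status()
    data = resp.json()
    print(data["total"], [job.get("id") for job in data.get("items", [])])
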
