Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion backend/btrixcloud/crawls.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@ async def init_index(self):
await self.crawls.create_index(
[("type", pymongo.HASHED), ("fileSize", pymongo.DESCENDING)]
)

await self.crawls.create_index(
[
("state", pymongo.ASCENDING),
("oid", pymongo.ASCENDING),
("started", pymongo.ASCENDING),
]
)
await self.crawls.create_index([("finished", pymongo.DESCENDING)])
await self.crawls.create_index([("oid", pymongo.HASHED)])
await self.crawls.create_index([("cid", pymongo.HASHED)])
Expand Down Expand Up @@ -336,6 +342,18 @@ async def list_crawls(

return crawls, total

async def get_active_crawls(self, oid: UUID, limit: int) -> list[str]:
    """Return ids of active (running or waiting) crawls for one org.

    Results are sorted from earliest started to latest and capped at
    *limit* entries. (The query matches RUNNING_AND_WAITING_STATES, so
    "active" here means running OR queued/waiting — not waiting only.)

    :param oid: organization id to filter by
    :param limit: maximum number of crawl ids to return
    :return: list of crawl ``_id`` values, oldest start time first
    """
    # Projection keeps only _id; the filter + sort matches the
    # (state, oid, started) compound index created in init_index.
    # Use the key/direction form of sort(), which is supported by all
    # pymongo/Motor versions (the mapping form is newer).
    cursor = (
        self.crawls.find(
            {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid},
            {"_id": 1},
        )
        .sort("started", 1)
        .limit(limit)
    )
    # Pass an explicit length: older Motor releases require it, and the
    # cursor is already capped at `limit` documents so this is a no-op cap.
    docs = await cursor.to_list(length=limit)
    return [doc["_id"] for doc in docs]

async def delete_crawls(
self,
org: Organization,
Expand Down
47 changes: 12 additions & 35 deletions backend/btrixcloud/operator/crawls.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
TYPE_NON_RUNNING_STATES,
TYPE_RUNNING_STATES,
TYPE_ALL_CRAWL_STATES,
NON_RUNNING_STATES,
RUNNING_STATES,
WAITING_STATES,
RUNNING_AND_STARTING_ONLY,
Expand Down Expand Up @@ -53,7 +52,6 @@
CMAP,
PVC,
CJS,
BTRIX_API,
)


Expand Down Expand Up @@ -258,7 +256,7 @@ async def sync_crawls(self, data: MCSyncData):
return self._empty_response(status)

if status.state in ("starting", "waiting_org_limit"):
if not await self.can_start_new(crawl, data, status):
if not await self.can_start_new(crawl, status):
return self._empty_response(status)

await self.set_state(
Expand Down Expand Up @@ -738,20 +736,10 @@ async def set_state(

def get_related(self, data: MCBaseRequest):
"""return objects related to crawl pods"""
spec = data.parent.get("spec", {})
crawl_id = spec["id"]
oid = spec.get("oid")
# filter by role as well (job vs qa-job)
role = data.parent.get("metadata", {}).get("labels", {}).get("role")
related_resources = [
{
"apiVersion": BTRIX_API,
"resource": "crawljobs",
"labelSelector": {"matchLabels": {"btrix.org": oid, "role": role}},
},
]

related_resources = []
if self.k8s.enable_auto_resize:
spec = data.parent.get("spec", {})
crawl_id = spec["id"]
related_resources.append(
{
"apiVersion": METRICS_API,
Expand All @@ -765,7 +753,6 @@ def get_related(self, data: MCBaseRequest):
async def can_start_new(
self,
crawl: CrawlSpec,
data: MCSyncData,
status: CrawlStatus,
):
"""return true if crawl can start, otherwise set crawl to 'queued' state
Expand All @@ -774,27 +761,17 @@ async def can_start_new(
if not max_crawls:
return True

next_active_crawls = await self.crawl_ops.get_active_crawls(
crawl.oid, max_crawls
)

# if total crawls < concurrent, always allow, no need to check further
if len(data.related[CJS]) <= max_crawls:
if len(next_active_crawls) < max_crawls:
return True

name = data.parent.get("metadata", {}).get("name")

# assume crawls already sorted from oldest to newest
# (seems to be the case always)
i = 0
for crawl_sorted in data.related[CJS].values():
# if crawl not running, don't count
if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES:
continue

# if reached current crawl, if did not reach crawl quota, allow current crawl to run
if crawl_sorted.get("metadata").get("name") == name:
if i < max_crawls:
return True

break
i += 1
# allow crawl if within first list of active crawls
if crawl.id in next_active_crawls:
return True

await self.set_state(
"waiting_org_limit", status, crawl, allowed_from=["starting"]
Expand Down
Loading