diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index c6c8780f06..c6c27b19c4 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -121,7 +121,13 @@ async def init_index(self):
         await self.crawls.create_index(
             [("type", pymongo.HASHED), ("fileSize", pymongo.DESCENDING)]
         )
-
+        await self.crawls.create_index(
+            [
+                ("state", pymongo.ASCENDING),
+                ("oid", pymongo.ASCENDING),
+                ("started", pymongo.ASCENDING),
+            ]
+        )
         await self.crawls.create_index([("finished", pymongo.DESCENDING)])
         await self.crawls.create_index([("oid", pymongo.HASHED)])
         await self.crawls.create_index([("cid", pymongo.HASHED)])
@@ -336,6 +342,18 @@ async def list_crawls(
 
         return crawls, total
 
+    async def get_active_crawls(self, oid: UUID, limit: int) -> list[str]:
+        """get ids of running and waiting crawls, sorted from earliest started to latest"""
+        res = (
+            self.crawls.find(
+                {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}, {"_id": 1}
+            )
+            .sort({"started": 1})
+            .limit(limit)
+        )
+        res_list = await res.to_list()
+        return [res["_id"] for res in res_list]
+
     async def delete_crawls(
         self,
         org: Organization,
diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py
index 314d4eb30d..841f2de244 100644
--- a/backend/btrixcloud/operator/crawls.py
+++ b/backend/btrixcloud/operator/crawls.py
@@ -21,7 +21,6 @@
     TYPE_NON_RUNNING_STATES,
     TYPE_RUNNING_STATES,
     TYPE_ALL_CRAWL_STATES,
-    NON_RUNNING_STATES,
     RUNNING_STATES,
     WAITING_STATES,
     RUNNING_AND_STARTING_ONLY,
@@ -53,7 +52,6 @@
     CMAP,
     PVC,
     CJS,
-    BTRIX_API,
 )
 
 
@@ -258,7 +256,7 @@ async def sync_crawls(self, data: MCSyncData):
             return self._empty_response(status)
 
         if status.state in ("starting", "waiting_org_limit"):
-            if not await self.can_start_new(crawl, data, status):
+            if not await self.can_start_new(crawl, status):
                 return self._empty_response(status)
 
             await self.set_state(
@@ -738,20 +736,10 @@ async def set_state(
 
     def get_related(self, data: MCBaseRequest):
         """return objects related to crawl pods"""
-        spec = data.parent.get("spec", {})
-        crawl_id = spec["id"]
-        oid = spec.get("oid")
-        # filter by role as well (job vs qa-job)
-        role = data.parent.get("metadata", {}).get("labels", {}).get("role")
-        related_resources = [
-            {
-                "apiVersion": BTRIX_API,
-                "resource": "crawljobs",
-                "labelSelector": {"matchLabels": {"btrix.org": oid, "role": role}},
-            },
-        ]
-
+        related_resources = []
         if self.k8s.enable_auto_resize:
+            spec = data.parent.get("spec", {})
+            crawl_id = spec["id"]
             related_resources.append(
                 {
                     "apiVersion": METRICS_API,
@@ -765,7 +753,6 @@ def get_related(self, data: MCBaseRequest):
     async def can_start_new(
         self,
         crawl: CrawlSpec,
-        data: MCSyncData,
         status: CrawlStatus,
     ):
         """return true if crawl can start, otherwise set crawl to 'queued' state
@@ -774,27 +761,17 @@ async def can_start_new(
         if not max_crawls:
             return True
 
+        next_active_crawls = await self.crawl_ops.get_active_crawls(
+            crawl.oid, max_crawls
+        )
+
         # if total crawls < concurrent, always allow, no need to check further
-        if len(data.related[CJS]) <= max_crawls:
+        if len(next_active_crawls) < max_crawls:
             return True
 
-        name = data.parent.get("metadata", {}).get("name")
-
-        # assume crawls already sorted from oldest to newest
-        # (seems to be the case always)
-        i = 0
-        for crawl_sorted in data.related[CJS].values():
-            # if crawl not running, don't count
-            if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES:
-                continue
-
-            # if reached current crawl, if did not reach crawl quota, allow current crawl to run
-            if crawl_sorted.get("metadata").get("name") == name:
-                if i < max_crawls:
-                    return True
-
-                break
-            i += 1
+        # allow crawl if it is among the first max_crawls active crawls
+        if crawl.id in next_active_crawls:
+            return True
 
         await self.set_state(
             "waiting_org_limit", status, crawl, allowed_from=["starting"]