From 2e88c16e2de189fe6596983d37a9596d4cecc469 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Mon, 27 Oct 2025 21:44:15 -0700 Subject: [PATCH 1/3] handle concurrent crawls via db instead of returning all cjs objects via operator /customize --- backend/btrixcloud/crawls.py | 20 ++++++++++++- backend/btrixcloud/operator/crawls.py | 43 ++++++++------------------- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index c6c8780f06..d6f5736f49 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -121,7 +121,13 @@ async def init_index(self): await self.crawls.create_index( [("type", pymongo.HASHED), ("fileSize", pymongo.DESCENDING)] ) - + await self.crawls.create_index( + [ + ("type", pymongo.HASHED), + ("oid", pymongo.DESCENDING), + ("started", pymongo.ASCENDING), + ] + ) await self.crawls.create_index([("finished", pymongo.DESCENDING)]) await self.crawls.create_index([("oid", pymongo.HASHED)]) await self.crawls.create_index([("cid", pymongo.HASHED)]) @@ -336,6 +342,18 @@ async def list_crawls( return crawls, total + async def get_active_crawls(self, oid: UUID, limit: int) -> list[str]: + """get list of waiting crawls, sorted from earliest to latest""" + res = ( + self.crawls.find( + {"state": {"$in": RUNNING_AND_WAITING_STATES}, "oid": oid}, {"_id": 1} + ) + .sort({"started": 1}) + .limit(limit) + ) + res_list = await res.to_list() + return [res["_id"] for res in res_list] + async def delete_crawls( self, org: Organization, diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py index 314d4eb30d..973c298df0 100644 --- a/backend/btrixcloud/operator/crawls.py +++ b/backend/btrixcloud/operator/crawls.py @@ -21,7 +21,7 @@ TYPE_NON_RUNNING_STATES, TYPE_RUNNING_STATES, TYPE_ALL_CRAWL_STATES, - NON_RUNNING_STATES, + # NON_RUNNING_STATES, RUNNING_STATES, WAITING_STATES, RUNNING_AND_STARTING_ONLY, @@ -53,7 +53,7 @@ CMAP, PVC, CJS, - BTRIX_API, + # BTRIX_API, ) @@ -740,17 +740,10 @@ def get_related(self, data: MCBaseRequest): """return objects related to crawl pods""" spec = data.parent.get("spec", {}) crawl_id = spec["id"] - oid = spec.get("oid") + # oid = spec.get("oid") # filter by role as well (job vs qa-job) - role = data.parent.get("metadata", {}).get("labels", {}).get("role") - related_resources = [ - { - "apiVersion": BTRIX_API, - "resource": "crawljobs", - "labelSelector": {"matchLabels": {"btrix.org": oid, "role": role}}, - }, - ] - + # role = data.parent.get("metadata", {}).get("labels", {}).get("role") + related_resources = [] if self.k8s.enable_auto_resize: related_resources.append( { @@ -774,27 +767,17 @@ async def can_start_new( if not max_crawls: return True + next_waiting_crawls = await self.crawl_ops.get_active_crawls( + crawl.oid, max_crawls + ) + print("WAITING CRAWLS", next_waiting_crawls) + # if total crawls < concurrent, always allow, no need to check further - if len(data.related[CJS]) <= max_crawls: + if len(next_waiting_crawls) < max_crawls: return True - name = data.parent.get("metadata", {}).get("name") - - # assume crawls already sorted from oldest to newest - # (seems to be the case always) - i = 0 - for crawl_sorted in data.related[CJS].values(): - # if crawl not running, don't count - if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES: - continue - - # if reached current crawl, if did not reach crawl quota, allow current crawl to run - if crawl_sorted.get("metadata").get("name") == name: - if i < max_crawls: - return True - - break - i += 1 + if crawl.id in next_waiting_crawls: + return True await self.set_state( "waiting_org_limit", status, crawl, allowed_from=["starting"] From c034bc900e862a5a1c7ffe2a4b4de7a9766bb6ed Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Wed, 29 Oct 2025 17:15:41 -0700 Subject: [PATCH 2/3] adjust index type --- backend/btrixcloud/crawls.py | 4 ++-- backend/btrixcloud/operator/crawls.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index d6f5736f49..c6c27b19c4 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -123,8 +123,8 @@ async def init_index(self): ) await self.crawls.create_index( [ - ("type", pymongo.HASHED), - ("oid", pymongo.DESCENDING), + ("state", pymongo.ASCENDING), + ("oid", pymongo.ASCENDING), ("started", pymongo.ASCENDING), ] ) diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py index 973c298df0..2d71d9e92d 100644 --- a/backend/btrixcloud/operator/crawls.py +++ b/backend/btrixcloud/operator/crawls.py @@ -21,7 +21,6 @@ TYPE_NON_RUNNING_STATES, TYPE_RUNNING_STATES, TYPE_ALL_CRAWL_STATES, - # NON_RUNNING_STATES, RUNNING_STATES, WAITING_STATES, RUNNING_AND_STARTING_ONLY, @@ -53,7 +52,6 @@ CMAP, PVC, CJS, - # BTRIX_API, ) From be91e2b3cb9e8540e089e6b75270614c032ca957 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Wed, 29 Oct 2025 17:25:57 -0700 Subject: [PATCH 3/3] cleanup, remove unused --- backend/btrixcloud/operator/crawls.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py index 2d71d9e92d..841f2de244 100644 --- a/backend/btrixcloud/operator/crawls.py +++ b/backend/btrixcloud/operator/crawls.py @@ -256,7 +256,7 @@ async def sync_crawls(self, data: MCSyncData): return self._empty_response(status) if status.state in ("starting", "waiting_org_limit"): - if not await self.can_start_new(crawl, data, status): + if not await self.can_start_new(crawl, status): return self._empty_response(status) await self.set_state( @@ -736,13 +736,10 @@ async def set_state( def get_related(self, data: MCBaseRequest): """return objects related to crawl pods""" - spec = data.parent.get("spec", {}) - crawl_id = spec["id"] - # oid = spec.get("oid") - # filter by role as well (job vs qa-job) - # role = data.parent.get("metadata", {}).get("labels", {}).get("role") related_resources = [] if self.k8s.enable_auto_resize: + spec = data.parent.get("spec", {}) + crawl_id = spec["id"] related_resources.append( { "apiVersion": METRICS_API, @@ -756,7 +753,6 @@ def get_related(self, data: MCBaseRequest): async def can_start_new( self, crawl: CrawlSpec, - data: MCSyncData, status: CrawlStatus, ): """return true if crawl can start, otherwise set crawl to 'queued' state @@ -765,16 +761,16 @@ async def can_start_new( if not max_crawls: return True - next_waiting_crawls = await self.crawl_ops.get_active_crawls( + next_active_crawls = await self.crawl_ops.get_active_crawls( crawl.oid, max_crawls ) - print("WAITING CRAWLS", next_waiting_crawls) # if total crawls < concurrent, always allow, no need to check further - if len(next_waiting_crawls) < max_crawls: + if len(next_active_crawls) < max_crawls: return True - if crawl.id in next_waiting_crawls: + # allow crawl if within first list of active crawls + if crawl.id in next_active_crawls: return True await self.set_state(