From 016d1e36c80513a143c594dd31f8ff183578f255 Mon Sep 17 00:00:00 2001 From: Pavel Raiskup Date: Fri, 30 Jun 2023 16:59:56 +0200 Subject: [PATCH] on-demand tickets need to be prioritized Otherwise the on-demand resources might be taken by other tickets first. --- resallocserver/manager.py | 55 ++++++++++++++++++++++++++++++++------- tests/test_manager.py | 5 ++-- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/resallocserver/manager.py b/resallocserver/manager.py index 1a8d4f4..258911b 100644 --- a/resallocserver/manager.py +++ b/resallocserver/manager.py @@ -142,7 +142,8 @@ class CrossPoolConfig: Some configuration loaded from pools.yaml that is not strictly related to a specific pool. """ - on_demand_tags = None + def __init__(self, on_demand_tags): + self.on_demand_tags = on_demand_tags def reload_config(): @@ -169,9 +170,7 @@ def reload_config(): for _, pool in pools.items(): pool.validate(on_demand) - config = CrossPoolConfig() - config.on_demand_tags = on_demand - return config, pools + return CrossPoolConfig(on_demand), pools class ThreadLocalData(threading.local): @@ -856,6 +855,18 @@ def object_id(self): """ Object_id from the resource id """ return self.resource.id +class PrioritizedTicketID(PriorityQueueTask): + """ + Structure wrapping Ticket as a PriorityQueueTask + """ + def __init__(self, ticket_id): + self.resource = ticket_id + + @property + def object_id(self): + """ Object_id from the resource id """ + return self.resource + class Manager(object): def __init__(self, sync): @@ -866,12 +877,35 @@ def _notify_waiting(self, thread_id): with self.sync.resource_ready: self.sync.resource_ready.notify_all() - def _assign_tickets(self): + def _assign_tickets(self, cross_pool_config): + ticket_id_queue = PriorityQueue() + + resource_ids = set() + + # Typically, the older the ticket is, the sooner we process that. But + # if it is requesting an on-demand resource, it has a higher priority. with session_scope() as session: qticket = QTickets(session) - tickets = [x.id for x in qticket.waiting().order_by(models.Ticket.id).all()] + for ticket in qticket.waiting().order_by(models.Ticket.id).all(): + if ticket.tag_set.intersection(cross_pool_config.on_demand_tags): + ticket_id_queue.add_task(ticket.id, priority=10) + else: + ticket_id_queue.add_task(ticket.id) + + # Remember the initial list of resource IDs so the newer ticket IDs + # do not overtake older. + qres = QResources(session) + for resource in qres.ready(): + resource_ids.add(resource.id) + + + while True: + try: + ticket_id = ticket_id_queue.pop_task() + except KeyError: + # no more tickets + break - for ticket_id in tickets: notify_ticket = False with session_scope() as session: ticket = session.query(models.Ticket).get(ticket_id) @@ -885,6 +919,9 @@ def _assign_tickets(self): queue = PriorityQueue() ticket_tags = ticket.tag_set for resource in resources: + if resource.id not in resource_ids: + continue + res_tags = resource.tag_set if resource.sandbox and resource.sandbox != ticket.sandbox: continue @@ -943,7 +980,7 @@ def _decide_where_to_start_on_demand_instances(self, config, pools): class _PrioritizedPool(PriorityQueueTask): """ - Resource with priority (calculated from matching tasks). + Structure wrapping Pool as a PriorityQueueTask """ def __init__(self, pool): self.resource = pool @@ -1012,7 +1049,7 @@ def _loop(self): # Assign tasks. This needs to be done after _detect_closed_tickets(), # because that call potentially releases some resources which need be # preferably re-used to not waste resources. - self._assign_tickets() + self._assign_tickets(cross_pool_config) def run(self): diff --git a/tests/test_manager.py b/tests/test_manager.py index 4ab9780..c4ee980 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -12,7 +12,7 @@ from resallocserver import models from resallocserver.app import session_scope from resallocserver.main import Synchronizer -from resallocserver.manager import Manager +from resallocserver.manager import Manager, CrossPoolConfig from tests import ResallocTestCase @@ -73,7 +73,8 @@ def test_resource_tag_priority(self, passengers, expected_car): sync = Synchronizer() manager = Manager(sync) - manager._assign_tickets() + + manager._assign_tickets(CrossPoolConfig(set())) with session_scope() as session: ticket = session.query(models.Ticket).get(1)