Skip to content

Commit

Permalink
on-demand tickets need to be prioritized
Browse files Browse the repository at this point in the history
Otherwise the on-demand resources might be taken by other tickets first.
  • Loading branch information
praiskup committed Aug 2, 2023
1 parent af7e82b commit cc8a49a
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
55 changes: 46 additions & 9 deletions resallocserver/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ class CrossPoolConfig:
Some configuration loaded from pools.yaml that is not strictly related to
a specific pool.
"""
on_demand_tags = None
def __init__(self, on_demand_tags):
self.on_demand_tags = on_demand_tags


def reload_config():
Expand All @@ -169,9 +170,7 @@ def reload_config():
for _, pool in pools.items():
pool.validate(on_demand)

config = CrossPoolConfig()
config.on_demand_tags = on_demand
return config, pools
return CrossPoolConfig(on_demand), pools


class ThreadLocalData(threading.local):
Expand Down Expand Up @@ -856,6 +855,18 @@ def object_id(self):
""" Object_id from the resource id """
return self.resource.id

class PrioritizedTicketID(PriorityQueueTask):
"""
Structure wrapping Ticket as a PriorityQueueTask
"""
def __init__(self, ticket_id):
self.resource = ticket_id

@property
def object_id(self):
""" Object_id from the resource id """
return self.resource


class Manager(object):
def __init__(self, sync):
Expand All @@ -866,12 +877,35 @@ def _notify_waiting(self, thread_id):
with self.sync.resource_ready:
self.sync.resource_ready.notify_all()

def _assign_tickets(self):
def _assign_tickets(self, cross_pool_config):
ticket_id_queue = PriorityQueue()

resource_ids = set()

# Typically, the older the ticket is, the sooner we process that. But
# if it is requesting an on-demand resource, it has a higher priority.
with session_scope() as session:
qticket = QTickets(session)
tickets = [x.id for x in qticket.waiting().order_by(models.Ticket.id).all()]
for ticket in qticket.waiting().order_by(models.Ticket.id).all():
if ticket.tag_set.intersection(cross_pool_config.on_demand_tags):
ticket_id_queue.add_task(ticket.id, priority=10)
else:
ticket_id_queue.add_task(ticket.id)

# Remember the initial list of resource IDs so the newer ticket IDs
# do not overtake older.
qres = QResources(session)
for resource in qres.ready():
resource_ids.add(resource.id)


while True:
try:
ticket_id = ticket_id_queue.pop_task()
except KeyError:
# no more tickets
break

for ticket_id in tickets:
notify_ticket = False
with session_scope() as session:
ticket = session.query(models.Ticket).get(ticket_id)
Expand All @@ -885,6 +919,9 @@ def _assign_tickets(self):
queue = PriorityQueue()
ticket_tags = ticket.tag_set
for resource in resources:
if resource.id not in resource_ids:
continue

res_tags = resource.tag_set
if resource.sandbox and resource.sandbox != ticket.sandbox:
continue
Expand Down Expand Up @@ -943,7 +980,7 @@ def _decide_where_to_start_on_demand_instances(self, config, pools):

class _PrioritizedPool(PriorityQueueTask):
"""
Resource with priority (calculated from matching tasks).
Structure wrapping Pool as a PriorityQueueTask
"""
def __init__(self, pool):
self.resource = pool
Expand Down Expand Up @@ -1012,7 +1049,7 @@ def _loop(self):
# Assign tasks. This needs to be done after _detect_closed_tickets(),
# because that call potentially releases some resources which need be
# preferably re-used to not waste resources.
self._assign_tickets()
self._assign_tickets(cross_pool_config)


def run(self):
Expand Down
5 changes: 3 additions & 2 deletions tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from resallocserver import models
from resallocserver.app import session_scope
from resallocserver.main import Synchronizer
from resallocserver.manager import Manager
from resallocserver.manager import Manager, CrossPoolConfig

from tests import ResallocTestCase

Expand Down Expand Up @@ -73,7 +73,8 @@ def test_resource_tag_priority(self, passengers, expected_car):

sync = Synchronizer()
manager = Manager(sync)
manager._assign_tickets()

manager._assign_tickets(CrossPoolConfig(set()))

with session_scope() as session:
ticket = session.query(models.Ticket).get(1)
Expand Down

0 comments on commit cc8a49a

Please sign in to comment.