diff --git a/Makefile b/Makefile index e6fd4ca..332c18c 100644 --- a/Makefile +++ b/Makefile
@@ -6,6 +6,7 @@ SHELLTEST_OPTIONS := SHELL_TESTS := \ basic.sh \ check.sh \ + ondemand.sh \ reuse.sh TEST_PYTHONS := python3
@@ -28,7 +29,7 @@ shelltests: unittests: status=true ; \ for python in $(TEST_PYTHONS); do \ - PYTHON=$$python ./unittests.sh || status=false ; \ + PYTHON=$$python ./unittests.sh -vv || status=false ; \ done ;\ $$status
diff --git a/NEWS b/NEWS index 7a28a8b..3b65357 100644 --- a/NEWS +++ b/NEWS
@@ -1,3 +1,13 @@ +New in v5.0: + +* New features + + - New concept of "on demand" ticket tags added. These tags trigger + resource allocation on demand; until such a ticket is taken, the + corresponding resource pool has no resources allocated. + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + New in v4.10: * New features
diff --git a/README.md b/README.md index 22cf22c..7aa5c7e 100644 --- a/README.md +++ b/README.md
@@ -71,6 +71,9 @@ own when starting from scratch. new VM in the cloud and running Ansible playbooks to provision it can take few minutes), and users don't want to wait. It is a good idea to preallocate a small number of resources that are ready to be used immediately. +- On-demand allocation - In the special "on demand" pool mode, resources are + not preallocated in advance but started on demand, only when a ticket + requests them. - Livechecks - Clouds are unreliable. VMs can break while starting or become unresponsive for various reasons. Resalloc periodically checks the liveness of all resources and makes sure money doesn't leak out of our pockets.
diff --git a/config/pools.yaml b/config/pools.yaml index 31bfcde..f8304d2 100644 --- a/config/pools.yaml +++ b/config/pools.yaml
@@ -92,6 +92,27 @@ ## - ci_test_machine_x86_64 ## - ci_test_machine ## +## # This is similar to the "tags" configuration in terms of "matching +## # resources to tickets", but on-demand tags trigger a completely +## # different pool behavior. Instead of dynamically preallocating a set of +## # "free" resources in advance, pools with "tags_on_demand" configured +## # have zero resources allocated by default, until some existing ticket is +## # taken with at least one of the predefined "tags_on_demand". The more +## # such tags are taken, the more resources are allocated on demand. For +## # example, if the `beefy` tag is configured in a pool, no resource is +## # started until a `resalloc ticket --tag beefy` ticket is taken. Note +## # that, contrary to normal pools, the resources are allocated on demand, +## # so resolving such tickets always takes some time (unless the resource +## # is reused within reuse_opportunity_time). Multiple pools may provide +## # the same "tags_on_demand", but a given tag may not be used in "tags" in +## # one pool and in "tags_on_demand" in another (a configuration runtime +## # error is generated in such a case). The "max_prealloc" config, if also +## # specified, is ignored (no preallocation is possible). +## tags_on_demand: +## - beefy_machine_x86_64 +## - name: beefy_machine +## priority: -10 +## ## # The "reuse" feature options. These options configure the mechanism of ## # re-assigning of previously used resources to multiple subsequent tickets ## # (when the assigned tickets belong to the same --sandbox).
Still, when the
diff --git a/pylintrc b/pylintrc index 14802e7..67bacd2 100644 --- a/pylintrc +++ b/pylintrc
@@ -24,7 +24,16 @@ ignored-classes=RState,TState # This is to work-around the issues in our diff-lint CI where we do not have # all the build/runtime requirements installed. We can ignore this error # because the packaging CI would actually discover the import issues anyway. -disable=useless-object-inheritance,import-error +# too-few-public-methods +# Deliberately using classes instead of namedtuple, because it helps us with +# typing (completion). +# consider-using-f-string +# We still support EPEL7. +# too-many-lines +# too-many-locals +# too-many-branches +# Stylistic errors that are low-priority, and sometimes hard to avoid. +disable=useless-object-inheritance,import-error,too-few-public-methods,consider-using-f-string,too-many-lines,too-many-locals,too-many-branches [DESIGN] min-public-methods=1
diff --git a/resallocserver/logic.py b/resallocserver/logic.py index 09212f7..668051d 100644 --- a/resallocserver/logic.py +++ b/resallocserver/logic.py
@@ -18,10 +18,11 @@ import time import uuid +from sqlalchemy.orm import Query, joinedload +from sqlalchemy import or_ + from resalloc.helpers import RState, TState from resallocserver import models -from sqlalchemy.orm import Query -from sqlalchemy import or_ def assign_ticket(resource, ticket):
@@ -69,10 +70,11 @@ def ready(self): """ Get ready resources, those which were never assigned or are released. The sandbox-assigned resources are sorted above others - so they can be - re-used first. + re-used first. The query is already ordered by ID ASC. """ return (self.up().filter(models.Resource.ticket_id.is_(None)) - .filter(models.Resource.check_failed_count==0)) + .filter(models.Resource.check_failed_count==0) + .order_by(models.Resource.id.asc())) def taken(self): """
@@ -156,9 +158,15 @@ class QTickets(QObject): query = Query(models.Ticket) - def waiting(self): - return self.query.filter_by(resource_id=None)\ - .filter_by(state=TState.OPEN) + def waiting(self, preload_tags=False): + query = ( + self.query.filter_by(resource_id=None) + .filter_by(state=TState.OPEN) + ) + if preload_tags: + query = query.options(joinedload("tags")) + return query + def not_closed(self): return self.query.filter(models.Ticket.state != TState.CLOSED)
diff --git a/resallocserver/manager.py b/resallocserver/manager.py index ac049e5..36a8faf 100644 --- a/resallocserver/manager.py +++ b/resallocserver/manager.py
@@ -31,7 +31,7 @@ from resallocserver.logic import ( QResources, QTickets, assign_ticket, release_resource ) -from resallocserver.priority_queue import PriorityQueue, PriorityQueueTask +from resallocserver.priority_queue import PriorityQueue REUSED_RESOURCE_PRIORITY = 500
@@ -111,6 +111,42 @@ def run_command(pool_id, res_id, res_name, id_in_pool, command, ltype='alloc', } + +def normalize_tags(tags): + """ + Tags can be an array of str() or dict() fields. Transform strings to the + dict() variant so we can later work with them uniformly. + """ + if not tags: + return + + new_tags = [] + for tag in tags: + if isinstance(tag, str): + new_tags.append({ + "name": tag, + "priority": 0, + }) + elif isinstance(tag, dict): + new_tags.append({ + "name": tag["name"], + "priority": tag.get('priority', 0) + }) + else: + assert False + + del tags[:] + tags.extend(new_tags) + + +class CrossPoolConfig: + """ + Some configuration loaded from pools.yaml that is not strictly related to + a specific pool.
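+    (For now this is just the set of all "on demand" tag names across pools.)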
+ """ + def __init__(self, on_demand_tags): + self.on_demand_tags = on_demand_tags + + def reload_config(): config_dir = app.config["config_dir"] config_file = os.path.join(config_dir, "pools.yaml")
@@ -121,10 +157,21 @@ def reload_config(): assert not pool_id in pools pool = Pool(pool_id) pool.from_dict(config[pool_id]) - pool.validate() + if pool.tags_on_demand: + pool.tags += pool.tags_on_demand + pool.max_prealloc = 0 + pools[pool_id] = pool - return pools + on_demand = set() + for _, pool in pools.items(): + for tag in pool.tags_on_demand: + on_demand.add(tag["name"]) + + for _, pool in pools.items(): + pool.validate(on_demand) + + return CrossPoolConfig(on_demand), pools class ThreadLocalData(threading.local):
@@ -280,28 +327,12 @@ def job(self): resource.state = RState.ENDED if output['status'] else RState.UP resource.data = output['stdout'] tags = [] - if not isinstance(self.pool.tags, list): - msg = "Pool {pool} has set 'tags' set, but that's not an array"\ - .format(pool=self.name) - warnings.warn(msg) - else: - for tag in self.pool.tags: - tag_name = None - tag_priority = 0 - if isinstance(tag, str): - # older format - tag_name = tag - elif isinstance(tag, dict): - tag_name = tag['name'] - tag_priority = tag.get('priority', 0) - else: - assert False - - tag_obj = models.ResourceTag() - tag_obj.id = tag_name - tag_obj.resource_id = resource.id - tag_obj.priority = tag_priority - tags.append(tag_obj) + for tag in self.pool.tags: + tag_obj = models.ResourceTag() + tag_obj.id = tag["name"] + tag_obj.resource_id = resource.id + tag_obj.priority = tag["priority"] + tags.append(tag_obj) self.log.info("Allocating %s finished => %s", resource.name, resource.state)
@@ -374,7 +405,7 @@ def _list_known_resources(self): class Watcher(threading.Thread): def loop(self): app.log.info("Watcher loop") - pools = reload_config() + _, pools = reload_config() to_check = {} with session_scope() as session: # Even though we never terminate resources that have assigned
@@ -447,17 +478,50 @@ class Pool(object): livecheck_period = 600 livecheck_attempts = 3 tags = None + tags_on_demand = [] name_pattern = "{pool_name}_{id}_{datetime}" reuse_opportunity_time = 0 reuse_max_count = 0 reuse_max_time = 3600 + start_on_demand_this_cycle = 0 + def __init__(self, id): self.id = id # TODO: drop this self.name = id + @property + def tag_set(self): + """ Returns set() of (all) tag names assigned to this pool """ + retval = set() + for tag in self.tags: + retval.add(tag["name"]) + return retval + + @property + def tag_set_on_demand(self): + """ Returns set() of on-demand tag names assigned to this pool """ + retval = set() + for tag in self.tags_on_demand: + retval.add(tag["name"]) + return retval + + def get_tags_priority(self, queried_tags): + """ + Given a set of tags, calculate the priority this Pool is given. + If the pool doesn't match all the requested tags, return None. + """ + priority = 0 + found_tags = set() + for tag in self.tags: + if tag["name"] in queried_tags: + priority += tag["priority"] + found_tags.add(tag["name"]) + if found_tags == queried_tags: + return priority + return None def loop(self, event): """ ``Synchronizer().ticket`` object. """ - # decouple ticket from resource, and maybe switch UP → RELEASING - self._detect_closed_tickets(event) - # switch UP → DELETE_REQUEST self._request_resource_removal() - # switch DELETE_REQUEST → ENDED + # switch DELETE_REQUEST → DELETING (→ ENDED in the background) self._garbage_collector(event) + # New resources appear in STARTING (slowly switching → UP in the background) self._allocate_more_resources(event) # Delete all resources that are not recognized by resalloc self._clean_unknown_resources(event) - def validate(self): + def validate(self, on_demand_tag_set): assert(self.cmd_new) assert(self.cmd_delete) + for tag in on_demand_tag_set: + if tag in self.tag_set: + assert tag in self.tag_set_on_demand + def _allocate_pool_id(self, session, resource): # allocate the lowest available pool_id
@@ -535,7 +601,7 @@ def allocate(self, event): AllocWorker(event, self, int(resource_id)).start() def from_dict(self, data): - allowed_types = [int, str, dict, type(None)] + allowed_types = [int, str, dict, type(None), list] if type(data) != dict: # TODO: warning
@@ -556,6 +622,17 @@ else: setattr(self, key, data[key]) + for attr in ["tags", "tags_on_demand"]: + obj = getattr(self, attr, None) + if not isinstance(obj, list): + msg = "Pool {} attribute {} is not a list, ignoring".format( + self.name, attr) + warnings.warn(msg) + setattr(self, attr, []) + + obj = getattr(self, attr) + normalize_tags(obj) + def _too_soon(self): last_start = 0.0 with session_scope() as session:
@@ -584,13 +661,25 @@ def _allocate_more_resources(self, event): msg = msg + ' {0}={1}'.format(key,val) app.log.debug(msg) - if stats['on'] >= self.max \ - or stats['free'] + stats['start'] >= self.max_prealloc \ - or stats['start'] >= self.max_starting \ - or self._too_soon(): - # Quota reached, don't allocate more. break + if stats['on'] >= self.max: break + if self.max_prealloc: + # Normal "preallocated" instances + if stats['free'] + stats['start'] >= self.max_prealloc: + break + + elif self.start_on_demand_this_cycle <= 0: + # The "on-demand" instances + break + + if stats['start'] >= self.max_starting: + break + + if self._too_soon(): + break + + self.start_on_demand_this_cycle -= 1 self.allocate(event) def _clean_unknown_resources(self, event):
@@ -617,7 +706,8 @@ def _clean_unknown_resources(self, event): dbinfo.cleaning_unknown_resources = datetime.now() session.add(dbinfo) - def _detect_closed_tickets(self, event): + def detect_closed_tickets(self, event): + """ decouple ticket from resource, and maybe switch UP → RELEASING """ close_resources = [] with session_scope() as session:
@@ -644,6 +734,64 @@ for resource_id in close_resources: ReleaseWorker(event, self, resource_id).start() + + def _request_on_demand_resources_removal(self, session): + """ + On-demand resources are typically expensive (e.g. the per-hour price) + and we don't want to keep them unused unreasonably long (e.g. when the + requesting ticket was closed for whatever reason). + + We pay much more attention to the resource allocation logic than to + this cleanup logic (we only allocate more resources in one of the pools + that match). So it _is not typical_ to hit some resource here - only + if the user really changed their mind and closed the on-demand ticket + prematurely. + + Consider that + - our pool provides on-demand tags [A, B] and normal tag [C] + - there are these waiting tickets 1=[A, C], 2=[B], 3=[A, B], 4=[C, D], + and 5=[C]. + - we have 5 resources allocated and ready to be assigned + + Then ticket 4. does not match, so we don't count it. Ticket 5. is + not an "on-demand" ticket, but we can serve it, so we take it into + account. We do this because, once such an expensive resource has been + allocated, it is bad to just drop it without any actual use - so we + prefer to give it even to a less privileged ticket. + + So we eventually pay attention to tickets 1., 2., 3., and 5., and one + extra resource - the oldest one - is going to be terminated. + """ + + if not self.tags_on_demand: + # resources in this pool are not started on-demand + return + + we_provide = self.tag_set + + qtickets = QTickets(session) + waiting_on_us = 0 + for ticket in qtickets.waiting(preload_tags=True): + if ticket.tag_set.issubset(we_provide): + waiting_on_us += 1 + + qres = QResources(session, pool=self.name) + resources = list(qres.ready()) + + # Consider only those instances that were never taken + resources = [r for r in resources if not r.releases_counter] + remove = len(resources) - waiting_on_us + + remove_item = 0 + while remove > 0: + resource = resources[remove_item] + app.log.debug("Deleting on-demand instance %s for not enough " + "tickets", resource.id) + resource.state = RState.DELETE_REQUEST + remove -= 1 + remove_item += 1 + + def _request_resource_removal(self): with session_scope() as session: now = time.time()
@@ -688,6 +836,9 @@ def _request_resource_removal(self): res.state = RState.DELETE_REQUEST continue + self._request_on_demand_resources_removal(session) + + def _garbage_collector(self, event): to_terminate = [] with session_scope() as session:
@@ -696,19 +847,6 @@ for res in to_terminate: TerminateWorker(event, self, int(res.id)).start() -class PrioritizedResource(PriorityQueueTask): - """ - Resource with priority (calculated from matching tasks). - """ - def __init__(self, resource): - self.resource = resource - - @property - def object_id(self): - """ Object_id from the resource id """ - return self.resource.id - - class Manager(object): def __init__(self, sync): self.sync = sync
@@ -718,12 +856,35 @@ def _notify_waiting(self, thread_id): with self.sync.resource_ready: self.sync.resource_ready.notify_all() - def _assign_tickets(self): + def _assign_tickets(self, cross_pool_config): + ticket_id_queue = PriorityQueue() + + resource_ids = set() + + # Typically, the older the ticket is, the sooner we process it. But + # if it is requesting an on-demand resource, it has a higher priority. with session_scope() as session: qticket = QTickets(session) - tickets = [x.id for x in qticket.waiting().order_by(models.Ticket.id).all()] + for ticket in qticket.waiting().order_by(models.Ticket.id).all(): + if ticket.tag_set.intersection(cross_pool_config.on_demand_tags): + ticket_id_queue.add_task(ticket.id, priority=10) + else: + ticket_id_queue.add_task(ticket.id) + + # Remember the initial list of resource IDs so that newer tickets + # do not overtake older ones.
+ qres = QResources(session) + for resource in qres.ready(): + resource_ids.add(resource.id) + + + while True: + try: + ticket_id = ticket_id_queue.pop_task() + except KeyError: + # no more tickets + break - for ticket_id in tickets: notify_ticket = False with session_scope() as session: ticket = session.query(models.Ticket).get(ticket_id) @@ -737,6 +898,9 @@ def _assign_tickets(self): queue = PriorityQueue() ticket_tags = ticket.tag_set for resource in resources: + if resource.id not in resource_ids: + continue + res_tags = resource.tag_set if resource.sandbox and resource.sandbox != ticket.sandbox: continue @@ -774,17 +938,144 @@ def _assign_tickets(self): self._notify_waiting(notify_ticket) + def _decide_where_to_start_on_demand_instances(self, config, pools): + """ + The on-demand resources are a bit more difficult than those + pre-allocated, because, if there's a triggering "on-demand" ticket we + don't want to simply start allocating machines in all the capable + pools, but only in one of them. + """ + + pools_on_demand = [pool for _, pool in pools.items() + if pool.tags_on_demand] + pools_on_demand.sort(key=lambda x: x.name) + + all_stats = {} + tickets_to_solve = {} + + with session_scope() as session: + # Gather pools' statistics + for pool in pools_on_demand: + qres = QResources(session, pool=pool.name) + stats = qres.stats() + # Some resources may be already started or being started by the + # previous manager's loop() because there are some + # not-yet-processed tickets. And we don't want the "old" + # tickets to trigger another VM allocation here. We calculate + # _all_ "ready" resources here, and those being started. Note + # that we know nothing about the resources themselves, so we + # ignore the "released" (included in "free") resources - simply + # put, released resources blindly block an allocation of another + # resources (for now) and must be terminated first. + stats["already_existing"] = stats["start"] + stats["ready"] + \ + stats["releasing"] + all_stats[pool.id] = stats + + # Gather the "on demand" tickets to start new resources for. + qticket = QTickets(session) + + for ticket in qticket.waiting(preload_tags=True): + # On-demand tickets only! The tags are pre-loaded above. + if not ticket.tag_set.intersection(config.on_demand_tags): + continue + + # Only tickets that do not have an already starting resource! + ticket_has_resource = False + ticket_has_capable_pool = False + + for pool in pools_on_demand: + if pool.get_tags_priority(ticket.tag_set) is None: + # this pool doesn't match the tag-set + continue # maybe the next pool? + + ticket_has_capable_pool = True + + pool_stats = all_stats[pool.id] + if pool_stats["already_existing"] > 0: + # We won't start a resource for this ticket, already + # starting! + pool_stats["already_existing"] -= 1 + ticket_has_resource = True + break # go to the next ticket + + if not ticket_has_capable_pool: + app.log.error("Couldn't find appropriate on demand pool for " + "ticket=%s, it will never be resolved!", + ticket.id) + continue + + if ticket_has_resource: + app.log.debug("Ticket=%s likely has a resource running", + ticket.id) + continue + + app.log.info("Ticket handled %s", ticket.id) + # let's try to start a new resource for this ticket + tickets_to_solve[ticket.id] = ticket.tag_set + + + for ticket_id, ticket_tags in tickets_to_solve.items(): + + # Construct a list (priority queue) of all Pools related to the + # the handled ticket. 
+ queue = PriorityQueue() + for pool in pools_on_demand: + priority = pool.get_tags_priority(ticket_tags) + if priority is None: + # None means that the pool doesn't have all the tags + continue + # Note that some pool might be failing to start instances right + # now. We should somehow decrease the priority of such a pool + # here, to (e.g. randomly?) try the less prioritized pool. + queue.add_task(pool, priority) + + # Pick the first adequate Pool (per priority) and start a new + # resource inside. + startup_triggered = False + while True: + try: + pool = queue.pop_task() + except KeyError: + # no more pools matching the tag-set + break + + stats = all_stats[pool.id] + if any([ + pool.start_on_demand_this_cycle + stats['on'] >= pool.max, + pool.start_on_demand_this_cycle + stats['start'] >= pool.max_starting, + ]): + continue # try next capable pool? + + startup_triggered = True + app.log.debug("Ticket=%s starts id in pool=%s", ticket_id, + pool.id) + pool.start_on_demand_this_cycle += 1 + break # we've identified the pool, go to the next ticket! + + if not startup_triggered: + app.log.debug("Ticket=%s can not start new resources, " + "quotas reached", ticket_id) + + def _loop(self): app.log.info("Manager's loop.") # Cleanup the old resources. - for _, pool in reload_config().items(): + cross_pool_config, pools = reload_config() + + + for _, pool in pools.items(): + pool.detect_closed_tickets(self.sync.ticket) + + self._decide_where_to_start_on_demand_instances(cross_pool_config, pools) + + for _, pool in pools.items(): pool.loop(self.sync.ticket) - # Assign tasks. This needs to be done after _detect_closed_tickets(), + # Assign tasks. This needs to be done after detect_closed_tickets(), # because that call potentially releases some resources which need be # preferably re-used to not waste resources. - self._assign_tickets() + self._assign_tickets(cross_pool_config) def run(self): diff --git a/resallocserver/priority_queue.py b/resallocserver/priority_queue.py index d2f0c14..a20afb7 100644 --- a/resallocserver/priority_queue.py +++ b/resallocserver/priority_queue.py @@ -2,6 +2,11 @@ Python priority queue implementation. """ +# TODO: rename "Task" to "Item" if we want to make this a library, we can +# place any kind of resource into the Queue (Pools, Tickets, etc.). +# TODO: enforce the PriorityQueueTask use. Using the default __repr__ for +# generic objects might lead to misbehavior. 
+ import itertools from heapq import heappop, heappush diff --git a/shelltests/testlib b/shelltests/testlib index 003e455..bd7ba8b 100644 --- a/shelltests/testlib +++ b/shelltests/testlib @@ -74,6 +74,12 @@ fail () } fatal () { echo >&2 "FATAL: $*"; exit 1 ; } +assert() { + eval "$2" || { + fatal "$1 ($2)" + } +} + # postgresql_start PORT DIR SOCKETDIR # ----------------------------------- postgresql_start () @@ -83,4 +89,93 @@ postgresql_start () pg_ctl start -w -o "-p $PORT -k $SOCKETDIR" -D "$DIR" } +wait_up_resources() +{ + message=$1 + info "Wait for $wait_num resources, $message" + counter=30 + while true; do + up=$(maint resource-list --up | wc -l) + counter=$(( counter - 1)) + if test "$up" -eq "$wait_num"; then + break + fi + test "$up" -lt "$wait_num" || { + maint resource-list --up + error "too many resources" + } + test $counter -gt 0 + sleep 1 + done +} + +wait_ticket_resolved() +{ + _counter=30 + _ticket=$1 + _message="" + test -z "$2" || _message=" ($2)" + info "Wait for ticket $_ticket being resolved$_message" + while true; do + _count=$(maint ticket-list | grep "^$_ticket -" | grep -c resource= || :) + test "$_count" -ne 1 || break # successful waiting + test "$_counter" -le 0 && fatal "Unsuccessful waiting for ticket $_ticket" + _counter=$(( _counter - 1)) + sleep 1 + done + + wait_ticket_resolved_result_resource=$( + maint ticket-list | grep "^$_ticket -" | sed 's/.*resource=//' + ) +} + +get_assigned_ticket_count() +{ + get_assigned_ticket_count_result=$(maint ticket-list | grep -c resource=) +} + +get_resource_count() +{ + get_resource_count_result=$(maint resource-list | wc -l) +} + +get_ticket_count() +{ + get_ticket_count_result=$(maint ticket-list | wc -l) +} + +assert_assigned_ticket_count() +{ + get_assigned_ticket_count + assert "Unexpected number of assigned tickets - $2" \ + "test $get_assigned_ticket_count_result = $1" +} + +assert_ticket_count() +{ + get_ticket_count + assert "Unexpected total number of tickets - $2" \ + "test $get_ticket_count_result = $1" +} + +assert_resource_count() +{ + get_resource_count + assert "Unexpected total number of resources - $2" \ + "test $get_resource_count_result = $1" +} + +get_ticket() +{ + get_ticket_result=$(client ticket "$@") +} + +assert_starts_with() +{ + case $1 in + $2*) ;; + *) fatal "Assert that $1 starts with $2 failed, $3" ;; + esac +} + # vi: ft=sh diff --git a/shelltests/tests/ondemand.sh b/shelltests/tests/ondemand.sh new file mode 100755 index 0000000..75d800b --- /dev/null +++ b/shelltests/tests/ondemand.sh @@ -0,0 +1,267 @@ +#! /bin/bash + +# Check that "on demand" resources work correctly in Resalloc +# Copyright (C) 2023 Red Hat, Inc. +# +# This file is part of resalloc project. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +DBNAME=resalloc-test-on-demand +: "${DATABASE=sqlite}" + +. 
./testlib + +cd "$WORKDIR" || exit 1 + +debug "I'm in $PWD" +cleanup_actions=() +cleanup () +{ + debug "cleanup" + set +e + for action in "${cleanup_actions[@]}" + do + eval "$action" + done +} +trap cleanup EXIT + +mkdir etc +cat > etc/pools.yaml <&2 before; env | grep RESALLOC_; echo >&2 after" + cmd_delete: "echo >&2 stderr; echo stdout" + tags: + - A + - B + - prealloc + +ondemand1: + max: 3 + max_prealloc: 5 # unused + cmd_new: "echo >&2 before; sleep 3 ; env | grep RESALLOC_; echo >&2 after" + cmd_delete: "echo >&2 stderr; echo stdout" + tags: + - A + - B + - C1 + - name: priority # This tag is prioritized here in ondemand1 + priority: 2 + tags_on_demand: + - ondemand # This tag is prioritized in ondemand2 pool + - ondemand1 + - ondemand_not_prioritized + reuse_opportunity_time: 5 + +ondemand2: + max: 3 + max_prealloc: 5 # unused + cmd_new: "echo >&2 before; sleep 3 ; env | grep RESALLOC_; echo >&2 after" + cmd_delete: "echo >&2 stderr; echo stdout" + tags: + - A + - B + - C2 + - name: priority + priority: 1 + tags_on_demand: + - name: ondemand + priority: 1 + - ondemand2 + - ondemand_not_prioritized + reuse_opportunity_time: 5 +EOF + +uname=$(id -u -n) +dburl="sqlite:///$WORKDIR/server-sql" +case $DATABASE in + sqlite) ;; + postgresql) + port=${POSTGRESQL_PORT-65432} + host=/tmp + datadir=$WORKDIR/pgdata + info "preparing PostgreSQL server" + postgresql_start "$port" "$datadir" "$host" &>/dev/null + createdb -p "$port" -h "$host" "$DBNAME" + cleanup_actions+=( "pg_ctl stop -D \"$datadir\" -m i >/dev/null" ) + dburl="postgresql://$uname@/$DBNAME?host=$host&port=$port" + ;; + *) false ;; +esac + +cat > etc/server.yaml </dev/null +cleanup_actions+=( "shutdown_server $server_pid" ) + +# Wait for the DB startup and init. +counter=30 +while ! maint resource-list &>/dev/null; do + counter=$(( counter - 1 )) + test $counter -gt 0 +done + +# this ticket is never going to be closed, never matches any pool, and never +# creates any demand resource +get_ticket --tag A --tag B --tag C + +# this ticket matches ondemand2, though it doesn't start a new resource +get_ticket --tag A --tag B --tag C2 + +# This ticket takes one pre-alloc resource, 2nd one gets preallocated +get_ticket --tag A --tag B --tag prealloc +ticket_in_basic_prealloc=$get_ticket_result + +# This ticket triggers one on-demand resource startup +get_ticket --tag A --tag B --tag ondemand + +wait_num=3 wait_up_resources "2x 'basic', 1x 'ondemand'" + +# Two assigned resources, one on-demand, one preallocated +assert_assigned_ticket_count 2 "First check" + +# These tickets match all pools, but only prealloc are taken. +# First is assigned, second one stays unresolved till a very late state of this +# test script +get_ticket --tag A --tag B +get_ticket --tag A --tag B +remember_ticket_for_later=$get_ticket_result +get_ticket --tag A --tag B +remember_ticket_for_later2=$get_ticket_result +sleep 3 + +# Three assigned resources, one on-demand, two preallocated +assert_assigned_ticket_count 3 "Second check" + +wait_num=3 wait_up_resources "2x 'basic', 1x 'ondemand' (nothing changed)" + +assert_ticket_count 7 "check before harder tasks" +assert_resource_count 3 "check before harder tasks" + +# "ondemand" tag itself prioritizes ondemand2 pool +get_ticket --tag A --tag B --tag ondemand --sandbox=take_one +on_demand_ticket1=$get_ticket_result +# .. 
but with the "priority" tag it flips back to ondemand1 +get_ticket --tag A --tag B --tag ondemand --tag priority --sandbox=take_two +on_demand_ticket2=$get_ticket_result + +wait_ticket_resolved "$on_demand_ticket1" "in ondemand2" +on_demand_resource1=$wait_ticket_resolved_result_resource +wait_ticket_resolved "$on_demand_ticket2" "in ondemand1" +on_demand_resource2=$wait_ticket_resolved_result_resource + +# Priorities are respected while allocating resources! +assert_starts_with "$on_demand_resource1" "ondemand2_" "Checking ondemand2 pool is assigned" +assert_starts_with "$on_demand_resource2" "ondemand1_" "Checking ondemand1 pool is assigned" + +wait_num=5 wait_up_resources "two more on-demand machines" + +# Check that first-defined pool (ondemand1) is chosen first over the latter +# (determinism test) +get_ticket --tag A --tag B --tag ondemand_not_prioritized +wait_ticket_resolved "$get_ticket_result" "on demand, but not prioritized" +assert_starts_with "$wait_ticket_resolved_result_resource" "ondemand1_" \ + "Not prioritized ticket should get the first defined pool" + +# This one starts the last available resource in the ondemand2 pool ("max" reached) +get_ticket --tag A --tag B --tag ondemand2 --sandbox=retake +last_resolved_ticket=$get_ticket_result +# These two tickets are going to wait until the NEXT one is resolved because +# they are not using an on-demand tag. The C2 ticket explicitly selects the +# ondemand2 pool; the next ticket could actually be handled by any pool, but +# all the 'basic' resources are already taken. +get_ticket --tag A --tag B --tag C2 --sandbox=retake +unresolved_ticket_not_on_demand=$get_ticket_result +get_ticket --tag A --tag B --sandbox=retake +unresolved_ticket_not_on_demand_basic=$get_ticket_result +# This ticket is going to wait until $last_resolved_ticket is closed, +# blocking the previous ones, because we reached the "max" limit (so we are +# not starting new resources) +get_ticket --tag A --tag B --tag ondemand2 --sandbox=retake +unresolved_ticket=$get_ticket_result + +wait_ticket_resolved "$last_resolved_ticket" "the last one allocated" +last_resource=$wait_ticket_resolved_result_resource +assert_starts_with "$last_resource" "ondemand2_" "explicit ondemand2" +assert_resource_count 7 "check before harder tasks" + +# Try re-taking of on-demand resources by the on-demand ticket that was +# waiting on the "max" limit. There are two other tickets this resource could +# take, but the on-demand ticket has a higher priority. +client ticket-close "$last_resolved_ticket" +wait_ticket_resolved "$unresolved_ticket" "taking a waiting on demand task" + +# Try re-taking of on-demand resources even by a non-on-demand ticket +client ticket-close "$unresolved_ticket" +wait_ticket_resolved "$unresolved_ticket_not_on_demand" "taking non-on-demand task" + +# Try re-taking of on-demand resources by a non-on-demand ticket that could +# be handled by a non-on-demand pool, too (while the basic pool is full). +client ticket-close "$unresolved_ticket_not_on_demand" +wait_ticket_resolved "$unresolved_ticket_not_on_demand_basic" "taking also task for basic pool" + +# We still get the same resource ID! +assert "check retaken resource" "test $last_resource = $wait_ticket_resolved_result_resource" +assert_resource_count 7 "final check before harder tasks" + +# This triggers a resource startup in the ondemand1 pool (ondemand2 is full). We +# give up (close the ticket), but the resource is started anyway and it takes +# the old unresolved ticket $remember_ticket_for_later +get_ticket --tag A --tag B --tag ondemand +sleep 2 +client ticket-close "$get_ticket_result" + +wait_ticket_resolved "$remember_ticket_for_later" "additional ondemand resource taking old basic ticket" +assert_starts_with "$wait_ticket_resolved_result_resource" "ondemand1_" \ + "Checking ondemand1 pool is assigned to $remember_ticket_for_later" + +# Close a very old ticket, which deallocates a resource in 'basic'; a new one +# is started and it finally takes '$remember_ticket_for_later2' +client ticket-close "$ticket_in_basic_prealloc" + +wait_ticket_resolved "$remember_ticket_for_later2" "take ticket by basic pool again" +assert_starts_with "$wait_ticket_resolved_result_resource" "basic_" + +# Drop remaining tickets, deallocate resources (one 'basic' preallocated +# remains). +for i in 1 2 4 5 6 7 8 9 10 13; do + client ticket-close $i +done +assert_ticket_count 0 "all tickets should be closed" + +# These two tickets take the remaining preallocated and a new basic resource +get_ticket --tag A --tag B +check=$get_ticket_result +get_ticket --tag A --tag B +wait_ticket_resolved "$get_ticket_result" "fresh new basic resource #1" +assert_starts_with "$wait_ticket_resolved_result_resource" "basic_" +wait_ticket_resolved "$check" "fresh new basic resource #2" +assert_starts_with "$wait_ticket_resolved_result_resource" "basic_" +sleep 7 # waste the reuse_opportunity_time +assert_resource_count 2 "only two basic resources again" + +# vi: ft=sh
diff --git a/tests/test_manager.py b/tests/test_manager.py index 4ab9780..3108402 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py
@@ -12,11 +12,23 @@ from resallocserver import models from resallocserver.app import session_scope from resallocserver.main import Synchronizer -from resallocserver.manager import Manager +from resallocserver.manager import Manager, CrossPoolConfig, normalize_tags from tests import ResallocTestCase +def test_normalize_tags(): + """ Test that normalize_tags() properly modifies the tags """ + tags = ["a"] + normalize_tags(tags) + assert tags == [{"name": "a", "priority": 0}] + tags = ["a", {"name": "b"}, {"name": "c", "priority": 10}] + normalize_tags(tags) + assert tags == [{"name": "a", "priority": 0}, + {"name": "b", "priority": 0}, + {"name": "c", "priority": 10}] + + class TestManager(ResallocTestCase): @pytest.mark.parametrize(
@@ -73,7 +85,7 @@ def test_resource_tag_priority(self, passengers, expected_car): sync = Synchronizer() manager = Manager(sync) - manager._assign_tickets() + manager._assign_tickets(CrossPoolConfig(set())) with session_scope() as session: ticket = session.query(models.Ticket).get(1)