diff --git a/lib/OpenQA/Scheduler/Model/Jobs.pm b/lib/OpenQA/Scheduler/Model/Jobs.pm index 3f6a26562408..3ce75be5b713 100644 --- a/lib/OpenQA/Scheduler/Model/Jobs.pm +++ b/lib/OpenQA/Scheduler/Model/Jobs.pm @@ -38,6 +38,23 @@ sub determine_scheduled_jobs ($self) { return $self->scheduled_jobs; } +sub _allocate_worker_slot ($self, $allocated_workers, $worker, $job_info) { + $allocated_workers->{$worker->id} = $job_info->{id}; + + # set "one_host_only_via_worker"-flag for whole cluster if the allocated worker slot has + # the PARALLEL_ONE_HOST_ONLY property + # note: This is done so that _pick_siblings_of_running() can take it into account. To be able to reset this flag + # on the next tick a separate flag is used here (and not just "one_host_only"). + return undef unless $worker->get_property('PARALLEL_ONE_HOST_ONLY'); + my $scheduled_jobs = $self->scheduled_jobs; + my $cluster_jobs = $job_info->{cluster_jobs}; + $job_info->{one_host_only_via_worker} = 1; + for my $job_id (keys %$cluster_jobs) { + next unless my $cluster_job = $scheduled_jobs->{$job_id}; + $cluster_job->{one_host_only_via_worker} = 1; + } +} + sub _allocate_jobs ($self, $free_workers) { my ($allocated_workers, $allocated_jobs) = ({}, {}); my $scheduled_jobs = $self->scheduled_jobs; @@ -102,19 +119,19 @@ sub _allocate_jobs ($self, $free_workers) { # a bonus on their priority my $prio = $j->{priority}; # we only consider the priority of the main job for my $worker (keys %taken) { - my $ji = $taken{$worker}; - _allocate_worker_with_priority($prio, $ji, $j, $allocated_workers, $worker); + my ($picked_worker, $job_info) = @{$taken{$worker}}; + $self->_allocate_worker_with_priority($prio, $job_info, $j, $allocated_workers, $picked_worker); } %taken = (); last; } - $taken{$picked_worker->id} = $sub_job; + $taken{$picked_worker->id} = [$picked_worker, $sub_job]; } - for my $worker (keys %taken) { - my $ji = $taken{$worker}; - $allocated_workers->{$worker} = $ji->{id}; - $allocated_jobs->{$ji->{id}} - = {job => $ji->{id}, worker => $worker, priority_offset => \$j->{priority_offset}}; + for my $picked_worker_id (keys %taken) { + my ($picked_worker, $job_info) = @{$taken{$picked_worker_id}}; + $self->_allocate_worker_slot($allocated_workers, $picked_worker, $job_info); + $allocated_jobs->{$job_info->{id}} + = {job => $job_info->{id}, worker => $picked_worker_id, priority_offset => \$j->{priority_offset}}; } # we make sure we schedule clusters no matter what, # but we stop if we're over the limit @@ -129,11 +146,11 @@ sub _allocate_jobs ($self, $free_workers) { return ($allocated_workers, $allocated_jobs); } -sub _allocate_worker_with_priority ($prio, $ji, $j, $allocated_workers, $worker) { +sub _allocate_worker_with_priority ($self, $prio, $job_info, $j, $allocated_workers, $worker) { if ($prio > 0) { # this means we will by default increase the offset per half-assigned job, # so if we miss 1/25 jobs, we'll bump by +24 - log_debug "Discarding job $ji->{id} (with priority $prio) due to incomplete parallel cluster" + log_debug "Discarding job $job_info->{id} (with priority $prio) due to incomplete parallel cluster" . ', reducing priority by ' . STARVATION_PROTECTION_PRIORITY_OFFSET; $j->{priority_offset} += STARVATION_PROTECTION_PRIORITY_OFFSET; @@ -141,8 +158,9 @@ sub _allocate_worker_with_priority ($prio, $ji, $j, $allocated_workers, $worker) else { # don't "take" the worker, but make sure it's not # used for another job and stays around - log_debug "Holding worker $worker for job $ji->{id} to avoid starvation"; - $allocated_workers->{$worker} = $ji->{id}; + my $worker_id = $worker->id; + log_debug "Holding worker $worker_id for job $job_info->{id} to avoid starvation"; + $self->_allocate_worker_slot($allocated_workers, $worker, $job_info); } } @@ -363,8 +381,8 @@ sub _pick_siblings_of_running ($self, $allocated_jobs, $allocated_workers) { last unless $worker_host; for my $w (@{$jobinfo->{matching_workers}}) { next if $allocated_workers->{$w->id}; - next if $jobinfo->{one_host_only} && $w->host ne $worker_host; - $allocated_workers->{$w->id} = $jobinfo->{id}; + $self->_allocate_worker_slot($allocated_workers, $w, $jobinfo); + next if ($jobinfo->{one_host_only} || $jobinfo->{one_host_only_via_worker}) && ($w->host ne $worker_host); $allocated_jobs->{$jobinfo->{id}} = {job => $jobinfo->{id}, worker => $w->id}; } } @@ -439,6 +457,7 @@ sub _update_scheduled_jobs ($self) { # it's the same cluster for all, so share $cluster_infos{$_} = $cluster_jobs for keys %$cluster_jobs; } + $info->{one_host_only_via_worker} = 0; $info->{one_host_only} = any { $_->{one_host_only} } values %$cluster_jobs; $scheduled_jobs->{$job->id} = $info; } diff --git a/lib/OpenQA/Scheduler/WorkerSlotPicker.pm b/lib/OpenQA/Scheduler/WorkerSlotPicker.pm index e6bfba68270e..118a10e1df91 100644 --- a/lib/OpenQA/Scheduler/WorkerSlotPicker.pm +++ b/lib/OpenQA/Scheduler/WorkerSlotPicker.pm @@ -6,25 +6,35 @@ use Mojo::Base -base, -signatures; use List::Util qw(any); -sub new ($class, $to_be_scheduled) { - my $self = $class->SUPER::new; +sub new ($class, $to_be_scheduled) { $class->SUPER::new->reset($to_be_scheduled) } + +sub reset ($self, $to_be_scheduled) { $self->{_to_be_scheduled} = $to_be_scheduled; $self->{_matching_worker_slots_by_host} = {}; $self->{_visited_worker_slots_by_id} = {}; $self->{_picked_matching_worker_slots} = []; + $self->{_one_host_only} = 0; return $self; } sub _pick_next_slot_for_host_and_job ($self, $matching_worker_slots_for_host, $matching_worker, $job) { push @$matching_worker_slots_for_host, $matching_worker; $self->{_visited_worker_slots_by_id}->{$matching_worker->id} = $job; - return @$matching_worker_slots_for_host < @{$self->{_to_be_scheduled}} - ? undef - : ($self->{_picked_matching_worker_slots} = $matching_worker_slots_for_host); + $self->{_one_host_only} ||= $self->_is_one_host_only($matching_worker); + return undef if @$matching_worker_slots_for_host < @{$self->{_to_be_scheduled}}; + $self->{_picked_matching_worker_slots} = $matching_worker_slots_for_host; } sub _matching_worker_slots_for_host ($self, $host) { $self->{_matching_worker_slots_by_host}->{$host} //= [] } +sub _worker_host ($self, $worker) { $self->{_any_host} // $worker->host } + +sub _is_one_host_only ($self, $worker) { + my $cache = $self->{_is_one_host_only} //= {}; + my $worker_id = $worker->id; + $cache->{$worker_id} //= $worker->get_property('PARALLEL_ONE_HOST_ONLY'); +} + sub _reduce_matching_workers ($self) { # reduce the matching workers of each job to a single slot on the picked worker host's slots # note: If no single host provides enough matching slots ($picked_matching_worker_slots is still an @@ -38,13 +48,21 @@ sub _reduce_matching_workers ($self) { return $picked_matching_worker_slots; } +sub _id_or_skip ($self, $worker, $visited_worker_slots_by_id) { + my $id = $worker->id; + # skip slots that have already been picked + return undef if exists $visited_worker_slots_by_id->{$id}; + # skip slots with "PARALLEL_ONE_HOST_ONLY" to try picking on any other hosts instead + return undef if $self->{_any_host} && $self->_is_one_host_only($worker); + return $id; +} + sub _pick_one_matching_slot_per_host($self, $job) { my $visited_worker_slots_by_id = $self->{_visited_worker_slots_by_id}; my %visited_worker_slots_by_host; for my $matching_worker (@{$job->{matching_workers}}) { - my $id = $matching_worker->id; - next if exists $visited_worker_slots_by_id->{$id}; # skip slots that have already been picked - my $host = $matching_worker->host; + next unless my $id = $self->_id_or_skip($matching_worker, $visited_worker_slots_by_id); + my $host = $self->_worker_host($matching_worker); next if $visited_worker_slots_by_host{$host}++; # skip to pick only one slot per host last if $self->_pick_next_slot_for_host_and_job($self->_matching_worker_slots_for_host($host), @@ -55,7 +73,7 @@ sub _pick_one_matching_slot_per_host($self, $job) { sub _swap_slot_with_competitor_job ($self, $visited_worker_slots_by_host, $matching_worker) { # skip hosts we were able to pick a slot on - my $host = $matching_worker->host; + my $host = $self->_worker_host($matching_worker); return 0 if $visited_worker_slots_by_host->{$host}; # check the job we are competing with for this slot and see whether we might be able to swap picks by finding @@ -66,10 +84,11 @@ sub _swap_slot_with_competitor_job ($self, $visited_worker_slots_by_host, $match my $matching_worker_slots = $self->_matching_worker_slots_for_host($host); for my $alternative_matching_worker (@{$competitor_job->{matching_workers}}) { # check whether the competitor can use this slot alternatively - my $alternative_id = $alternative_matching_worker->id; - next if exists $visited_worker_slots_by_id->{$alternative_id}; # skip slots that have already been picked - next if $id == $alternative_id; # skip the competitor's current slot for this host - next if $alternative_matching_worker->host ne $host; # skip slots that are not on the relevant host + next unless my $alternative_id = $self->_id_or_skip($alternative_matching_worker, $visited_worker_slots_by_id); + # skip the competitor's current slot for this host + next if $id == $alternative_id; + # skip slots that are not on the relevant host + next if $self->{_one_host_only} && $self->_worker_host($alternative_matching_worker) ne $host; # make the competitor job use the alternative we have just found for (my $i = 0; $i != @$matching_worker_slots; ++$i) { @@ -78,21 +97,14 @@ sub _swap_slot_with_competitor_job ($self, $visited_worker_slots_by_host, $match last; } $visited_worker_slots_by_id->{$alternative_id} = $competitor_job; + $self->{_one_host_only} ||= $self->_is_one_host_only($alternative_matching_worker); return 1; } return 0; } -# reduces the matching workers of the jobs to be scheduled for pinning parallel jobs to single host -sub pick_slots_with_common_worker_host ($self) { - # return early if we don't need to care about picking a common host for the given set of jobs - my $to_be_scheduled = $self->{_to_be_scheduled}; - return undef if @$to_be_scheduled < 2 || !(any { $_->{one_host_only} } @$to_be_scheduled); - - # let each job pick one slot per host - my $matching_worker_slots_by_host = $self->{_matching_worker_slots_by_host}; - my $visited_worker_slots_by_id = $self->{_visited_worker_slots_by_id}; - for my $job (@$to_be_scheduled) { +sub _pick_one_slot_per_host_for_each_job ($self, $jobs) { + for my $job (@$jobs) { # go through the list of matching worker slots and pick one slot per host my $visited_worker_slots_by_host = $self->_pick_one_matching_slot_per_host($job); @@ -105,8 +117,35 @@ sub pick_slots_with_common_worker_host ($self) { $matching_worker, $job); } } +} + +# reduces the matching workers of the jobs to be scheduled for pinning parallel jobs to single host +sub pick_slots_with_common_worker_host ($self) { + # return early if we don't need to care about picking a common host for the given set of jobs + my $to_be_scheduled = $self->{_to_be_scheduled}; + return undef if @$to_be_scheduled < 2; + + # determine whether only slots with a common worker host must be picked as per job settings + my $one_host_only_per_job_settings = $self->{_one_host_only} = any { $_->{one_host_only} } @$to_be_scheduled; + + # let each job pick one slot per host + $self->_pick_one_slot_per_host_for_each_job($to_be_scheduled); + + # do not reduce worker slots if there is no "PARALLEL_ONE_HOST_ONLY" job setting or worker property present + return undef unless my $one_host_only = $self->{_one_host_only}; + + # try assignment again without taking workers that have the "PARALLEL_ONE_HOST_ONLY" constraint into account + # note: The algorithm so far took the first best worker slots it could find - including slots with the + # "PARALLEL_ONE_HOST_ONLY" constraint. Therefore the presence of a single matching worker slot with the + # "PARALLEL_ONE_HOST_ONLY" flag could easily prevent any matching jobs with parallel dependencies from + # being scheduled at all. To avoid this situation, let's re-run the algorithm without those worker slots. + if (!@{$self->{_picked_matching_worker_slots}} && !$one_host_only_per_job_settings) { + $self->reset($to_be_scheduled); + $self->{_any_host} = 'any'; + $self->_pick_one_slot_per_host_for_each_job($to_be_scheduled); + } - $self->_reduce_matching_workers; + return $self->_reduce_matching_workers; } 1; diff --git a/t/04-scheduler.t b/t/04-scheduler.t index 7f776c2b16c8..7f65f0a9c6d1 100644 --- a/t/04-scheduler.t +++ b/t/04-scheduler.t @@ -584,7 +584,7 @@ subtest 'parallel pinning' => sub { subtest 'no slots on common host picked when pinning not explicitly enabled' => sub { $slots = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled)->pick_slots_with_common_worker_host; - is_deeply $slots, undef, 'undef returned if not at least one job has pinning enabled'; + is_deeply $slots, undef, 'slots not reduced if not at least one job has pinning enabled'; is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1, $ws2], 'slots of job 1 not altered'; is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws1, $ws2], 'slots of job 2 not altered'; } or diag $explain_slots->(); @@ -675,6 +675,42 @@ subtest 'parallel pinning' => sub { is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1], 'slot 1 assigned to job 1'; is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws2], 'slot 2 assigned to job 2'; } or diag $explain_slots->(); + + subtest 'setting PARALLEL_ONE_HOST_ONLY as worker property prevents cross-host scheduling' => sub { + $ws1->update({host => 'host1', instance => 1}); + $ws2->update({host => 'host2', instance => 1}); + $ws2->set_property(PARALLEL_ONE_HOST_ONLY => 1); + @to_be_scheduled = ({id => 1, matching_workers => [$ws1, $ws2]}, {id => 2, matching_workers => [$ws1, $ws2]}); + + my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled); + $slots = $picker->pick_slots_with_common_worker_host; + is_deeply $slots, [], 'empty array returned because no single host has enough matching slots'; + is_deeply $to_be_scheduled[0]->{matching_workers}, [], 'no slots assigned to job 1'; + is_deeply $to_be_scheduled[1]->{matching_workers}, [], 'no slots assigned to job 2'; + } or diag $explain_slots->(); + + subtest 'scheduling on one host still possible when PARALLEL_ONE_HOST_ONLY set as worker property' => sub { + my @slots = ($ws1, $ws2, $ws3); + @to_be_scheduled = ({id => 1, matching_workers => [@slots]}, {id => 2, matching_workers => [@slots]}); + + my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled); + $slots = $picker->pick_slots_with_common_worker_host; + is_deeply $slots, [$ws2, $ws3], 'slots from host with enough matching slots picked'; + is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws2], 'slot 2 assigned to job 1'; + is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws3], 'slot 3 assigned to job 2'; + is $picker->{_one_host_only}, 1, 'picking slots on one host only'; + } or diag $explain_slots->(); + + subtest 'PARALLEL_ONE_HOST_ONLY worker property does not prevent other worker slots to be used' => sub { + $ws3->update({host => 'host3', instance => 1}); + @to_be_scheduled = ({id => 1, matching_workers => [$ws2, $ws1, $ws3]}, {id => 2, matching_workers => [$ws3]}); + + my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled); + $slots = $picker->pick_slots_with_common_worker_host; + is_deeply $slots, [$ws1, $ws3], 'slots from host without PARALLEL_ONE_HOST_ONLY picked'; + is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1], 'slot 2 assigned to job 1'; + is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws3], 'slot 3 assigned to job 2'; + } or diag $explain_slots->(); }; done_testing; diff --git a/t/05-scheduler-dependencies.t b/t/05-scheduler-dependencies.t index e57f7fa101bb..0856cbd5bd83 100644 --- a/t/05-scheduler-dependencies.t +++ b/t/05-scheduler-dependencies.t @@ -1175,6 +1175,17 @@ subtest 'parallel siblings of running jobs are allocated' => sub { $schedule->_pick_siblings_of_running($allocated_jobs, $allocated_workers); ok exists $allocated_jobs->{99998}, 'job 99998 allocated' or diag explain $allocated_jobs; }; + subtest 'no jobs allocated with dependency pinning (via worker property) and mismatching hosts' => sub { + my ($allocated_jobs, $allocated_workers) = ({}, {}); + # pretend the job itself does not have the one_host_only-flag + delete $scheduled_jobs{99998}->{one_host_only}; + # pretend the worker slot being used has the PARALLEL_ONE_HOST_ONLY property + my $relevant_worker = $workers->find($worker_on_different_host_id); + $relevant_worker->set_property(PARALLEL_ONE_HOST_ONLY => 1); + $scheduled_jobs{99998}->{matching_workers} = [$relevant_worker]; + $schedule->_pick_siblings_of_running($allocated_jobs, $allocated_workers); + ok !exists $allocated_jobs->{99998}, 'job 99998 not allocated' or diag explain $allocated_jobs; + }; }; subtest 'PARALLEL_ONE_HOST_ONLY is taken into account when determining scheduled jobs' => sub {