Merge pull request #5695 from b10n1k/schedulig_mm_158146
Provide parallel_one_host_only via workers config file
Martchus authored Jul 3, 2024
2 parents 7a3653d + 0e31752 commit cb1357e
Showing 10 changed files with 168 additions and 56 deletions.
4 changes: 2 additions & 2 deletions docs/WritingTests.asciidoc
@@ -672,8 +672,8 @@ eventually workers can be held back for the cluster.
It is possible to ensure that all jobs within the same _parallel_ cluster are
executed on the same worker host. This is useful for connecting the SUTs without
having to connect the physical worker hosts. Use `PARALLEL_ONE_HOST_ONLY=1` to
enable this. Note that adding this setting in `workers.ini` has currently *no*
effect.
enable this. This setting can be applied as a test variable at scheduling time
as well as in the worker configuration file `workers.ini`.
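
To illustrate the worker-side variant, a minimal `workers.ini` sketch could look as follows, assuming the setting goes into the `[global]` section alongside worker-wide options such as `HOST` (the host and worker class values here are placeholders, not part of this change):

[global]
HOST = http://openqa.example.host
WORKER_CLASS = qemu_x86_64
PARALLEL_ONE_HOST_ONLY = 1

On the scheduling side the same effect can be achieved by passing `PARALLEL_ONE_HOST_ONLY=1` like any other test variable when posting jobs.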

WARNING: You need to provide enough worker slots on single worker hosts to fit
an entire cluster. So this feature is mainly intended to work around situations
4 changes: 4 additions & 0 deletions etc/openqa/workers.ini
@@ -6,6 +6,10 @@
# space separated list of webuis to connect to (empty defaults to localhost)
#HOST = http://openqa.example.host

# Enable to restrict any multi-machine jobs taken by this worker host to only
# run here. Disabled by default.
#PARALLEL_ONE_HOST_ONLY = 0

# Specify a cache directory for assets and tests to sync them automatically via
# http/rsync; the specified path is just an example but what you would usually
# use on a normal setup
47 changes: 33 additions & 14 deletions lib/OpenQA/Scheduler/Model/Jobs.pm
@@ -38,6 +38,23 @@ sub determine_scheduled_jobs ($self) {
return $self->scheduled_jobs;
}

sub _allocate_worker_slot ($self, $allocated_workers, $worker, $job_info) {
$allocated_workers->{$worker->id} = $job_info->{id};

# set the "one_host_only_via_worker" flag for the whole cluster if the allocated worker slot has
# the PARALLEL_ONE_HOST_ONLY property
# note: This is done so that _pick_siblings_of_running() can take it into account. A separate flag is used
# here (and not just "one_host_only") so that it can be reset on the next tick.
return undef unless $worker->get_property('PARALLEL_ONE_HOST_ONLY');
my $scheduled_jobs = $self->scheduled_jobs;
my $cluster_jobs = $job_info->{cluster_jobs};
$job_info->{one_host_only_via_worker} = 1;
for my $job_id (keys %$cluster_jobs) {
next unless my $cluster_job = $scheduled_jobs->{$job_id};
$cluster_job->{one_host_only_via_worker} = 1;
}
}

sub _allocate_jobs ($self, $free_workers) {
my ($allocated_workers, $allocated_jobs) = ({}, {});
my $scheduled_jobs = $self->scheduled_jobs;
@@ -102,19 +119,19 @@ sub _allocate_jobs ($self, $free_workers) {
# a bonus on their priority
my $prio = $j->{priority}; # we only consider the priority of the main job
for my $worker (keys %taken) {
my $ji = $taken{$worker};
_allocate_worker_with_priority($prio, $ji, $j, $allocated_workers, $worker);
my ($picked_worker, $job_info) = @{$taken{$worker}};
$self->_allocate_worker_with_priority($prio, $job_info, $j, $allocated_workers, $picked_worker);
}
%taken = ();
last;
}
$taken{$picked_worker->id} = $sub_job;
$taken{$picked_worker->id} = [$picked_worker, $sub_job];
}
for my $worker (keys %taken) {
my $ji = $taken{$worker};
$allocated_workers->{$worker} = $ji->{id};
$allocated_jobs->{$ji->{id}}
= {job => $ji->{id}, worker => $worker, priority_offset => \$j->{priority_offset}};
for my $picked_worker_id (keys %taken) {
my ($picked_worker, $job_info) = @{$taken{$picked_worker_id}};
$self->_allocate_worker_slot($allocated_workers, $picked_worker, $job_info);
$allocated_jobs->{$job_info->{id}}
= {job => $job_info->{id}, worker => $picked_worker_id, priority_offset => \$j->{priority_offset}};
}
# we make sure we schedule clusters no matter what,
# but we stop if we're over the limit
@@ -129,20 +146,21 @@
return ($allocated_workers, $allocated_jobs);
}

sub _allocate_worker_with_priority ($prio, $ji, $j, $allocated_workers, $worker) {
sub _allocate_worker_with_priority ($self, $prio, $job_info, $j, $allocated_workers, $worker) {
if ($prio > 0) {
# this means we will by default increase the offset per half-assigned job,
# so if we miss 1/25 jobs, we'll bump by +24
log_debug "Discarding job $ji->{id} (with priority $prio) due to incomplete parallel cluster"
log_debug "Discarding job $job_info->{id} (with priority $prio) due to incomplete parallel cluster"
. ', reducing priority by '
. STARVATION_PROTECTION_PRIORITY_OFFSET;
$j->{priority_offset} += STARVATION_PROTECTION_PRIORITY_OFFSET;
}
else {
# don't "take" the worker, but make sure it's not
# used for another job and stays around
log_debug "Holding worker $worker for job $ji->{id} to avoid starvation";
$allocated_workers->{$worker} = $ji->{id};
my $worker_id = $worker->id;
log_debug "Holding worker $worker_id for job $job_info->{id} to avoid starvation";
$self->_allocate_worker_slot($allocated_workers, $worker, $job_info);
}
}

@@ -363,8 +381,8 @@ sub _pick_siblings_of_running ($self, $allocated_jobs, $allocated_workers) {
last unless $worker_host;
for my $w (@{$jobinfo->{matching_workers}}) {
next if $allocated_workers->{$w->id};
next if $jobinfo->{one_host_only} && $w->host ne $worker_host;
$allocated_workers->{$w->id} = $jobinfo->{id};
$self->_allocate_worker_slot($allocated_workers, $w, $jobinfo);
next if ($jobinfo->{one_host_only} || $jobinfo->{one_host_only_via_worker}) && ($w->host ne $worker_host);
$allocated_jobs->{$jobinfo->{id}} = {job => $jobinfo->{id}, worker => $w->id};
}
}
@@ -439,6 +457,7 @@ sub _update_scheduled_jobs ($self) {
# it's the same cluster for all, so share
$cluster_infos{$_} = $cluster_jobs for keys %$cluster_jobs;
}
$info->{one_host_only_via_worker} = 0;
$info->{one_host_only} = any { $_->{one_host_only} } values %$cluster_jobs;
$scheduled_jobs->{$job->id} = $info;
}
88 changes: 63 additions & 25 deletions lib/OpenQA/Scheduler/WorkerSlotPicker.pm
@@ -6,25 +6,35 @@ use Mojo::Base -base, -signatures;

use List::Util qw(any);

sub new ($class, $to_be_scheduled) {
my $self = $class->SUPER::new;
sub new ($class, $to_be_scheduled) { $class->SUPER::new->reset($to_be_scheduled) }

sub reset ($self, $to_be_scheduled) {
$self->{_to_be_scheduled} = $to_be_scheduled;
$self->{_matching_worker_slots_by_host} = {};
$self->{_visited_worker_slots_by_id} = {};
$self->{_picked_matching_worker_slots} = [];
$self->{_one_host_only} = 0;
return $self;
}

sub _pick_next_slot_for_host_and_job ($self, $matching_worker_slots_for_host, $matching_worker, $job) {
push @$matching_worker_slots_for_host, $matching_worker;
$self->{_visited_worker_slots_by_id}->{$matching_worker->id} = $job;
return @$matching_worker_slots_for_host < @{$self->{_to_be_scheduled}}
? undef
: ($self->{_picked_matching_worker_slots} = $matching_worker_slots_for_host);
$self->{_one_host_only} ||= $self->_is_one_host_only($matching_worker);
return undef if @$matching_worker_slots_for_host < @{$self->{_to_be_scheduled}};
$self->{_picked_matching_worker_slots} = $matching_worker_slots_for_host;
}

sub _matching_worker_slots_for_host ($self, $host) { $self->{_matching_worker_slots_by_host}->{$host} //= [] }

sub _worker_host ($self, $worker) { $self->{_any_host} // $worker->host }

sub _is_one_host_only ($self, $worker) {
my $cache = $self->{_is_one_host_only} //= {};
my $worker_id = $worker->id;
$cache->{$worker_id} //= $worker->get_property('PARALLEL_ONE_HOST_ONLY');
}

sub _reduce_matching_workers ($self) {
# reduce the matching workers of each job to a single slot on the picked worker host's slots
# note: If no single host provides enough matching slots ($picked_matching_worker_slots is still an
@@ -38,13 +48,21 @@ sub _reduce_matching_workers ($self) {
return $picked_matching_worker_slots;
}

sub _id_or_skip ($self, $worker, $visited_worker_slots_by_id) {
my $id = $worker->id;
# skip slots that have already been picked
return undef if exists $visited_worker_slots_by_id->{$id};
# skip slots with "PARALLEL_ONE_HOST_ONLY" to try picking on any other hosts instead
return undef if $self->{_any_host} && $self->_is_one_host_only($worker);
return $id;
}

sub _pick_one_matching_slot_per_host($self, $job) {
my $visited_worker_slots_by_id = $self->{_visited_worker_slots_by_id};
my %visited_worker_slots_by_host;
for my $matching_worker (@{$job->{matching_workers}}) {
my $id = $matching_worker->id;
next if exists $visited_worker_slots_by_id->{$id}; # skip slots that have already been picked
my $host = $matching_worker->host;
next unless my $id = $self->_id_or_skip($matching_worker, $visited_worker_slots_by_id);
my $host = $self->_worker_host($matching_worker);
next if $visited_worker_slots_by_host{$host}++; # skip to pick only one slot per host
last
if $self->_pick_next_slot_for_host_and_job($self->_matching_worker_slots_for_host($host),
@@ -55,7 +73,7 @@

sub _swap_slot_with_competitor_job ($self, $visited_worker_slots_by_host, $matching_worker) {
# skip hosts we were able to pick a slot on
my $host = $matching_worker->host;
my $host = $self->_worker_host($matching_worker);
return 0 if $visited_worker_slots_by_host->{$host};

# check the job we are competing with for this slot and see whether we might be able to swap picks by finding
@@ -66,10 +84,11 @@
my $matching_worker_slots = $self->_matching_worker_slots_for_host($host);
for my $alternative_matching_worker (@{$competitor_job->{matching_workers}}) {
# check whether the competitor can use this slot alternatively
my $alternative_id = $alternative_matching_worker->id;
next if exists $visited_worker_slots_by_id->{$alternative_id}; # skip slots that have already been picked
next if $id == $alternative_id; # skip the competitor's current slot for this host
next if $alternative_matching_worker->host ne $host; # skip slots that are not on the relevant host
next unless my $alternative_id = $self->_id_or_skip($alternative_matching_worker, $visited_worker_slots_by_id);
# skip the competitor's current slot for this host
next if $id == $alternative_id;
# skip slots that are not on the relevant host
next if $self->{_one_host_only} && $self->_worker_host($alternative_matching_worker) ne $host;

# make the competitor job use the alternative we have just found
for (my $i = 0; $i != @$matching_worker_slots; ++$i) {
@@ -78,22 +97,14 @@ sub _swap_slot_with_competitor_job ($self, $visited_worker_slots_by_host, $match
last;
}
$visited_worker_slots_by_id->{$alternative_id} = $competitor_job;
$self->{_one_host_only} ||= $self->_is_one_host_only($alternative_matching_worker);
return 1;
}
return 0;
}

# reduces the matching workers of the jobs to be scheduled for pinning parallel jobs to single host
sub pick_slots_with_common_worker_host ($self) {
# return early if we don't need to care about picking a common host for the given set of jobs
my $to_be_scheduled = $self->{_to_be_scheduled};
return undef if @$to_be_scheduled < 2 || !(any { $_->{one_host_only} } @$to_be_scheduled);

# let each job pick one slot per host
my $matching_worker_slots_by_host = $self->{_matching_worker_slots_by_host};
my $visited_worker_slots_by_id = $self->{_visited_worker_slots_by_id};
for my $job (@$to_be_scheduled) {
my $matching_workers = $job->{matching_workers};
sub _pick_one_slot_per_host_for_each_job ($self, $jobs) {
for my $job (@$jobs) {
# go through the list of matching worker slots and pick one slot per host
my $visited_worker_slots_by_host = $self->_pick_one_matching_slot_per_host($job);

@@ -106,8 +117,35 @@
$matching_worker, $job);
}
}
}

# reduces the matching workers of the jobs to be scheduled for pinning parallel jobs to single host
sub pick_slots_with_common_worker_host ($self) {
# return early if we don't need to care about picking a common host for the given set of jobs
my $to_be_scheduled = $self->{_to_be_scheduled};
return undef if @$to_be_scheduled < 2;

# determine whether only slots with a common worker host must be picked as per job settings
my $one_host_only_per_job_settings = $self->{_one_host_only} = any { $_->{one_host_only} } @$to_be_scheduled;

# let each job pick one slot per host
$self->_pick_one_slot_per_host_for_each_job($to_be_scheduled);

# do not reduce worker slots if there is no "PARALLEL_ONE_HOST_ONLY" job setting or worker property present
return undef unless my $one_host_only = $self->{_one_host_only};

# try assignment again without taking workers that have the "PARALLEL_ONE_HOST_ONLY" constraint into account
# note: The algorithm so far took the first best worker slots it could find - including slots with the
# "PARALLEL_ONE_HOST_ONLY" constraint. Therefore the presence of a single matching worker slot with the
# "PARALLEL_ONE_HOST_ONLY" flag could easily prevent any matching jobs with parallel dependencies from
# being scheduled at all. To avoid this situation, let's re-run the algorithm without those worker slots.
if (!@{$self->{_picked_matching_worker_slots}} && !$one_host_only_per_job_settings) {
$self->reset($to_be_scheduled);
$self->{_any_host} = 'any';
$self->_pick_one_slot_per_host_for_each_job($to_be_scheduled);
}

$self->_reduce_matching_workers;
return $self->_reduce_matching_workers;
}

1;
12 changes: 3 additions & 9 deletions lib/OpenQA/Schema/Result/Workers.pm
@@ -75,15 +75,8 @@ sub seen ($self, $options = {}) {
$self->update($data);
}

# update worker's capabilities
# param: workerid , workercaps
sub update_caps {
my ($self, $workercaps) = @_;

for my $cap (keys %{$workercaps}) {
$self->set_property(uc $cap, $workercaps->{$cap}) if $workercaps->{$cap};
}
}
# update the properties of the worker with the specified capabilities
sub update_caps ($self, $workercaps) { $self->set_property(uc $_, $workercaps->{$_}) for keys %$workercaps }

sub get_property {
my ($self, $key) = @_;
@@ -108,6 +101,7 @@ sub delete_properties {
sub set_property {

my ($self, $key, $val) = @_;
return $self->properties->search({key => $key})->delete unless defined $val;

my $r = $self->properties->find_or_new(
{
7 changes: 3 additions & 4 deletions lib/OpenQA/WebAPI/Controller/API/V1/Worker.pm
@@ -173,18 +173,17 @@ sub create {
my ($self) = @_;
my $validation = $self->validation;
my @validation_params
= qw(cpu_arch cpu_modelname cpu_opmode cpu_flags mem_max isotovideo_interface_version websocket_api_version worker_class);
= qw(cpu_arch cpu_modelname cpu_opmode cpu_flags mem_max isotovideo_interface_version websocket_api_version worker_class parallel_one_host_only);
$validation->required($_) for qw(host instance cpu_arch mem_max worker_class);
$validation->optional($_)
for qw(cpu_modelname cpu_opmode cpu_flags isotovideo_interface_version job_id websocket_api_version);
$validation->optional($_) for qw(cpu_modelname cpu_opmode cpu_flags isotovideo_interface_version job_id
websocket_api_version parallel_one_host_only);
return $self->reply->validation_error({format => 'json'}) if $validation->has_error;

my $host = $validation->param('host');
my $instance = $validation->param('instance');
my $job_ids = $validation->every_param('job_id');
my $caps = {};
$caps->{$_} = $validation->param($_) for @validation_params;

my $id;
try {
$id = $self->_register($self->schema, $host, $instance, $caps, $job_ids);
2 changes: 1 addition & 1 deletion lib/OpenQA/Worker.pm
@@ -212,7 +212,7 @@ sub capabilities ($self) {
= join(',', map { 'qemu_' . $_ } @{$supported_archs_by_cpu_archs{$caps->{cpu_arch}} // [$caps->{cpu_arch}]});
# TODO: check installed qemu and kvm?
}

$caps->{parallel_one_host_only} = $global_settings->{PARALLEL_ONE_HOST_ONLY};
return $self->{_caps} = $caps;
}

38 changes: 37 additions & 1 deletion t/04-scheduler.t
@@ -584,7 +584,7 @@ subtest 'parallel pinning' => sub {

subtest 'no slots on common host picked when pinning not explicitly enabled' => sub {
$slots = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled)->pick_slots_with_common_worker_host;
is_deeply $slots, undef, 'undef returned if not at least one job has pinning enabled';
is_deeply $slots, undef, 'slots not reduced if not at least one job has pinning enabled';
is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1, $ws2], 'slots of job 1 not altered';
is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws1, $ws2], 'slots of job 2 not altered';
} or diag $explain_slots->();
@@ -675,6 +675,42 @@
is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1], 'slot 1 assigned to job 1';
is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws2], 'slot 2 assigned to job 2';
} or diag $explain_slots->();

subtest 'setting PARALLEL_ONE_HOST_ONLY as worker property prevents cross-host scheduling' => sub {
$ws1->update({host => 'host1', instance => 1});
$ws2->update({host => 'host2', instance => 1});
$ws2->set_property(PARALLEL_ONE_HOST_ONLY => 1);
@to_be_scheduled = ({id => 1, matching_workers => [$ws1, $ws2]}, {id => 2, matching_workers => [$ws1, $ws2]});

my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled);
$slots = $picker->pick_slots_with_common_worker_host;
is_deeply $slots, [], 'empty array returned because no single host has enough matching slots';
is_deeply $to_be_scheduled[0]->{matching_workers}, [], 'no slots assigned to job 1';
is_deeply $to_be_scheduled[1]->{matching_workers}, [], 'no slots assigned to job 2';
} or diag $explain_slots->();

subtest 'scheduling on one host still possible when PARALLEL_ONE_HOST_ONLY set as worker property' => sub {
my @slots = ($ws1, $ws2, $ws3);
@to_be_scheduled = ({id => 1, matching_workers => [@slots]}, {id => 2, matching_workers => [@slots]});

my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled);
$slots = $picker->pick_slots_with_common_worker_host;
is_deeply $slots, [$ws2, $ws3], 'slots from host with enough matching slots picked';
is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws2], 'slot 2 assigned to job 1';
is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws3], 'slot 3 assigned to job 2';
is $picker->{_one_host_only}, 1, 'picking slots on one host only';
} or diag $explain_slots->();

subtest 'PARALLEL_ONE_HOST_ONLY worker property does not prevent other worker slots from being used' => sub {
$ws3->update({host => 'host3', instance => 1});
@to_be_scheduled = ({id => 1, matching_workers => [$ws2, $ws1, $ws3]}, {id => 2, matching_workers => [$ws3]});

my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled);
$slots = $picker->pick_slots_with_common_worker_host;
is_deeply $slots, [$ws1, $ws3], 'slots from host without PARALLEL_ONE_HOST_ONLY picked';
is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1], 'slot 1 assigned to job 1';
is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws3], 'slot 3 assigned to job 2';
} or diag $explain_slots->();
};

done_testing;