Provide parallel_one_host_only via workers config file #5695

Merged · 4 commits · Jul 3, 2024
4 changes: 2 additions & 2 deletions docs/WritingTests.asciidoc
@@ -672,8 +672,8 @@ eventually workers can be held back for the cluster.
It is possible to ensure that all jobs within the same _parallel_ cluster are
executed on the same worker host. This is useful for connecting the SUTs without
having to connect the physical worker hosts. Use `PARALLEL_ONE_HOST_ONLY=1` to
enable this. Note that adding this setting in `workers.ini` has currently *no*
effect.
enable this. This setting can be applied as a test variable during the time
of scheduling as well as in the worker configuration file `workers.ini`.
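To illustrate the new behaviour described above, here is a minimal sketch of the worker-side configuration; the `[global]` section name is an assumption based on the default `workers.ini` layout, and the same key can alternatively be supplied as a test variable when the jobs are scheduled:

```ini
# /etc/openqa/workers.ini — minimal sketch, not part of this change
[global]
# pin all multi-machine jobs taken by this worker host to slots on this host
PARALLEL_ONE_HOST_ONLY = 1
```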

WARNING: You need to provide enough worker slots on single worker hosts to fit
an entire cluster. So this feature is mainly intended to workaround situations
4 changes: 4 additions & 0 deletions etc/openqa/workers.ini
@@ -6,6 +6,10 @@
# space separated list of webuis to connect to (empty defaults to localhost)
#HOST = http://openqa.example.host

# Enable to restrict any multi-machine jobs taken by this worker host to only
# run here. Disabled by default.
#PARALLEL_ONE_HOST_ONLY = 0

# Specify a cache directory for assets and tests to sync them automatically via
# http/rsync; the specified path is just an example but what you would usually
# use on a normal setup
47 changes: 33 additions & 14 deletions lib/OpenQA/Scheduler/Model/Jobs.pm
@@ -38,6 +38,23 @@ sub determine_scheduled_jobs ($self) {
return $self->scheduled_jobs;
}

sub _allocate_worker_slot ($self, $allocated_workers, $worker, $job_info) {
$allocated_workers->{$worker->id} = $job_info->{id};

# set "one_host_only_via_worker"-flag for whole cluster if the allocated worker slot has
# the PARALLEL_ONE_HOST_ONLY property
# note: This is done so that _pick_siblings_of_running() can take it into account. To be able to reset this flag
# on the next tick a separate flag is used here (and not just "one_host_only").
return undef unless $worker->get_property('PARALLEL_ONE_HOST_ONLY');
my $scheduled_jobs = $self->scheduled_jobs;
my $cluster_jobs = $job_info->{cluster_jobs};
$job_info->{one_host_only_via_worker} = 1;
for my $job_id (keys %$cluster_jobs) {
next unless my $cluster_job = $scheduled_jobs->{$job_id};
$cluster_job->{one_host_only_via_worker} = 1;
}
}

sub _allocate_jobs ($self, $free_workers) {
my ($allocated_workers, $allocated_jobs) = ({}, {});
my $scheduled_jobs = $self->scheduled_jobs;
@@ -102,19 +119,19 @@ sub _allocate_jobs ($self, $free_workers) {
# a bonus on their priority
my $prio = $j->{priority}; # we only consider the priority of the main job
for my $worker (keys %taken) {
my $ji = $taken{$worker};
_allocate_worker_with_priority($prio, $ji, $j, $allocated_workers, $worker);
my ($picked_worker, $job_info) = @{$taken{$worker}};
$self->_allocate_worker_with_priority($prio, $job_info, $j, $allocated_workers, $picked_worker);
}
%taken = ();
last;
}
$taken{$picked_worker->id} = $sub_job;
$taken{$picked_worker->id} = [$picked_worker, $sub_job];
}
for my $worker (keys %taken) {
my $ji = $taken{$worker};
$allocated_workers->{$worker} = $ji->{id};
$allocated_jobs->{$ji->{id}}
= {job => $ji->{id}, worker => $worker, priority_offset => \$j->{priority_offset}};
for my $picked_worker_id (keys %taken) {
my ($picked_worker, $job_info) = @{$taken{$picked_worker_id}};
$self->_allocate_worker_slot($allocated_workers, $picked_worker, $job_info);
$allocated_jobs->{$job_info->{id}}
= {job => $job_info->{id}, worker => $picked_worker_id, priority_offset => \$j->{priority_offset}};
}
# we make sure we schedule clusters no matter what,
# but we stop if we're over the limit
@@ -129,20 +146,21 @@
return ($allocated_workers, $allocated_jobs);
}

sub _allocate_worker_with_priority ($prio, $ji, $j, $allocated_workers, $worker) {
sub _allocate_worker_with_priority ($self, $prio, $job_info, $j, $allocated_workers, $worker) {
if ($prio > 0) {
# this means we will by default increase the offset per half-assigned job,
# so if we miss 1/25 jobs, we'll bump by +24
log_debug "Discarding job $ji->{id} (with priority $prio) due to incomplete parallel cluster"
log_debug "Discarding job $job_info->{id} (with priority $prio) due to incomplete parallel cluster"
. ', reducing priority by '
. STARVATION_PROTECTION_PRIORITY_OFFSET;
$j->{priority_offset} += STARVATION_PROTECTION_PRIORITY_OFFSET;
}
else {
# don't "take" the worker, but make sure it's not
# used for another job and stays around
log_debug "Holding worker $worker for job $ji->{id} to avoid starvation";
$allocated_workers->{$worker} = $ji->{id};
my $worker_id = $worker->id;
log_debug "Holding worker $worker_id for job $job_info->{id} to avoid starvation";
$self->_allocate_worker_slot($allocated_workers, $worker, $job_info);
}
}

@@ -363,8 +381,8 @@ sub _pick_siblings_of_running ($self, $allocated_jobs, $allocated_workers) {
last unless $worker_host;
for my $w (@{$jobinfo->{matching_workers}}) {
next if $allocated_workers->{$w->id};
next if $jobinfo->{one_host_only} && $w->host ne $worker_host;
$allocated_workers->{$w->id} = $jobinfo->{id};
$self->_allocate_worker_slot($allocated_workers, $w, $jobinfo);
next if ($jobinfo->{one_host_only} || $jobinfo->{one_host_only_via_worker}) && ($w->host ne $worker_host);
$allocated_jobs->{$jobinfo->{id}} = {job => $jobinfo->{id}, worker => $w->id};
}
}
@@ -439,6 +457,7 @@ sub _update_scheduled_jobs ($self) {
# it's the same cluster for all, so share
$cluster_infos{$_} = $cluster_jobs for keys %$cluster_jobs;
}
$info->{one_host_only_via_worker} = 0;
$info->{one_host_only} = any { $_->{one_host_only} } values %$cluster_jobs;
$scheduled_jobs->{$job->id} = $info;
}
88 changes: 63 additions & 25 deletions lib/OpenQA/Scheduler/WorkerSlotPicker.pm
@@ -6,25 +6,35 @@ use Mojo::Base -base, -signatures;

use List::Util qw(any);

sub new ($class, $to_be_scheduled) {
my $self = $class->SUPER::new;
sub new ($class, $to_be_scheduled) { $class->SUPER::new->reset($to_be_scheduled) }

sub reset ($self, $to_be_scheduled) {
$self->{_to_be_scheduled} = $to_be_scheduled;
$self->{_matching_worker_slots_by_host} = {};
$self->{_visited_worker_slots_by_id} = {};
$self->{_picked_matching_worker_slots} = [];
$self->{_one_host_only} = 0;
return $self;
}

sub _pick_next_slot_for_host_and_job ($self, $matching_worker_slots_for_host, $matching_worker, $job) {
push @$matching_worker_slots_for_host, $matching_worker;
$self->{_visited_worker_slots_by_id}->{$matching_worker->id} = $job;
return @$matching_worker_slots_for_host < @{$self->{_to_be_scheduled}}
? undef
: ($self->{_picked_matching_worker_slots} = $matching_worker_slots_for_host);
$self->{_one_host_only} ||= $self->_is_one_host_only($matching_worker);
return undef if @$matching_worker_slots_for_host < @{$self->{_to_be_scheduled}};
$self->{_picked_matching_worker_slots} = $matching_worker_slots_for_host;
}

sub _matching_worker_slots_for_host ($self, $host) { $self->{_matching_worker_slots_by_host}->{$host} //= [] }

sub _worker_host ($self, $worker) { $self->{_any_host} // $worker->host }

sub _is_one_host_only ($self, $worker) {
my $cache = $self->{_is_one_host_only} //= {};
my $worker_id = $worker->id;
$cache->{$worker_id} //= $worker->get_property('PARALLEL_ONE_HOST_ONLY');
}

sub _reduce_matching_workers ($self) {
# reduce the matching workers of each job to a single slot on the picked worker host's slots
# note: If no single host provides enough matching slots ($picked_matching_worker_slots is still an
@@ -38,13 +48,21 @@ sub _reduce_matching_workers ($self) {
return $picked_matching_worker_slots;
}

sub _id_or_skip ($self, $worker, $visited_worker_slots_by_id) {
my $id = $worker->id;
# skip slots that have already been picked
return undef if exists $visited_worker_slots_by_id->{$id};
# skip slots with "PARALLEL_ONE_HOST_ONLY" to try picking on any other hosts instead
return undef if $self->{_any_host} && $self->_is_one_host_only($worker);
return $id;
}

sub _pick_one_matching_slot_per_host($self, $job) {
my $visited_worker_slots_by_id = $self->{_visited_worker_slots_by_id};
my %visited_worker_slots_by_host;
for my $matching_worker (@{$job->{matching_workers}}) {
my $id = $matching_worker->id;
next if exists $visited_worker_slots_by_id->{$id}; # skip slots that have already been picked
my $host = $matching_worker->host;
next unless my $id = $self->_id_or_skip($matching_worker, $visited_worker_slots_by_id);
my $host = $self->_worker_host($matching_worker);
next if $visited_worker_slots_by_host{$host}++; # skip to pick only one slot per host
last
if $self->_pick_next_slot_for_host_and_job($self->_matching_worker_slots_for_host($host),
@@ -55,7 +73,7 @@

sub _swap_slot_with_competitor_job ($self, $visited_worker_slots_by_host, $matching_worker) {
# skip hosts we were able to pick a slot on
my $host = $matching_worker->host;
my $host = $self->_worker_host($matching_worker);
return 0 if $visited_worker_slots_by_host->{$host};

# check the job we are competing with for this slot and see whether we might be able to swap picks by finding
@@ -66,10 +84,11 @@
my $matching_worker_slots = $self->_matching_worker_slots_for_host($host);
for my $alternative_matching_worker (@{$competitor_job->{matching_workers}}) {
# check whether the competitor can use this slot alternatively
my $alternative_id = $alternative_matching_worker->id;
next if exists $visited_worker_slots_by_id->{$alternative_id}; # skip slots that have already been picked
next if $id == $alternative_id; # skip the competitor's current slot for this host
next if $alternative_matching_worker->host ne $host; # skip slots that are not on the relevant host
next unless my $alternative_id = $self->_id_or_skip($alternative_matching_worker, $visited_worker_slots_by_id);
# skip the competitor's current slot for this host
next if $id == $alternative_id;
# skip slots that are not on the relevant host
next if $self->{_one_host_only} && $self->_worker_host($alternative_matching_worker) ne $host;

# make the competitor job use the alternative we have just found
for (my $i = 0; $i != @$matching_worker_slots; ++$i) {
@@ -78,22 +97,14 @@
last;
}
$visited_worker_slots_by_id->{$alternative_id} = $competitor_job;
$self->{_one_host_only} ||= $self->_is_one_host_only($alternative_matching_worker);
return 1;
}
return 0;
}

# reduces the matching workers of the jobs to be scheduled for pinning parallel jobs to single host
sub pick_slots_with_common_worker_host ($self) {
# return early if we don't need to care about picking a common host for the given set of jobs
my $to_be_scheduled = $self->{_to_be_scheduled};
return undef if @$to_be_scheduled < 2 || !(any { $_->{one_host_only} } @$to_be_scheduled);

# let each job pick one slot per host
my $matching_worker_slots_by_host = $self->{_matching_worker_slots_by_host};
my $visited_worker_slots_by_id = $self->{_visited_worker_slots_by_id};
for my $job (@$to_be_scheduled) {
my $matching_workers = $job->{matching_workers};
sub _pick_one_slot_per_host_for_each_job ($self, $jobs) {
for my $job (@$jobs) {
# go through the list of matching worker slots and pick one slot per host
my $visited_worker_slots_by_host = $self->_pick_one_matching_slot_per_host($job);

@@ -106,8 +117,35 @@
$matching_worker, $job);
}
}
}

# reduces the matching workers of the jobs to be scheduled for pinning parallel jobs to single host
sub pick_slots_with_common_worker_host ($self) {
# return early if we don't need to care about picking a common host for the given set of jobs
my $to_be_scheduled = $self->{_to_be_scheduled};
return undef if @$to_be_scheduled < 2;

# determine whether only slots with a common worker host must be picked as per job settings
my $one_host_only_per_job_settings = $self->{_one_host_only} = any { $_->{one_host_only} } @$to_be_scheduled;

# let each job pick one slot per host
$self->_pick_one_slot_per_host_for_each_job($to_be_scheduled);

# do not reduce worker slots if there is no "PARALLEL_ONE_HOST_ONLY" job setting or worker property present
return undef unless my $one_host_only = $self->{_one_host_only};

# try assignment again without taking workers that have the "PARALLEL_ONE_HOST_ONLY" constraint into account
# note: The algorithm so far took the first best worker slots it could find - including slots with the
# "PARALLEL_ONE_HOST_ONLY" constraint. Therefore the presence of a single matching worker slot with the
# "PARALLEL_ONE_HOST_ONLY" flag could easily prevent any matching jobs with parallel dependencies from
# being scheduled at all. To avoid this situation, let's re-run the algorithm without those worker slots.
if (!@{$self->{_picked_matching_worker_slots}} && !$one_host_only_per_job_settings) {
$self->reset($to_be_scheduled);
$self->{_any_host} = 'any';
$self->_pick_one_slot_per_host_for_each_job($to_be_scheduled);
}

$self->_reduce_matching_workers;
return $self->_reduce_matching_workers;
}

1;
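As a rough usage sketch (with hypothetical worker-slot objects `$slot_a` and `$slot_b`, mirroring the tests further down): the scheduler hands the picker the jobs of one parallel cluster together with their matching worker slots, and the picker reduces each job's `matching_workers` to at most one slot, preferring a common host and retrying with any host when only a stray `PARALLEL_ONE_HOST_ONLY` slot blocked the common-host pick:

```perl
# minimal sketch; $slot_a and $slot_b are hypothetical worker result objects
my @to_be_scheduled = (
    {id => 1, matching_workers => [$slot_a, $slot_b]},
    {id => 2, matching_workers => [$slot_a, $slot_b]},
);
my $picked_slots = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled)
  ->pick_slots_with_common_worker_host;
# each job's matching_workers now contains at most one slot
```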
12 changes: 3 additions & 9 deletions lib/OpenQA/Schema/Result/Workers.pm
@@ -75,15 +75,8 @@ sub seen ($self, $options = {}) {
$self->update($data);
}

# update worker's capabilities
# param: workerid , workercaps
sub update_caps {
my ($self, $workercaps) = @_;

for my $cap (keys %{$workercaps}) {
$self->set_property(uc $cap, $workercaps->{$cap}) if $workercaps->{$cap};
}
}
# update the properties of the worker with the specified capabilities
sub update_caps ($self, $workercaps) { $self->set_property(uc $_, $workercaps->{$_}) for keys %$workercaps }
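A brief usage sketch of the rewritten method (with hypothetical capability values): every capability key is upper-cased and stored as a worker property, and in combination with the `set_property` change below an undef value now removes the corresponding property:

```perl
# minimal sketch with hypothetical values
$worker->update_caps({parallel_one_host_only => 1, cpu_arch => 'x86_64'});
$worker->update_caps({parallel_one_host_only => undef});    # removes PARALLEL_ONE_HOST_ONLY
```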

sub get_property {
my ($self, $key) = @_;
@@ -108,6 +101,7 @@ sub delete_properties {
sub set_property {

my ($self, $key, $val) = @_;
return $self->properties->search({key => $key})->delete unless defined $val;

my $r = $self->properties->find_or_new(
{
7 changes: 3 additions & 4 deletions lib/OpenQA/WebAPI/Controller/API/V1/Worker.pm
@@ -173,18 +173,17 @@ sub create {
my ($self) = @_;
my $validation = $self->validation;
my @validation_params
= qw(cpu_arch cpu_modelname cpu_opmode cpu_flags mem_max isotovideo_interface_version websocket_api_version worker_class);
= qw(cpu_arch cpu_modelname cpu_opmode cpu_flags mem_max isotovideo_interface_version websocket_api_version worker_class parallel_one_host_only);
$validation->required($_) for qw(host instance cpu_arch mem_max worker_class);
$validation->optional($_)
for qw(cpu_modelname cpu_opmode cpu_flags isotovideo_interface_version job_id websocket_api_version);
$validation->optional($_) for qw(cpu_modelname cpu_opmode cpu_flags isotovideo_interface_version job_id
websocket_api_version parallel_one_host_only);
return $self->reply->validation_error({format => 'json'}) if $validation->has_error;

my $host = $validation->param('host');
my $instance = $validation->param('instance');
my $job_ids = $validation->every_param('job_id');
my $caps = {};
$caps->{$_} = $validation->param($_) for @validation_params;

my $id;
try {
$id = $self->_register($self->schema, $host, $instance, $caps, $job_ids);
2 changes: 1 addition & 1 deletion lib/OpenQA/Worker.pm
@@ -212,7 +212,7 @@ sub capabilities ($self) {
= join(',', map { 'qemu_' . $_ } @{$supported_archs_by_cpu_archs{$caps->{cpu_arch}} // [$caps->{cpu_arch}]});
# TODO: check installed qemu and kvm?
}

$caps->{parallel_one_host_only} = $global_settings->{PARALLEL_ONE_HOST_ONLY};
return $self->{_caps} = $caps;
}

38 changes: 37 additions & 1 deletion t/04-scheduler.t
@@ -584,7 +584,7 @@ subtest 'parallel pinning' => sub {

subtest 'no slots on common host picked when pinning not explicitly enabled' => sub {
$slots = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled)->pick_slots_with_common_worker_host;
is_deeply $slots, undef, 'undef returned if not at least one job has pinning enabled';
is_deeply $slots, undef, 'slots not reduced if not at least one job has pinning enabled';
is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1, $ws2], 'slots of job 1 not altered';
is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws1, $ws2], 'slots of job 2 not altered';
} or diag $explain_slots->();
@@ -675,6 +675,42 @@
is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1], 'slot 1 assigned to job 1';
is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws2], 'slot 2 assigned to job 2';
} or diag $explain_slots->();

subtest 'setting PARALLEL_ONE_HOST_ONLY as worker property prevents cross-host scheduling' => sub {
$ws1->update({host => 'host1', instance => 1});
$ws2->update({host => 'host2', instance => 1});
$ws2->set_property(PARALLEL_ONE_HOST_ONLY => 1);
@to_be_scheduled = ({id => 1, matching_workers => [$ws1, $ws2]}, {id => 2, matching_workers => [$ws1, $ws2]});

my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled);
$slots = $picker->pick_slots_with_common_worker_host;
is_deeply $slots, [], 'empty array returned because no single host has enough matching slots';
is_deeply $to_be_scheduled[0]->{matching_workers}, [], 'no slots assigned to job 1';
is_deeply $to_be_scheduled[1]->{matching_workers}, [], 'no slots assigned to job 2';
} or diag $explain_slots->();

subtest 'scheduling on one host still possible when PARALLEL_ONE_HOST_ONLY set as worker property' => sub {
my @slots = ($ws1, $ws2, $ws3);
@to_be_scheduled = ({id => 1, matching_workers => [@slots]}, {id => 2, matching_workers => [@slots]});

my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled);
$slots = $picker->pick_slots_with_common_worker_host;
is_deeply $slots, [$ws2, $ws3], 'slots from host with enough matching slots picked';
is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws2], 'slot 2 assigned to job 1';
is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws3], 'slot 3 assigned to job 2';
is $picker->{_one_host_only}, 1, 'picking slots on one host only';
} or diag $explain_slots->();

subtest 'PARALLEL_ONE_HOST_ONLY worker property does not prevent other worker slots to be used' => sub {
$ws3->update({host => 'host3', instance => 1});
@to_be_scheduled = ({id => 1, matching_workers => [$ws2, $ws1, $ws3]}, {id => 2, matching_workers => [$ws3]});

my $picker = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled);
$slots = $picker->pick_slots_with_common_worker_host;
is_deeply $slots, [$ws1, $ws3], 'slots from host without PARALLEL_ONE_HOST_ONLY picked';
is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1], 'slot 1 assigned to job 1';
is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws3], 'slot 3 assigned to job 2';
} or diag $explain_slots->();
};

done_testing;