WIP: Adjust scheduling to consider PARALLEL_ONE_HOST_ONLY from workers
Martchus committed Jul 1, 2024
Commit af76645 (parent: 60d09e7)
Showing 4 changed files with 54 additions and 15 deletions.
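In short: a parallel cluster can now be pinned to a single worker host not only by a job's own one_host_only flag but also by a worker slot that carries the PARALLEL_ONE_HOST_ONLY property. A condensed sketch of the core check, paraphrased from the Jobs.pm hunk below (not a verbatim excerpt; all names come from the diff):

    # inside the new _allocate_worker_slot helper: if the picked slot is
    # host-pinning, flag the job and every job of its parallel cluster
    return undef unless $worker->get_property('PARALLEL_ONE_HOST_ONLY');
    $job_info->{one_host_only_via_worker} = 1;
    $_->{one_host_only_via_worker} = 1
      for grep { defined } map { $scheduled_jobs->{$_} } keys %{$job_info->{cluster_jobs}};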
lib/OpenQA/Scheduler/Model/Jobs.pm (30 additions, 14 deletions)
@@ -38,6 +38,20 @@ sub determine_scheduled_jobs ($self) {
     return $self->scheduled_jobs;
 }
 
+sub _allocate_worker_slot ($self, $allocated_workers, $worker, $job_info) {
+    $allocated_workers->{$worker->id} = $job_info->{id};
+
+    # set "one_host_only_via_worker"-flag for whole cluster if the allocated worker slot has the PARALLEL_ONE_HOST_ONLY property
+    return undef unless $worker->get_property('PARALLEL_ONE_HOST_ONLY');
+    my $scheduled_jobs = $self->scheduled_jobs;
+    my $cluster_jobs = $job_info->{cluster_jobs};
+    $job_info->{one_host_only_via_worker} = 1;
+    for my $job_id (keys %$cluster_jobs) {
+        next unless my $cluster_job = $scheduled_jobs->{$job_id};
+        $cluster_job->{one_host_only_via_worker} = 1;
+    }
+}
+
 sub _allocate_jobs ($self, $free_workers) {
     my ($allocated_workers, $allocated_jobs) = ({}, {});
     my $scheduled_jobs = $self->scheduled_jobs;
@@ -102,19 +116,19 @@ sub _allocate_jobs ($self, $free_workers) {
                 # a bonus on their priority
                 my $prio = $j->{priority};    # we only consider the priority of the main job
                 for my $worker (keys %taken) {
-                    my $ji = $taken{$worker};
-                    _allocate_worker_with_priority($prio, $ji, $j, $allocated_workers, $worker);
+                    my ($picked_worker, $job_info) = @{$taken{$worker}};
+                    $self->_allocate_worker_with_priority($prio, $job_info, $j, $allocated_workers, $picked_worker);
                 }
                 %taken = ();
                 last;
             }
-            $taken{$picked_worker->id} = $sub_job;
+            $taken{$picked_worker->id} = [$picked_worker, $sub_job];
         }
-        for my $worker (keys %taken) {
-            my $ji = $taken{$worker};
-            $allocated_workers->{$worker} = $ji->{id};
-            $allocated_jobs->{$ji->{id}}
-              = {job => $ji->{id}, worker => $worker, priority_offset => \$j->{priority_offset}};
+        for my $picked_worker_id (keys %taken) {
+            my ($picked_worker, $job_info) = @{$taken{$picked_worker_id}};
+            $self->_allocate_worker_slot($allocated_workers, $picked_worker, $job_info);
+            $allocated_jobs->{$job_info->{id}}
+              = {job => $job_info->{id}, worker => $picked_worker_id, priority_offset => \$j->{priority_offset}};
         }
         # we make sure we schedule clusters no matter what,
         # but we stop if we're over the limit
@@ -129,20 +143,21 @@ sub _allocate_jobs ($self, $free_workers) {
     return ($allocated_workers, $allocated_jobs);
 }
 
-sub _allocate_worker_with_priority ($prio, $ji, $j, $allocated_workers, $worker) {
+sub _allocate_worker_with_priority ($self, $prio, $job_info, $j, $allocated_workers, $worker) {
     if ($prio > 0) {
         # this means we will by default increase the offset per half-assigned job,
         # so if we miss 1/25 jobs, we'll bump by +24
-        log_debug "Discarding job $ji->{id} (with priority $prio) due to incomplete parallel cluster"
+        log_debug "Discarding job $job_info->{id} (with priority $prio) due to incomplete parallel cluster"
           . ', reducing priority by '
           . STARVATION_PROTECTION_PRIORITY_OFFSET;
         $j->{priority_offset} += STARVATION_PROTECTION_PRIORITY_OFFSET;
     }
     else {
         # don't "take" the worker, but make sure it's not
         # used for another job and stays around
-        log_debug "Holding worker $worker for job $ji->{id} to avoid starvation";
-        $allocated_workers->{$worker} = $ji->{id};
+        my $worker_id = $worker->id;
+        log_debug "Holding worker $worker_id for job $job_info->{id} to avoid starvation";
+        $self->_allocate_worker_slot($allocated_workers, $worker, $job_info);
     }
 }
 
@@ -363,8 +378,8 @@ sub _pick_siblings_of_running ($self, $allocated_jobs, $allocated_workers) {
         last unless $worker_host;
         for my $w (@{$jobinfo->{matching_workers}}) {
             next if $allocated_workers->{$w->id};
-            next if $jobinfo->{one_host_only} && $w->host ne $worker_host;
-            $allocated_workers->{$w->id} = $jobinfo->{id};
+            $self->_allocate_worker_slot($allocated_workers, $w, $jobinfo);
+            next if ($jobinfo->{one_host_only} || $jobinfo->{one_host_only_via_worker}) && ($w->host ne $worker_host);
             $allocated_jobs->{$jobinfo->{id}} = {job => $jobinfo->{id}, worker => $w->id};
         }
     }
@@ -439,6 +454,7 @@ sub _update_scheduled_jobs ($self) {
             # it's the same cluster for all, so share
             $cluster_infos{$_} = $cluster_jobs for keys %$cluster_jobs;
         }
+        $info->{one_host_only_via_worker} = 0;
         $info->{one_host_only} = any { $_->{one_host_only} } values %$cluster_jobs;
         $scheduled_jobs->{$job->id} = $info;
     }
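With these Jobs.pm changes a cluster can become host-bound in two ways: via the job's pre-existing one_host_only flag or via the new one_host_only_via_worker flag that _allocate_worker_slot sets when the chosen slot has the PARALLEL_ONE_HOST_ONLY property. The host check in _pick_siblings_of_running therefore boils down to the following (a restatement of the hunk above, not additional behaviour):

    # a sibling may only run on the host of the already running cluster jobs
    # if pinning was requested by the job itself or forced by a worker slot
    my $pinned = $jobinfo->{one_host_only} || $jobinfo->{one_host_only_via_worker};
    next if $pinned && $w->host ne $worker_host;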
lib/OpenQA/Scheduler/WorkerSlotPicker.pm (4 additions, 1 deletion)
@@ -87,7 +87,10 @@ sub _swap_slot_with_competitor_job ($self, $visited_worker_slots_by_host, $match
 sub pick_slots_with_common_worker_host ($self) {
     # return early if we don't need to care about picking a common host for the given set of jobs
     my $to_be_scheduled = $self->{_to_be_scheduled};
-    return undef if @$to_be_scheduled < 2 || !(any { $_->{one_host_only} } @$to_be_scheduled);
+    return undef if @$to_be_scheduled < 2;
+
+    # FIXME: make this algorithm check the "one_host_only"-flag only as needed and take worker property PARALLEL_ONE_HOST_ONLY into account
+    #|| !(any { $_->{one_host_only} || $_->{one_host_only_via_worker} } @$to_be_scheduled);
 
     # let each job pick one slot per host
     my $matching_worker_slots_by_host = $self->{_matching_worker_slots_by_host};
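This hunk leaves the picker in a deliberately unfinished state: the early return now only checks the cluster size, so the picker runs even when no job asks for host pinning, while the flag check is parked in the FIXME. Resolving the FIXME would presumably restore a combined condition along these lines (an assumption derived from the commented-out fragment, not code from this commit):

    return undef
      if @$to_be_scheduled < 2
      || !(any { $_->{one_host_only} || $_->{one_host_only_via_worker} } @$to_be_scheduled);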
t/04-scheduler.t (9 additions, 0 deletions)
@@ -675,6 +675,15 @@ subtest 'parallel pinning' => sub {
         is_deeply $to_be_scheduled[0]->{matching_workers}, [$ws1], 'slot 1 assigned to job 1';
         is_deeply $to_be_scheduled[1]->{matching_workers}, [$ws2], 'slot 2 assigned to job 2';
     } or diag $explain_slots->();
+
+    subtest 'the PARALLEL_ONE_HOST_ONLY setting is taken into account as worker property' => sub {
+        my @to_be_scheduled = ({id => 1, matching_workers => [$ws1, $ws2]}, {id => 2, matching_workers => [$ws1, $ws2]});
+        $ws1->set_property(PARALLEL_ONE_HOST_ONLY => 1);
+        $slots = OpenQA::Scheduler::WorkerSlotPicker->new(\@to_be_scheduled)->pick_slots_with_common_worker_host;
+        is_deeply $slots, [], 'empty array returned if no single host has enough matching slots';
+        is_deeply $to_be_scheduled[0]->{matching_workers}, [], 'no slots assigned to job 1';
+        is_deeply $to_be_scheduled[1]->{matching_workers}, [], 'no slots assigned to job 2';
+    } or diag $explain_slots->();
 };
 
 done_testing;
t/05-scheduler-dependencies.t (11 additions, 0 deletions)
@@ -1175,6 +1175,17 @@ subtest 'parallel siblings of running jobs are allocated' => sub {
         $schedule->_pick_siblings_of_running($allocated_jobs, $allocated_workers);
         ok exists $allocated_jobs->{99998}, 'job 99998 allocated' or diag explain $allocated_jobs;
     };
+    subtest 'no jobs allocated with dependency pinning (via worker property) and mismatching hosts' => sub {
+        my ($allocated_jobs, $allocated_workers) = ({}, {});
+        # pretend the job itself does not have the one_host_only-flag
+        delete $scheduled_jobs{99998}->{one_host_only};
+        # pretend the worker slot being used has the PARALLEL_ONE_HOST_ONLY property
+        my $relevant_worker = $workers->find($worker_on_different_host_id);
+        $relevant_worker->set_property(PARALLEL_ONE_HOST_ONLY => 1);
+        $scheduled_jobs{99998}->{matching_workers} = [$relevant_worker];
+        $schedule->_pick_siblings_of_running($allocated_jobs, $allocated_workers);
+        ok !exists $allocated_jobs->{99998}, 'job 99998 not allocated' or diag explain $allocated_jobs;
+    };
 };
 
 subtest 'PARALLEL_ONE_HOST_ONLY is taken into account when determining scheduled jobs' => sub {
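Together the two test additions cover both consumers of the new flag: the t/04 subtest checks that WorkerSlotPicker reacts to a PARALLEL_ONE_HOST_ONLY worker slot even though no job sets one_host_only, and the t/05 subtest checks that _pick_siblings_of_running refuses to allocate a sibling on a mismatching host when only the worker property requests pinning. How the property reaches real worker slots outside the tests (which set it directly via set_property) is not shown in this commit.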
