Skip to content

Commit

Permalink
better option, continually running worker, more like the command
Browse files Browse the repository at this point in the history
  • Loading branch information
kiwiroy committed Aug 12, 2019
1 parent a34826f commit 6967e53
Show file tree
Hide file tree
Showing 2 changed files with 355 additions and 25 deletions.
65 changes: 51 additions & 14 deletions lib/Mojolicious/Plugin/Minion.pm
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,67 @@ sub _dev_server {
return unless $server->isa('Mojo::Server::Daemon');
$app->minion->missing_after(0)->repair;

# without server event finish, use worker pid going away to finish
my $morbo_worker_pid = $$;
# without server event finish, use server pid going away to set finished
my $server_pid = $$;
Mojo::IOLoop->subprocess(
sub {
my $subprocess = shift;
# rename process
$0 = 'dev-minion-worker';
$subprocess->ioloop->recurring(
1 => sub { $app->minion->perform_jobs(@args); });
$subprocess->ioloop->recurring(
1 => sub { shift->stop unless kill 0, $morbo_worker_pid });
$subprocess->ioloop->start unless $subprocess->ioloop->is_running;
$app->log->debug("$0 $$ finished.");
return 0;
_run_worker($app->minion->worker, $app->log, $server_pid, @args);
},
sub {
my ($subprocess, $err, @results) = @_;
$app->log->debug("Subprocess error: $err") and return if $err;
$app->log->warn("Subprocess error: $_[1]") and return if $_[1];
}
);
}
) if $c->app->mode eq 'development';
}

sub _job_spawned {
my ($job, $pid) = @_;
my ($id, $task) = ($job->id, $job->task);
$job->app->log->debug(
qq{Process $pid is performing job "$id" with task "$task"});
}

sub _run_worker {
my ($worker, $log, $parent_pid, $args) = (shift, shift, shift, shift || {});

# remarkably similar to Minion::Worker->run, but some important differences
my $status = $worker->status($args)->status;
$status->{command_interval} //= 10;
$status->{dequeue_timeout} //= 5;
$status->{heartbeat_interval} //= 300;
$status->{jobs} //= 4;
$status->{queues} ||= ['default'];
$status->{performed} //= 0;
$status->{repair_interval} //= 21600;
$status->{repair_interval} -= int rand $status->{repair_interval} / 2;
$worker->on(dequeue => sub { pop->once(spawn => \&_job_spawned) });

local $SIG{CHLD} = sub { };
my $commands = $worker->commands;
my $kill = sub {
return unless grep { ($_[1] // '') eq $_ } qw(INT KILL USR1 USR2);
$worker->{jobs}{$_[2]}->kill($_[1]) if $worker->{jobs}{$_[2] // ''};
};
local $commands->{jobs}
= sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ };
local $commands->{kill} = $kill;
local $commands->{stop} = sub { $kill->('KILL', $_[1]) };
eval {
$log->info("Worker $$ started");

# $self->{last_repair} ||= 0 in _work() deletes the worker otherwise
$worker->minion->missing_after(180);
$worker->_work
until ($worker->{finished} = !kill 0, $parent_pid)
&& !keys %{$worker->{jobs}};
};
$log->info("Worker $$ stopped");
my $err = $@;
$worker->unregister;
$log->fatal($err) if $err;
}

1;

=encoding utf8
Expand Down
Loading

0 comments on commit 6967e53

Please sign in to comment.