Skip to content

Commit

Permalink
Merge pull request #1 from adlenton/bugifx/signal-handling-not-working
Browse files Browse the repository at this point in the history
fixed signal handling to actually call custom handlers
  • Loading branch information
jareth authored Oct 5, 2018
2 parents 6d738e7 + 7c64171 commit c7bd8b2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
32 changes: 22 additions & 10 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
*/
class Worker implements LoggerAwareInterface
{
/**
* This is not actually evaluated, but as the signal handlers require the signal number to be supplied, we just
* assume 15 (SIGTERM) by default
*/
const DEFAULT_SIGNO = 15;

/**
* @var string String identifying this worker.
*/
Expand Down Expand Up @@ -271,6 +277,7 @@ public function work($interval = 5)
$this->startup();

while (true) {
pcntl_signal_dispatch();
if ($this->shutdown) {
$this->unregister();
return;
Expand Down Expand Up @@ -441,7 +448,7 @@ protected function refreshQueues()
$this->logger->info('Refreshing queues dynamically, but there are no queues yet');
} else {
$this->logger->notice('Not listening to any queues, and dynamic queue refreshing is disabled');
$this->shutdownNow();
$this->shutdownNow(self::DEFAULT_SIGNO);
}
}

Expand Down Expand Up @@ -582,8 +589,9 @@ protected function registerSigHandlers()

/**
* Signal handler callback for USR2, pauses processing of new jobs.
* @param int $signo
*/
public function pauseProcessing()
public function pauseProcessing($signo)
{
$this->logger->notice('USR2 received; pausing job processing');
$this->paused = true;
Expand All @@ -592,8 +600,9 @@ public function pauseProcessing()
/**
* Signal handler callback for CONT, resumes worker allowing it to pick
* up new jobs.
* @param int $signo
*/
public function unPauseProcessing()
public function unPauseProcessing($signo)
{
$this->logger->notice('CONT received; resuming job processing');
$this->paused = false;
Expand All @@ -602,8 +611,9 @@ public function unPauseProcessing()
/**
* Signal handler for SIGPIPE, in the event the redis connection has gone away.
* Attempts to reconnect to redis, or raises an Exception.
* @param int $signo
*/
public function reestablishRedisConnection()
public function reestablishRedisConnection($signo)
{
$this->logger->notice('SIGPIPE received; attempting to reconnect');
$this->resque->reconnect();
Expand All @@ -612,8 +622,9 @@ public function reestablishRedisConnection()
/**
* Schedule a worker for shutdown. Will finish processing the current job
* and when the timeout interval is reached, the worker will shut down.
* @param int $signo
*/
public function shutdown()
public function shutdown($signo)
{
$this->shutdown = true;
$this->logger->notice('Exiting...');
Expand All @@ -622,18 +633,19 @@ public function shutdown()
/**
* Force an immediate shutdown of the worker, killing any child jobs
* currently running.
* @param int $signo
*/
public function shutdownNow()
public function shutdownNow($signo)
{
$this->shutdown();
$this->killChild();
$this->shutdown($signo);
$this->killChild($signo);
}

/**
* Kill a forked child job immediately. The job it is processing will not
* be completed.
*/
public function killChild()
public function killChild($signo)
{
if (!$this->child) {
$this->logger->notice('No child to kill.');
Expand All @@ -654,7 +666,7 @@ public function killChild()
$this->child = null;
} else {
$this->logger->notice('Child ' . $this->child . ' not found, restarting.');
$this->shutdown();
$this->shutdown($signo);
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public function testPausedWorkerDoesNotPickUpJobs()
$this->resque->clearQueue('jobs');

$worker = $this->getWorker('*');
$worker->pauseProcessing();
$worker->pauseProcessing(Worker::DEFAULT_SIGNO);

$this->resque->enqueue('jobs', 'Resque\Test\Job');

Expand Down Expand Up @@ -325,6 +325,6 @@ public function testBlockingListPop()
public function testReestablishRedisConnection()
{
$worker = $this->getWorker('jobs');
$worker->reestablishRedisConnection();
$worker->reestablishRedisConnection(Worker::DEFAULT_SIGNO);
}
}

0 comments on commit c7bd8b2

Please sign in to comment.