Skip to content

Commit

Permalink
Shutdown the worker when shutdown is initiated and the last job is done
Browse files Browse the repository at this point in the history
  • Loading branch information
bzikarsky committed Apr 28, 2021
1 parent 632dadf commit 2230ef3
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 39 deletions.
12 changes: 12 additions & 0 deletions src/Protocol/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public function __construct(DuplexStreamInterface $stream, CommandFactoryInterfa
$this->emit('close', [$this]);
});

$this->stream->on('end', function () {
$this->closed = true;
$this->emit('close', [$this]);
});

$this->on('close', function () {
foreach ($this->commandSendQueue as $deferred) {
$deferred->reject(new ConnectionLostException());
Expand Down Expand Up @@ -159,4 +164,11 @@ public function close(): void
$this->stream->close();
}
}

public function end() : void
{
if ($this->isClosed()) {
$this->stream->end();
}
}
}
8 changes: 6 additions & 2 deletions src/Protocol/Participant.php
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,12 @@ protected function getCommandFactory(): CommandFactoryInterface
return $this->connection->getCommandFactory();
}

public function disconnect(): void
public function disconnect(bool $graceful = false): void
{
$this->connection->close();
if ($graceful) {
$this->connection->end();
} else {
$this->connection->close();
}
}
}
15 changes: 9 additions & 6 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,20 @@ public function getRegisteredFunctions(): array
return array_values($this->functions);
}

public function disconnect(): void
public function disconnect(bool $graceful = false): void
{
$this->unregisterAll();
if ($graceful) {
$this->unregisterAll();
}

parent::disconnect();
parent::disconnect($graceful);
}

public function forceShutdown(): PromiseInterface
{
$this->runningJobs = [];
$this->initShutdown();
$this->disconnect(false);
$this->finishShutdown();
return $this->shutdownPromise->promise();
}

Expand Down Expand Up @@ -184,7 +187,7 @@ protected function finishShutdown(): void

protected function initShutdown(): void
{
$this->disconnect();
$this->disconnect(true);
$this->finishShutdown();
}

Expand Down Expand Up @@ -258,7 +261,7 @@ protected function onJobDone($handle): void
{
unset($this->runningJobs[$handle]);

if ($this->runningJobs === 0 && $this->shutdownPromise !== null) {
if (count($this->runningJobs) === 0 && $this->shutdownPromise !== null) {
$this->initShutdown();
}
}
Expand Down
87 changes: 56 additions & 31 deletions tests/SystemTest.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<?php

use Amp\Deferred;
use Amp\Loop;
use React\EventLoop\LoopInterface;
use Zikarsky\React\Gearman\ClientInterface;
use Zikarsky\React\Gearman\DuplicateJobException;
use Zikarsky\React\Gearman\Event\TaskDataEvent;
Expand All @@ -17,8 +20,12 @@ class SystemTest extends \PHPUnit\Framework\TestCase
const HOST = '127.0.0.1';
const PORT = 4730;

private LoopInterface $loop;

public function setUp(): void
{
$this->loop = \Amp\ReactAdapter\ReactAdapter::get();

$socket = @stream_socket_client("tcp://" . self::HOST . ":" . self::PORT);
if ($socket === false) {
$this->markTestSkipped("No gearman instance available");
Expand All @@ -27,16 +34,33 @@ public function setUp(): void
}
}

protected function asyncTest(callable $coroutine)
protected function asyncTest(callable $coroutine, int $timeoutMs = 1000)
{
// extensions using an event loop may otherwise leak the file descriptors to the loop
gc_collect_cycles(); // extensions using an event loop may otherwise leak the file descriptors to the loop
\Amp\Loop::run($coroutine);

Loop::run(function () use ($coroutine, $timeoutMs) {
// Set up the watcher and keep the loop alive as long as it exists
$timeoutWatcher = Loop::delay($timeoutMs, fn () => $this->fail('Loop timed out'));
Loop::reference($timeoutWatcher);

// Run test
yield \Amp\call($coroutine);

// Cancel timeoutWatcher when the test concludes
Loop::cancel($timeoutWatcher);

// Check loop state afterwards
$info = \Amp\Loop::get()->getInfo();
if ($info['enabled_watchers']['referenced'] > 0) {
$this->fail("Loop has referenced watchers: " . json_encode($info));
}
});
}

protected function getFactory()
{
$loop = \Amp\ReactAdapter\ReactAdapter::get();
return new \Zikarsky\React\Gearman\Factory($loop);
return new \Zikarsky\React\Gearman\Factory($this->loop);
}

protected function getWorkerAndClient()
Expand All @@ -52,16 +76,16 @@ protected function getWorkerAndClient()
return [$client, $worker];
}

protected function getTaskPromise(TaskInterface $task, callable $onTask)
protected function getTaskPromise(TaskInterface $task, callable $onTask, int $taskTimeoutMs = 10000)
{
$deferred = new \Amp\Deferred();
$deferred = new Deferred();

$watcher = \Amp\Loop::delay(10000, function () use ($deferred, $task) {
$watcher = Loop::delay($taskTimeoutMs, function () use ($deferred, $task) {
$deferred->fail(new Exception("Job timed out: {$task->getWorkload()}"));
});

$task->on('complete', function (TaskDataEvent $event, ClientInterface $client) use ($deferred, $onTask, $watcher) {
\Amp\Loop::cancel($watcher);
Loop::cancel($watcher);
try {
$onTask($event, $client);
$deferred->resolve();
Expand All @@ -71,12 +95,12 @@ protected function getTaskPromise(TaskInterface $task, callable $onTask)
});

$task->on('exception', function (TaskDataEvent $event, ClientInterface $client) use ($deferred, $onTask, $watcher) {
\Amp\Loop::cancel($watcher);
Loop::cancel($watcher);
$deferred->fail(new Exception($event->getData()));
});

$task->on('failure', function (TaskEvent $event, ClientInterface $client) use ($deferred, $onTask, $watcher) {
\Amp\Loop::cancel($watcher);
Loop::cancel($watcher);
$deferred->fail(new Exception("N/A"));
});

Expand Down Expand Up @@ -177,18 +201,19 @@ public function testSubmitBackgroundAndWork()
$task = yield $client->submitBackground($queueName, 'TestData');
$this->assertInstanceOf(TaskInterface::class, $task);

$deferred = new \Amp\Deferred();
$deferred = new Deferred();

$watcher = \Amp\Loop::delay(100, function () use ($deferred) {
$watcher = Loop::delay(100, function () use ($deferred) {
$deferred->fail(new Exception("Job timed out"));
});

yield $worker->register($queueName, function (JobInterface $job) use (&$workerCalled, $deferred, $watcher) {
Loop::cancel($watcher);

$job->complete($job->getWorkload());
$workerCalled = true;
$this->assertEquals('TestData', $job->getWorkload());
$deferred->resolve();
\Amp\Loop::cancel($watcher);
});

yield $deferred->promise();
Expand Down Expand Up @@ -228,7 +253,7 @@ public function testSubmitWithUniqueIds()
$this->assertEquals('TestData2', $event->getData());
});

$deferred = new \Amp\Deferred();
$deferred = new Deferred();

$jobCalls = [];
yield $worker->register($queueName, function (JobInterface $job) use (&$jobCalls, $deferred) {
Expand Down Expand Up @@ -264,8 +289,8 @@ public function testSubmitBackgroundWithUniqueIds()
yield $client->submitBackground($queueName, 'TestData1a', TaskInterface::PRIORITY_NORMAL, '1b');
yield $client->submitBackground($queueName, 'TestData2', TaskInterface::PRIORITY_NORMAL, '2b');

$deferred = new \Amp\Deferred();
$watcher = \Amp\Loop::delay(100, function () use ($deferred) {
$deferred = new Deferred();
$watcher = Loop::delay(100, function () use ($deferred) {
$deferred->fail(new Exception("Job timed out"));
});

Expand All @@ -274,8 +299,8 @@ public function testSubmitBackgroundWithUniqueIds()
$job->complete($job->getWorkload());
$jobCalls[] = $job->getWorkload();
if (count($jobCalls) == 2) {
Loop::cancel($watcher);
$deferred->resolve();
\Amp\Loop::cancel($watcher);
}
});

Expand Down Expand Up @@ -314,19 +339,19 @@ public function testSubmitWithPriorities()
$this->assertEquals('TestData4', $event->getData());
});

$deferred = new \Amp\Deferred();
$deferred = new Deferred();

$jobCalls = [];

$watcher = \Amp\Loop::delay(100, function () use ($deferred) {
$watcher = Loop::delay(100, function () use ($deferred) {
$deferred->fail(new Exception("Job timed out"));
});

yield $worker->register($queueName, function (JobInterface $job) use (&$jobCalls, $deferred, $watcher) {
$job->complete($job->getWorkload());
$jobCalls[] = $job->getWorkload();
if (count($jobCalls) == 2) {
\Amp\Loop::cancel($watcher);
Loop::cancel($watcher);
$deferred->resolve();
}
});
Expand Down Expand Up @@ -358,19 +383,19 @@ public function testSubmitBackgroundWithPriorities()
yield $client->submitBackground($queueName, 'TestData3a', TaskInterface::PRIORITY_LOW, '3b');
yield $client->submitBackground($queueName, 'TestData4', TaskInterface::PRIORITY_HIGH, '4b');

$deferred = new \Amp\Deferred();
$deferred = new Deferred();

$jobCalls = [];

$watcher = \Amp\Loop::delay(100, function () use ($deferred) {
$watcher = Loop::delay(100, function () use ($deferred) {
$deferred->fail(new Exception("Job timed out"));
});

yield $worker->register($queueName, function (JobInterface $job) use (&$jobCalls, $deferred, $watcher) {
$job->complete($job->getWorkload());
$jobCalls[] = $job->getWorkload();
if (count($jobCalls) == 2) {
\Amp\Loop::cancel($watcher);
Loop::cancel($watcher);
$deferred->resolve();
}
});
Expand Down Expand Up @@ -400,15 +425,15 @@ public function testProgress()
$task = yield $client->submit($queueName, 'TestData1', TaskInterface::PRIORITY_NORMAL, '1p');

$dataReceived = [];
$task->on('data', function (TaskDataEvent $event, ClientInterface $client) use (&$dataReceived) {
$task->on('data', function (TaskDataEvent $event) use (&$dataReceived) {
$dataReceived[] = $event->getData();
});

$taskPromise1 = $this->getTaskPromise($task, function (TaskDataEvent $event, ClientInterface $client) use (&$responseReceived) {
$taskPromise1 = $this->getTaskPromise($task, function (TaskDataEvent $event) use (&$responseReceived) {
$this->assertEquals('TestData1', $event->getData());
});

$deferred = new \Amp\Deferred();
$deferred = new Deferred();

$jobCalls = [];
yield $worker->register($queueName, function (JobInterface $job) use (&$jobCalls, $deferred) {
Expand Down Expand Up @@ -583,7 +608,7 @@ public function testGracefulShutdown()
$shutdownPromise = null;
yield $worker->register($queueName, function (JobInterface $job) use ($worker, &$shutdownPromise) {
$shutdownPromise = $worker->shutdown();
$job->complete($job->getWorkload());
$this->loop->addTimer(1, fn () => $job->complete($job->getWorkload()));
});

/**
Expand All @@ -596,10 +621,10 @@ public function testGracefulShutdown()
});

yield $shutdownPromise;
});
}, 3000);
}

public function testDisconnectIsNotGraceul()
public function testDisconnectIsNotGraceful()
{
$queueName = __METHOD__;
$this->asyncTest(function () use ($queueName) {
Expand Down Expand Up @@ -627,12 +652,12 @@ public function testDisconnectIsNotGraceul()
try {
yield $this->getTaskPromise($task, function (TaskDataEvent $event, ClientInterface $client) use (&$responseReceived) {
$this->assertEquals('TestData', $event->getData());
});
}, 1000);
$this->fail("Job did not time out");
} catch (Exception $e) {
$client->cancel($task);
$this->assertEquals("Job timed out: TestData", $e->getMessage());
}
});
}, 2000);
}
}

0 comments on commit 2230ef3

Please sign in to comment.