From efacb531cf56564746cf6bcf52b64913fe1de5c1 Mon Sep 17 00:00:00 2001 From: Kalmer Kaurson Date: Mon, 29 Jan 2024 16:09:56 +0200 Subject: [PATCH 1/4] statistics provider interface --- CHANGELOG.md | 2 +- src/cli/InfoAction.php | 63 ++++++ src/drivers/db/Command.php | 1 + src/drivers/db/InfoAction.php | 93 -------- src/drivers/db/Queue.php | 18 +- src/drivers/db/StatisticsProvider.php | 82 ++++++++ src/drivers/file/Command.php | 1 + src/drivers/file/InfoAction.php | 97 --------- src/drivers/file/Queue.php | 16 +- src/drivers/file/StatisticsProvider.php | 81 +++++++ src/drivers/redis/Command.php | 1 + src/drivers/redis/InfoAction.php | 52 ----- src/drivers/redis/Queue.php | 16 +- src/drivers/redis/StatisticsProvider.php | 74 +++++++ src/interfaces/DelayedCountInterface.php | 22 ++ src/interfaces/DoneCountInterface.php | 22 ++ src/interfaces/ReservedCountInterface.php | 22 ++ .../StatisticsProviderInterface.php | 22 ++ src/interfaces/WaitingCountInterface.php | 22 ++ tests/app/config/main.php | 3 + tests/cli/InfoActionTest.php | 198 ++++++++++++++++++ tests/cli/Queue.php | 47 +++++ .../cli/providers/BaseStatisticsProvider.php | 31 +++ tests/cli/providers/DelayedCountProvider.php | 27 +++ tests/cli/providers/DoneCountProvider.php | 27 +++ tests/cli/providers/ReservedCountProvider.php | 27 +++ tests/cli/providers/WaitingCountProvider.php | 27 +++ tests/docker-compose.yml | 6 +- tests/drivers/db/TestCase.php | 40 ++++ tests/drivers/file/QueueTest.php | 37 ++++ tests/drivers/redis/QueueTest.php | 37 ++++ 31 files changed, 966 insertions(+), 248 deletions(-) create mode 100644 src/cli/InfoAction.php delete mode 100644 src/drivers/db/InfoAction.php create mode 100644 src/drivers/db/StatisticsProvider.php delete mode 100644 src/drivers/file/InfoAction.php create mode 100644 src/drivers/file/StatisticsProvider.php delete mode 100644 src/drivers/redis/InfoAction.php create mode 100644 src/drivers/redis/StatisticsProvider.php create mode 100644 src/interfaces/DelayedCountInterface.php create mode 100644 src/interfaces/DoneCountInterface.php create mode 100644 src/interfaces/ReservedCountInterface.php create mode 100644 src/interfaces/StatisticsProviderInterface.php create mode 100644 src/interfaces/WaitingCountInterface.php create mode 100644 tests/cli/InfoActionTest.php create mode 100644 tests/cli/Queue.php create mode 100644 tests/cli/providers/BaseStatisticsProvider.php create mode 100644 tests/cli/providers/DelayedCountProvider.php create mode 100644 tests/cli/providers/DoneCountProvider.php create mode 100644 tests/cli/providers/ReservedCountProvider.php create mode 100644 tests/cli/providers/WaitingCountProvider.php diff --git a/CHANGELOG.md b/CHANGELOG.md index a4c06012b..416780e4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ Yii2 Queue Extension Change Log 2.3.7 under development ----------------------- - +- Enh #509: Add StatisticsProviderInterface to get statistics from queue (kalmer) - no changes in this release. diff --git a/src/cli/InfoAction.php b/src/cli/InfoAction.php new file mode 100644 index 000000000..f857ffe55 --- /dev/null +++ b/src/cli/InfoAction.php @@ -0,0 +1,63 @@ + + */ +class InfoAction extends Action +{ + /** + * @var Queue + */ + public $queue; + + + /** + * Info about queue status. + */ + public function run() + { + if (!($this->queue instanceof StatisticsProviderInterface)) { + throw new NotSupportedException('Queue does not support ' . StatisticsProviderInterface::class); + } + + $this->controller->stdout('Jobs' . PHP_EOL, Console::FG_GREEN); + $statisticsProvider = $this->queue->getStatisticsProvider(); + + if ($statisticsProvider instanceof WaitingCountInterface) { + $this->controller->stdout('- waiting: ', Console::FG_YELLOW); + $this->controller->stdout($statisticsProvider->getWaitingCount() . PHP_EOL); + } + + if ($statisticsProvider instanceof DelayedCountInterface) { + $this->controller->stdout('- delayed: ', Console::FG_YELLOW); + $this->controller->stdout($statisticsProvider->getDelayedCount() . PHP_EOL); + } + + if ($statisticsProvider instanceof ReservedCountInterface) { + $this->controller->stdout('- reserved: ', Console::FG_YELLOW); + $this->controller->stdout($statisticsProvider->getReservedCount() . PHP_EOL); + } + + if ($statisticsProvider instanceof DoneCountInterface) { + $this->controller->stdout('- done: ', Console::FG_YELLOW); + $this->controller->stdout($statisticsProvider->getDoneCount() . PHP_EOL); + } + } +} diff --git a/src/drivers/db/Command.php b/src/drivers/db/Command.php index a61e0787d..141fe9754 100644 --- a/src/drivers/db/Command.php +++ b/src/drivers/db/Command.php @@ -9,6 +9,7 @@ use yii\console\Exception; use yii\queue\cli\Command as CliCommand; +use yii\queue\cli\InfoAction; /** * Manages application db-queue. diff --git a/src/drivers/db/InfoAction.php b/src/drivers/db/InfoAction.php deleted file mode 100644 index 0510e0496..000000000 --- a/src/drivers/db/InfoAction.php +++ /dev/null @@ -1,93 +0,0 @@ - - */ -class InfoAction extends Action -{ - /** - * @var Queue - */ - public $queue; - - - /** - * Info about queue status. - */ - public function run() - { - Console::output($this->format('Jobs', Console::FG_GREEN)); - - Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); - Console::output($this->getWaiting()->count('*', $this->queue->db)); - - Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); - Console::output($this->getDelayed()->count('*', $this->queue->db)); - - Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); - Console::output($this->getReserved()->count('*', $this->queue->db)); - - Console::stdout($this->format('- done: ', Console::FG_YELLOW)); - Console::output($this->getDone()->count('*', $this->queue->db)); - } - - /** - * @return Query - */ - protected function getWaiting() - { - return (new Query()) - ->from($this->queue->tableName) - ->andWhere(['channel' => $this->queue->channel]) - ->andWhere(['reserved_at' => null]) - ->andWhere(['delay' => 0]); - } - - /** - * @return Query - */ - protected function getDelayed() - { - return (new Query()) - ->from($this->queue->tableName) - ->andWhere(['channel' => $this->queue->channel]) - ->andWhere(['reserved_at' => null]) - ->andWhere(['>', 'delay', 0]); - } - - /** - * @return Query - */ - protected function getReserved() - { - return (new Query()) - ->from($this->queue->tableName) - ->andWhere(['channel' => $this->queue->channel]) - ->andWhere('[[reserved_at]] is not null') - ->andWhere(['done_at' => null]); - } - - /** - * @return Query - */ - protected function getDone() - { - return (new Query()) - ->from($this->queue->tableName) - ->andWhere(['channel' => $this->queue->channel]) - ->andWhere('[[done_at]] is not null'); - } -} diff --git a/src/drivers/db/Queue.php b/src/drivers/db/Queue.php index 93f3d98fb..1affd1cca 100644 --- a/src/drivers/db/Queue.php +++ b/src/drivers/db/Queue.php @@ -14,13 +14,14 @@ use yii\di\Instance; use yii\mutex\Mutex; use yii\queue\cli\Queue as CliQueue; +use yii\queue\interfaces\StatisticsProviderInterface; /** * Db Queue. * * @author Roman Zhuravlev */ -class Queue extends CliQueue +class Queue extends CliQueue implements StatisticsProviderInterface { /** * @var Connection|array|string @@ -233,6 +234,8 @@ protected function release($payload) } } + protected $reserveTime; + /** * Moves expired messages into waiting list. */ @@ -251,5 +254,16 @@ protected function moveExpired() } } - protected $reserveTime; + private $_statistcsProvider; + + /** + * @return StatisticsProvider + */ + public function getStatisticsProvider() + { + if (!$this->_statistcsProvider) { + $this->_statistcsProvider = new StatisticsProvider($this); + } + return $this->_statistcsProvider; + } } diff --git a/src/drivers/db/StatisticsProvider.php b/src/drivers/db/StatisticsProvider.php new file mode 100644 index 000000000..7df9c3c06 --- /dev/null +++ b/src/drivers/db/StatisticsProvider.php @@ -0,0 +1,82 @@ + + */ +class StatisticsProvider extends BaseObject implements DoneCountInterface, WaitingCountInterface, DelayedCountInterface, ReservedCountInterface +{ + /** + * @var Queue + */ + protected $queue; + + public function __construct(Queue $queue, $config = []) + { + $this->queue = $queue; + parent::__construct($config); + } + + /** + * @inheritdoc + */ + public function getWaitingCount() + { + return (new Query()) + ->from($this->queue->tableName) + ->andWhere(['channel' => $this->queue->channel]) + ->andWhere(['reserved_at' => null]) + ->andWhere(['delay' => 0])->count('*', $this->queue->db); + } + + /** + * @inheritdoc + */ + public function getDelayedCount() + { + return (new Query()) + ->from($this->queue->tableName) + ->andWhere(['channel' => $this->queue->channel]) + ->andWhere(['reserved_at' => null]) + ->andWhere(['>', 'delay', 0])->count('*', $this->queue->db); + } + + /** + * @inheritdoc + */ + public function getReservedCount() + { + return (new Query()) + ->from($this->queue->tableName) + ->andWhere(['channel' => $this->queue->channel]) + ->andWhere('[[reserved_at]] is not null') + ->andWhere(['done_at' => null])->count('*', $this->queue->db); + } + + /** + * @inheritdoc + */ + public function getDoneCount() + { + return (new Query()) + ->from($this->queue->tableName) + ->andWhere(['channel' => $this->queue->channel]) + ->andWhere('[[done_at]] is not null')->count('*', $this->queue->db); + } +} diff --git a/src/drivers/file/Command.php b/src/drivers/file/Command.php index ae6685c31..edfdd5dbd 100644 --- a/src/drivers/file/Command.php +++ b/src/drivers/file/Command.php @@ -9,6 +9,7 @@ use yii\console\Exception; use yii\queue\cli\Command as CliCommand; +use yii\queue\cli\InfoAction; /** * Manages application file-queue. diff --git a/src/drivers/file/InfoAction.php b/src/drivers/file/InfoAction.php deleted file mode 100644 index 9c996f2a3..000000000 --- a/src/drivers/file/InfoAction.php +++ /dev/null @@ -1,97 +0,0 @@ - - */ -class InfoAction extends Action -{ - /** - * @var Queue - */ - public $queue; - - - /** - * Info about queue status. - */ - public function run() - { - Console::output($this->format('Jobs', Console::FG_GREEN)); - - Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); - Console::output($this->getWaitingCount()); - - Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); - Console::output($this->getDelayedCount()); - - Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); - Console::output($this->getReservedCount()); - - Console::stdout($this->format('- done: ', Console::FG_YELLOW)); - Console::output($this->getDoneCount()); - } - - /** - * @return int - */ - protected function getWaitingCount() - { - $data = $this->getIndexData(); - return !empty($data['waiting']) ? count($data['waiting']) : 0; - } - - /** - * @return int - */ - protected function getDelayedCount() - { - $data = $this->getIndexData(); - return !empty($data['delayed']) ? count($data['delayed']) : 0; - } - - /** - * @return int - */ - protected function getReservedCount() - { - $data = $this->getIndexData(); - return !empty($data['reserved']) ? count($data['reserved']) : 0; - } - - /** - * @return int - */ - protected function getDoneCount() - { - $data = $this->getIndexData(); - $total = isset($data['lastId']) ? $data['lastId'] : 0; - return $total - $this->getDelayedCount() - $this->getWaitingCount(); - } - - protected function getIndexData() - { - static $data; - if ($data === null) { - $fileName = $this->queue->path . '/index.data'; - if (file_exists($fileName)) { - $data = call_user_func($this->queue->indexDeserializer, file_get_contents($fileName)); - } else { - $data = []; - } - } - - return $data; - } -} diff --git a/src/drivers/file/Queue.php b/src/drivers/file/Queue.php index a054bf557..c1e2fddc6 100644 --- a/src/drivers/file/Queue.php +++ b/src/drivers/file/Queue.php @@ -13,13 +13,14 @@ use yii\base\NotSupportedException; use yii\helpers\FileHelper; use yii\queue\cli\Queue as CliQueue; +use yii\queue\interfaces\StatisticsProviderInterface; /** * File Queue. * * @author Roman Zhuravlev */ -class Queue extends CliQueue +class Queue extends CliQueue implements StatisticsProviderInterface { /** * @var string @@ -304,4 +305,17 @@ private function touchIndex($callback) fclose($file); } } + + private $_statistcsProvider; + + /** + * @return StatisticsProvider + */ + public function getStatisticsProvider() + { + if (!$this->_statistcsProvider) { + $this->_statistcsProvider = new StatisticsProvider($this); + } + return $this->_statistcsProvider; + } } diff --git a/src/drivers/file/StatisticsProvider.php b/src/drivers/file/StatisticsProvider.php new file mode 100644 index 000000000..da0780ce8 --- /dev/null +++ b/src/drivers/file/StatisticsProvider.php @@ -0,0 +1,81 @@ + + */ +class StatisticsProvider extends BaseObject implements DoneCountInterface, WaitingCountInterface, DelayedCountInterface, ReservedCountInterface +{ + /** + * @var Queue + */ + protected $queue; + + public function __construct(Queue $queue, $config = []) + { + $this->queue = $queue; + parent::__construct($config); + } + + /** + * @inheritdoc + */ + public function getWaitingCount() + { + $data = $this->getIndexData(); + return !empty($data['waiting']) ? count($data['waiting']) : 0; + } + + /** + * @inheritdoc + */ + public function getDelayedCount() + { + $data = $this->getIndexData(); + return !empty($data['delayed']) ? count($data['delayed']) : 0; + } + + /** + * @inheritdoc + */ + public function getReservedCount() + { + $data = $this->getIndexData(); + return !empty($data['reserved']) ? count($data['reserved']) : 0; + } + + /** + * @inheritdoc + */ + public function getDoneCount() + { + $data = $this->getIndexData(); + $total = isset($data['lastId']) ? $data['lastId'] : 0; + return $total - $this->getDelayedCount() - $this->getWaitingCount(); + } + + protected function getIndexData() + { + $fileName = $this->queue->path . '/index.data'; + if (file_exists($fileName)) { + return call_user_func($this->queue->indexDeserializer, file_get_contents($fileName)); + } else { + return []; + } + } +} diff --git a/src/drivers/redis/Command.php b/src/drivers/redis/Command.php index 146aa18d2..0d152389c 100644 --- a/src/drivers/redis/Command.php +++ b/src/drivers/redis/Command.php @@ -9,6 +9,7 @@ use yii\console\Exception; use yii\queue\cli\Command as CliCommand; +use yii\queue\cli\InfoAction; /** * Manages application redis-queue. diff --git a/src/drivers/redis/InfoAction.php b/src/drivers/redis/InfoAction.php deleted file mode 100644 index 62d410a25..000000000 --- a/src/drivers/redis/InfoAction.php +++ /dev/null @@ -1,52 +0,0 @@ - - */ -class InfoAction extends Action -{ - /** - * @var Queue - */ - public $queue; - - - /** - * Info about queue status. - */ - public function run() - { - $prefix = $this->queue->channel; - $waiting = $this->queue->redis->llen("$prefix.waiting"); - $delayed = $this->queue->redis->zcount("$prefix.delayed", '-inf', '+inf'); - $reserved = $this->queue->redis->zcount("$prefix.reserved", '-inf', '+inf'); - $total = $this->queue->redis->get("$prefix.message_id"); - $done = $total - $waiting - $delayed - $reserved; - - Console::output($this->format('Jobs', Console::FG_GREEN)); - - Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); - Console::output($waiting); - - Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); - Console::output($delayed); - - Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); - Console::output($reserved); - - Console::stdout($this->format('- done: ', Console::FG_YELLOW)); - Console::output($done); - } -} diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index b56d1d07f..2babf2c92 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -11,6 +11,7 @@ use yii\base\NotSupportedException; use yii\di\Instance; use yii\queue\cli\Queue as CliQueue; +use yii\queue\interfaces\StatisticsProviderInterface; use yii\redis\Connection; /** @@ -18,7 +19,7 @@ * * @author Roman Zhuravlev */ -class Queue extends CliQueue +class Queue extends CliQueue implements StatisticsProviderInterface { /** * @var Connection|array|string @@ -205,4 +206,17 @@ protected function pushMessage($message, $ttr, $delay, $priority) return $id; } + + private $_statistcsProvider; + + /** + * @return StatisticsProvider + */ + public function getStatisticsProvider() + { + if (!$this->_statistcsProvider) { + $this->_statistcsProvider = new StatisticsProvider($this); + } + return $this->_statistcsProvider; + } } diff --git a/src/drivers/redis/StatisticsProvider.php b/src/drivers/redis/StatisticsProvider.php new file mode 100644 index 000000000..13824b4cb --- /dev/null +++ b/src/drivers/redis/StatisticsProvider.php @@ -0,0 +1,74 @@ + + */ +class StatisticsProvider extends BaseObject implements DoneCountInterface, WaitingCountInterface, DelayedCountInterface, ReservedCountInterface +{ + /** + * @var Queue + */ + protected $queue; + + public function __construct(Queue $queue, $config = []) + { + $this->queue = $queue; + parent::__construct($config); + } + + /** + * @inheritdoc + */ + public function getWaitingCount() + { + $prefix = $this->queue->channel; + return $this->queue->redis->llen("$prefix.waiting"); + } + + /** + * @inheritdoc + */ + public function getDelayedCount() + { + $prefix = $this->queue->channel; + return $this->queue->redis->zcount("$prefix.delayed", '-inf', '+inf'); + } + + /** + * @inheritdoc + */ + public function getReservedCount() + { + $prefix = $this->queue->channel; + return $this->queue->redis->zcount("$prefix.reserved", '-inf', '+inf'); + } + + /** + * @inheritdoc + */ + public function getDoneCount() + { + $prefix = $this->queue->channel; + $waiting = $this->getWaitingCount(); + $delayed = $this->getDelayedCount(); + $reserved = $this->getReservedCount(); + $total = $this->queue->redis->get("$prefix.message_id"); + return $total - $waiting - $delayed - $reserved; + } +} diff --git a/src/interfaces/DelayedCountInterface.php b/src/interfaces/DelayedCountInterface.php new file mode 100644 index 000000000..63ad9e8f1 --- /dev/null +++ b/src/interfaces/DelayedCountInterface.php @@ -0,0 +1,22 @@ + + */ +interface DelayedCountInterface +{ + /** + * @return int + */ + public function getDelayedCount(); +} diff --git a/src/interfaces/DoneCountInterface.php b/src/interfaces/DoneCountInterface.php new file mode 100644 index 000000000..be8f0c453 --- /dev/null +++ b/src/interfaces/DoneCountInterface.php @@ -0,0 +1,22 @@ + + */ +interface DoneCountInterface +{ + /** + * @return int + */ + public function getDoneCount(); +} diff --git a/src/interfaces/ReservedCountInterface.php b/src/interfaces/ReservedCountInterface.php new file mode 100644 index 000000000..78985f3ed --- /dev/null +++ b/src/interfaces/ReservedCountInterface.php @@ -0,0 +1,22 @@ + + */ +interface ReservedCountInterface +{ + /** + * @return int + */ + public function getReservedCount(); +} diff --git a/src/interfaces/StatisticsProviderInterface.php b/src/interfaces/StatisticsProviderInterface.php new file mode 100644 index 000000000..7ca7bfcca --- /dev/null +++ b/src/interfaces/StatisticsProviderInterface.php @@ -0,0 +1,22 @@ + + */ +interface StatisticsProviderInterface +{ + /** + * @return int + */ + public function getStatisticsProvider(); +} diff --git a/src/interfaces/WaitingCountInterface.php b/src/interfaces/WaitingCountInterface.php new file mode 100644 index 000000000..aa142a8ad --- /dev/null +++ b/src/interfaces/WaitingCountInterface.php @@ -0,0 +1,22 @@ + + */ +interface WaitingCountInterface +{ + /** + * @return int + */ + public function getWaitingCount(); +} diff --git a/tests/app/config/main.php b/tests/app/config/main.php index 29b16b8f8..a4cb30fd1 100644 --- a/tests/app/config/main.php +++ b/tests/app/config/main.php @@ -43,6 +43,7 @@ 'class' => \yii\mutex\MysqlMutex::class, 'db' => 'mysql', ], + 'deleteReleased' => false, ], 'sqlite' => [ 'class' => \yii\db\Connection::class, @@ -52,6 +53,7 @@ 'class' => \yii\queue\db\Queue::class, 'db' => 'sqlite', 'mutex' => \yii\mutex\FileMutex::class, + 'deleteReleased' => false, ], 'pgsql' => [ 'class' => \yii\db\Connection::class, @@ -72,6 +74,7 @@ 'db' => 'pgsql', ], 'mutexTimeout' => 0, + 'deleteReleased' => false, ], 'redis' => [ 'class' => \yii\redis\Connection::class, diff --git a/tests/cli/InfoActionTest.php b/tests/cli/InfoActionTest.php new file mode 100644 index 000000000..1756e374a --- /dev/null +++ b/tests/cli/InfoActionTest.php @@ -0,0 +1,198 @@ + + */ +class InfoActionTest extends TestCase +{ + public function testWaitingCount() + { + $controller = $this->getMockBuilder(Controller::class) + ->setConstructorArgs(['testController', new Module('testModule')]) + ->getMock() + ; + + $controller->expects(self::exactly(3)) + ->method('stdout') + ->withConsecutive( + [ + 'Jobs' . PHP_EOL, + Console::FG_GREEN, + ], + [ + '- waiting: ', + Console::FG_YELLOW, + ], + [ + 10 . PHP_EOL + ] + ) + ; + + $queue = $this->getMockBuilder(Queue::class)->getMock(); + + $provider = $this->getMockBuilder(WaitingCountProvider::class) + ->setConstructorArgs([$queue]) + ->getMock() + ; + $provider->expects(self::once()) + ->method('getWaitingCount') + ->willReturn(10) + ; + + $queue->method('getStatisticsProvider')->willReturn($provider); + + $action = (new InfoAction('infoAction', $controller, [ + 'queue' => $queue, + ])); + $action->run(); + } + + public function testDelayedCount() + { + $controller = $this->getMockBuilder(Controller::class) + ->setConstructorArgs(['testController', new Module('testModule')]) + ->getMock() + ; + + $controller->expects(self::exactly(3)) + ->method('stdout') + ->withConsecutive( + [ + 'Jobs' . PHP_EOL, + Console::FG_GREEN, + ], + [ + '- delayed: ', + Console::FG_YELLOW, + ], + [ + 10 . PHP_EOL + ] + ) + ; + + $queue = $this->getMockBuilder(Queue::class)->getMock(); + + $provider = $this->getMockBuilder(DelayedCountProvider::class) + ->setConstructorArgs([$queue]) + ->getMock() + ; + $provider->expects(self::once()) + ->method('getDelayedCount') + ->willReturn(10) + ; + + $queue->method('getStatisticsProvider')->willReturn($provider); + + $action = (new InfoAction('infoAction', $controller, [ + 'queue' => $queue, + ])); + $action->run(); + } + + public function testReservedCount() + { + $controller = $this->getMockBuilder(Controller::class) + ->setConstructorArgs(['testController', new Module('testModule')]) + ->getMock() + ; + + $controller->expects(self::exactly(3)) + ->method('stdout') + ->withConsecutive( + [ + 'Jobs' . PHP_EOL, + Console::FG_GREEN, + ], + [ + '- reserved: ', + Console::FG_YELLOW, + ], + [ + 10 . PHP_EOL + ] + ) + ; + + $queue = $this->getMockBuilder(Queue::class)->getMock(); + + $provider = $this->getMockBuilder(ReservedCountProvider::class) + ->setConstructorArgs([$queue]) + ->getMock() + ; + $provider->expects(self::once()) + ->method('getReservedCount') + ->willReturn(10) + ; + + $queue->method('getStatisticsProvider')->willReturn($provider); + + $action = (new InfoAction('infoAction', $controller, [ + 'queue' => $queue, + ])); + $action->run(); + } + + public function testDoneCount() + { + $controller = $this->getMockBuilder(Controller::class) + ->setConstructorArgs(['testController', new Module('testModule')]) + ->getMock() + ; + + $controller->expects(self::exactly(3)) + ->method('stdout') + ->withConsecutive( + [ + 'Jobs' . PHP_EOL, + Console::FG_GREEN, + ], + [ + '- done: ', + Console::FG_YELLOW, + ], + [ + 10 . PHP_EOL + ] + ) + ; + + $queue = $this->getMockBuilder(Queue::class)->getMock(); + + $provider = $this->getMockBuilder(DoneCountProvider::class) + ->setConstructorArgs([$queue]) + ->getMock() + ; + $provider->expects(self::once()) + ->method('getDoneCount') + ->willReturn(10) + ; + + $queue->method('getStatisticsProvider')->willReturn($provider); + + $action = (new InfoAction('infoAction', $controller, [ + 'queue' => $queue, + ])); + $action->run(); + } +} diff --git a/tests/cli/Queue.php b/tests/cli/Queue.php new file mode 100644 index 000000000..e7a6412d7 --- /dev/null +++ b/tests/cli/Queue.php @@ -0,0 +1,47 @@ + + */ +class Queue extends CliQueue implements StatisticsProviderInterface +{ + /** + * @inheritdoc + */ + public function status($id) + { + throw new NotSupportedException('"status" method is not supported.'); + } + /** + * @inheritdoc + */ + protected function pushMessage($message, $ttr, $delay, $priority) + { + throw new NotSupportedException('"pushMessage" method is not supported.'); + } + + /** + * @return StatisticsProvider + */ + public function getStatisticsProvider() + { + if (!$this->_statistcsProvider) { + $this->_statistcsProvider = new BaseStatisticsProvider($this); + } + return $this->_statistcsProvider; + } +} diff --git a/tests/cli/providers/BaseStatisticsProvider.php b/tests/cli/providers/BaseStatisticsProvider.php new file mode 100644 index 000000000..a403d52ca --- /dev/null +++ b/tests/cli/providers/BaseStatisticsProvider.php @@ -0,0 +1,31 @@ + + */ +class BaseStatisticsProvider extends BaseObject +{ + /** + * @var Queue + */ + protected $queue; + + public function __construct(Queue $queue, $config = []) + { + $this->queue = $queue; + parent::__construct($config); + } +} diff --git a/tests/cli/providers/DelayedCountProvider.php b/tests/cli/providers/DelayedCountProvider.php new file mode 100644 index 000000000..d15beab58 --- /dev/null +++ b/tests/cli/providers/DelayedCountProvider.php @@ -0,0 +1,27 @@ + + */ +class DelayedCountProvider extends BaseStatisticsProvider implements DelayedCountInterface +{ + /** + * @inheritdoc + */ + public function getDelayedCount() + { + return 10; + } +} diff --git a/tests/cli/providers/DoneCountProvider.php b/tests/cli/providers/DoneCountProvider.php new file mode 100644 index 000000000..0514aed66 --- /dev/null +++ b/tests/cli/providers/DoneCountProvider.php @@ -0,0 +1,27 @@ + + */ +class DoneCountProvider extends BaseStatisticsProvider implements DoneCountInterface +{ + /** + * @inheritdoc + */ + public function getDoneCount() + { + return 10; + } +} diff --git a/tests/cli/providers/ReservedCountProvider.php b/tests/cli/providers/ReservedCountProvider.php new file mode 100644 index 000000000..fbb894ac4 --- /dev/null +++ b/tests/cli/providers/ReservedCountProvider.php @@ -0,0 +1,27 @@ + + */ +class ReservedCountProvider extends BaseStatisticsProvider implements ReservedCountInterface +{ + /** + * @inheritdoc + */ + public function getReservedCount() + { + return 10; + } +} diff --git a/tests/cli/providers/WaitingCountProvider.php b/tests/cli/providers/WaitingCountProvider.php new file mode 100644 index 000000000..7a53b32dc --- /dev/null +++ b/tests/cli/providers/WaitingCountProvider.php @@ -0,0 +1,27 @@ + + */ +class WaitingCountProvider extends BaseStatisticsProvider implements WaitingCountInterface +{ + /** + * @inheritdoc + */ + public function getWaitingCount() + { + return 10; + } +} diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 2b7000c6a..6e78de144 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -62,6 +62,10 @@ services: MYSQL_DATABASE: yii2_queue_test networks: net: {} + ulimits: + nofile: + soft: 262144 + hard: 262144 # https://hub.docker.com/_/postgres/ postgres: @@ -110,7 +114,7 @@ services: # https://hub.docker.com/r/webcenter/activemq/ activemq: - image: webcenter/activemq + image: islandora/activemq:3 ports: - 61613:61613 networks: diff --git a/tests/drivers/db/TestCase.php b/tests/drivers/db/TestCase.php index bc57ca752..7581312f4 100644 --- a/tests/drivers/db/TestCase.php +++ b/tests/drivers/db/TestCase.php @@ -10,6 +10,7 @@ use tests\app\PriorityJob; use tests\app\RetryJob; use tests\drivers\CliTestCase; +use Yii; use yii\db\Query; /** @@ -105,8 +106,47 @@ public function testRemove() $this->assertEquals(0, $actual); } + public function testWaitingCount() + { + $this->getQueue()->push($this->createSimpleJob()); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getWaitingCount()); + } + + public function testDelayedCount() + { + $this->getQueue()->delay(5)->push($this->createSimpleJob()); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getDelayedCount()); + } + + public function testReservedCount() + { + $this->getQueue()->messageHandler = function () { + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getReservedCount()); + }; + + $job = $this->createSimpleJob(); + $this->getQueue()->push($job); + $this->getQueue()->run(false); + } + + public function testDoneCount() + { + $this->startProcess(['php', 'yii', 'queue/listen', '1']); + + $job = $this->createSimpleJob(); + $this->getQueue()->push($job); + + $this->assertSimpleJobDone($job); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getDoneCount()); + } + + protected function tearDown() { + $this->getQueue()->messageHandler = null; $this->getQueue()->db->createCommand() ->delete($this->getQueue()->tableName) ->execute(); diff --git a/tests/drivers/file/QueueTest.php b/tests/drivers/file/QueueTest.php index 1faea8006..42bc68050 100644 --- a/tests/drivers/file/QueueTest.php +++ b/tests/drivers/file/QueueTest.php @@ -87,6 +87,42 @@ public function testRemove() $this->assertFileNotExists($this->getQueue()->path . "/job$id.data"); } + public function testWaitingCount() + { + $this->getQueue()->push($this->createSimpleJob()); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getWaitingCount()); + } + + public function testDelayedCount() + { + $this->getQueue()->delay(5)->push($this->createSimpleJob()); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getDelayedCount()); + } + + public function testReservedCount() + { + $this->getQueue()->messageHandler = function () { + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getReservedCount()); + }; + + $job = $this->createSimpleJob(); + $this->getQueue()->push($job); + $this->getQueue()->run(false); + } + + public function testDoneCount() + { + $this->startProcess(['php', 'yii', 'queue/listen', '1']); + $job = $this->createSimpleJob(); + $this->getQueue()->push($job); + + $this->assertSimpleJobDone($job); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getDoneCount()); + } + /** * @return Queue */ @@ -97,6 +133,7 @@ protected function getQueue() protected function tearDown() { + $this->getQueue()->messageHandler = null; foreach (glob(Yii::getAlias("@runtime/queue/*")) as $fileName) { unlink($fileName); } diff --git a/tests/drivers/redis/QueueTest.php b/tests/drivers/redis/QueueTest.php index 8e6c462e9..8fbbf5a00 100644 --- a/tests/drivers/redis/QueueTest.php +++ b/tests/drivers/redis/QueueTest.php @@ -87,6 +87,42 @@ public function testRemove() $this->assertFalse((bool) $this->getQueue()->redis->hexists($this->getQueue()->channel . '.messages', $id)); } + public function testWaitingCount() + { + $this->getQueue()->push($this->createSimpleJob()); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getWaitingCount()); + } + + public function testDelayedCount() + { + $this->getQueue()->delay(5)->push($this->createSimpleJob()); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getDelayedCount()); + } + + public function testReservedCount() + { + $this->getQueue()->messageHandler = function () { + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getReservedCount()); + }; + + $job = $this->createSimpleJob(); + $this->getQueue()->push($job); + $this->getQueue()->run(false); + } + + public function testDoneCount() + { + $this->startProcess(['php', 'yii', 'queue/listen', '1']); + $job = $this->createSimpleJob(); + $this->getQueue()->push($job); + + $this->assertSimpleJobDone($job); + + $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getDoneCount()); + } + /** * @return Queue */ @@ -97,6 +133,7 @@ protected function getQueue() protected function tearDown() { + $this->getQueue()->messageHandler = null; $this->getQueue()->redis->flushdb(); parent::tearDown(); } From 2cd5ff1ffe1a8ff6f6e72307b55ff1529e360d48 Mon Sep 17 00:00:00 2001 From: Kalmer Kaurson Date: Tue, 30 Jan 2024 13:19:27 +0200 Subject: [PATCH 2/4] anomaly test --- tests/drivers/db/TestCase.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/drivers/db/TestCase.php b/tests/drivers/db/TestCase.php index 7581312f4..dd44156d1 100644 --- a/tests/drivers/db/TestCase.php +++ b/tests/drivers/db/TestCase.php @@ -137,7 +137,6 @@ public function testDoneCount() $job = $this->createSimpleJob(); $this->getQueue()->push($job); - $this->assertSimpleJobDone($job); $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getDoneCount()); From b22c58cdc110b60bc071af40019fb7c5e5d475a6 Mon Sep 17 00:00:00 2001 From: Kalmer Kaurson Date: Wed, 31 Jan 2024 12:00:53 +0200 Subject: [PATCH 3/4] restore old info action and deprecate it --- src/drivers/db/InfoAction.php | 95 ++++++++++++++++++++++++++++++ src/drivers/file/InfoAction.php | 99 ++++++++++++++++++++++++++++++++ src/drivers/redis/InfoAction.php | 54 +++++++++++++++++ 3 files changed, 248 insertions(+) create mode 100644 src/drivers/db/InfoAction.php create mode 100644 src/drivers/file/InfoAction.php create mode 100644 src/drivers/redis/InfoAction.php diff --git a/src/drivers/db/InfoAction.php b/src/drivers/db/InfoAction.php new file mode 100644 index 000000000..a91db6578 --- /dev/null +++ b/src/drivers/db/InfoAction.php @@ -0,0 +1,95 @@ + + */ +class InfoAction extends Action +{ + /** + * @var Queue + */ + public $queue; + + + /** + * Info about queue status. + */ + public function run() + { + Console::output($this->format('Jobs', Console::FG_GREEN)); + + Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); + Console::output($this->getWaiting()->count('*', $this->queue->db)); + + Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); + Console::output($this->getDelayed()->count('*', $this->queue->db)); + + Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); + Console::output($this->getReserved()->count('*', $this->queue->db)); + + Console::stdout($this->format('- done: ', Console::FG_YELLOW)); + Console::output($this->getDone()->count('*', $this->queue->db)); + } + + /** + * @return Query + */ + protected function getWaiting() + { + return (new Query()) + ->from($this->queue->tableName) + ->andWhere(['channel' => $this->queue->channel]) + ->andWhere(['reserved_at' => null]) + ->andWhere(['delay' => 0]); + } + + /** + * @return Query + */ + protected function getDelayed() + { + return (new Query()) + ->from($this->queue->tableName) + ->andWhere(['channel' => $this->queue->channel]) + ->andWhere(['reserved_at' => null]) + ->andWhere(['>', 'delay', 0]); + } + + /** + * @return Query + */ + protected function getReserved() + { + return (new Query()) + ->from($this->queue->tableName) + ->andWhere(['channel' => $this->queue->channel]) + ->andWhere('[[reserved_at]] is not null') + ->andWhere(['done_at' => null]); + } + + /** + * @return Query + */ + protected function getDone() + { + return (new Query()) + ->from($this->queue->tableName) + ->andWhere(['channel' => $this->queue->channel]) + ->andWhere('[[done_at]] is not null'); + } +} diff --git a/src/drivers/file/InfoAction.php b/src/drivers/file/InfoAction.php new file mode 100644 index 000000000..6a168f7fb --- /dev/null +++ b/src/drivers/file/InfoAction.php @@ -0,0 +1,99 @@ + + */ +class InfoAction extends Action +{ + /** + * @var Queue + */ + public $queue; + + + /** + * Info about queue status. + */ + public function run() + { + Console::output($this->format('Jobs', Console::FG_GREEN)); + + Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); + Console::output($this->getWaitingCount()); + + Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); + Console::output($this->getDelayedCount()); + + Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); + Console::output($this->getReservedCount()); + + Console::stdout($this->format('- done: ', Console::FG_YELLOW)); + Console::output($this->getDoneCount()); + } + + /** + * @return int + */ + protected function getWaitingCount() + { + $data = $this->getIndexData(); + return !empty($data['waiting']) ? count($data['waiting']) : 0; + } + + /** + * @return int + */ + protected function getDelayedCount() + { + $data = $this->getIndexData(); + return !empty($data['delayed']) ? count($data['delayed']) : 0; + } + + /** + * @return int + */ + protected function getReservedCount() + { + $data = $this->getIndexData(); + return !empty($data['reserved']) ? count($data['reserved']) : 0; + } + + /** + * @return int + */ + protected function getDoneCount() + { + $data = $this->getIndexData(); + $total = isset($data['lastId']) ? $data['lastId'] : 0; + return $total - $this->getDelayedCount() - $this->getWaitingCount(); + } + + protected function getIndexData() + { + static $data; + if ($data === null) { + $fileName = $this->queue->path . '/index.data'; + if (file_exists($fileName)) { + $data = call_user_func($this->queue->indexDeserializer, file_get_contents($fileName)); + } else { + $data = []; + } + } + + return $data; + } +} diff --git a/src/drivers/redis/InfoAction.php b/src/drivers/redis/InfoAction.php new file mode 100644 index 000000000..b9f1cd507 --- /dev/null +++ b/src/drivers/redis/InfoAction.php @@ -0,0 +1,54 @@ + + */ +class InfoAction extends Action +{ + /** + * @var Queue + */ + public $queue; + + + /** + * Info about queue status. + */ + public function run() + { + $prefix = $this->queue->channel; + $waiting = $this->queue->redis->llen("$prefix.waiting"); + $delayed = $this->queue->redis->zcount("$prefix.delayed", '-inf', '+inf'); + $reserved = $this->queue->redis->zcount("$prefix.reserved", '-inf', '+inf'); + $total = $this->queue->redis->get("$prefix.message_id"); + $done = $total - $waiting - $delayed - $reserved; + + Console::output($this->format('Jobs', Console::FG_GREEN)); + + Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); + Console::output($waiting); + + Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); + Console::output($delayed); + + Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); + Console::output($reserved); + + Console::stdout($this->format('- done: ', Console::FG_YELLOW)); + Console::output($done); + } +} From b13590432fed4f8e1f9fa9e59e2c20637fdf5b1d Mon Sep 17 00:00:00 2001 From: Kalmer Kaurson Date: Wed, 31 Jan 2024 15:07:00 +0200 Subject: [PATCH 4/4] fix tests --- tests/drivers/db/TestCase.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/drivers/db/TestCase.php b/tests/drivers/db/TestCase.php index dd44156d1..181f95b85 100644 --- a/tests/drivers/db/TestCase.php +++ b/tests/drivers/db/TestCase.php @@ -133,11 +133,13 @@ public function testReservedCount() public function testDoneCount() { - $this->startProcess(['php', 'yii', 'queue/listen', '1']); + $this->getQueue()->messageHandler = function () { + return true; + }; $job = $this->createSimpleJob(); $this->getQueue()->push($job); - $this->assertSimpleJobDone($job); + $this->getQueue()->run(false); $this->assertEquals(1, $this->getQueue()->getStatisticsProvider()->getDoneCount()); }