diff --git a/_include/src/CmsExtension.php b/_include/src/CmsExtension.php index 24a355c2..5fd136dd 100644 --- a/_include/src/CmsExtension.php +++ b/_include/src/CmsExtension.php @@ -95,7 +95,7 @@ public function buildContainer(Container $container): void return match ($db_type) { 'mysql' => new PDO("mysql:host=$db_host;dbname=$db_name;charset=utf8mb4", $db_username, $db_password), - 'sqlite' => PdoSqliteFactory::create($container->getParameter('root_dir').$db_name, $p_connect), + 'sqlite' => PdoSqliteFactory::create($container->getParameter('root_dir') . $db_name, $p_connect), 'pgsql' => new PDO("pgsql:host=$db_host;dbname=$db_name", $db_username, $db_password), default => throw new \RuntimeException(sprintf('Unsupported db_type="%s"', $db_type)), }; @@ -183,14 +183,18 @@ public function buildContainer(Container $container): void }); $container->set(QueuePublisher::class, function (Container $container) { - return new QueuePublisher($container->get(\PDO::class)); + return new QueuePublisher( + $container->get(\PDO::class), + $container->getParameter('db_prefix'), + ); }); $container->set(QueueConsumer::class, function (Container $container) { return new QueueConsumer( $container->get(\PDO::class), + $container->getParameter('db_prefix'), $container->get(LoggerInterface::class), $container->get(RecommendationProvider::class), - $container->get(ThumbnailGenerator::class) + $container->get(ThumbnailGenerator::class), ); }); $container->set(RecommendationProvider::class, function (Container $container) { diff --git a/_include/src/Logger/Logger.php b/_include/src/Logger/Logger.php index e9bf1a0f..c5404b2a 100644 --- a/_include/src/Logger/Logger.php +++ b/_include/src/Logger/Logger.php @@ -229,11 +229,11 @@ private static function formatTrace(array $trace): string $stack = ''; $i = 1; foreach ($trace as $node) { - $stack .= "#$i " . $node['file'] . ":" . $node['line'] . " "; + $stack .= "#$i " . (isset($node['file']) ? $node['file'] . ':' . $node['line'] . ' ' : ''); if (isset($node['class'])) { - $stack .= $node['class'] . "->"; + $stack .= $node['class'] . '->'; } - $stack .= $node['function'] . "()" . PHP_EOL; + $stack .= $node['function'] . '()' . PHP_EOL; $i++; } return $stack; diff --git a/_include/src/Queue/QueueConsumer.php b/_include/src/Queue/QueueConsumer.php index 30f6c693..21de0224 100644 --- a/_include/src/Queue/QueueConsumer.php +++ b/_include/src/Queue/QueueConsumer.php @@ -1,38 +1,48 @@ -pdo = $pdo; + public function __construct( + private \PDO $pdo, + private string $dbPrefix, + private LoggerInterface $logger, + QueueHandlerInterface ...$handlers + ) { $this->handlers = $handlers; - $this->logger = $logger; } + /** + * Fetches and processes a job from the queue. + * + * The queue is stored in the 'queue' table of database. Jobs are fetched and locked in a transaction. + * + * NOWAIT prevents parallel job processing. It can be dangerous in case of several heavy jobs + * (PHP-FPM workers can be exhausted). + * + * @return bool + */ public function runQueue(): bool { $driverName = $this->pdo->getAttribute(\PDO::ATTR_DRIVER_NAME); $sql = match ($driverName) { - 'mysql' => 'SELECT * FROM queue LIMIT 1 FOR UPDATE', // TODO figure out how to detect support for SKIP LOCKED to make a fallback - 'pgsql' => 'SELECT * FROM queue LIMIT 1 FOR UPDATE SKIP LOCKED', - 'sqlite' => 'SELECT * FROM queue LIMIT 1', + 'mysql', 'pgsql' => 'SELECT * FROM ' . $this->dbPrefix . 'queue LIMIT 1 FOR UPDATE NOWAIT', + 'sqlite' => 'SELECT * FROM ' . $this->dbPrefix . 'queue LIMIT 1', default => throw new \RuntimeException(sprintf('Driver "%s" is not supported.', $driverName)), }; @@ -59,7 +69,7 @@ public function runQueue(): bool $this->logger->warning('Throwable occurred while processing queue: ' . $e->getMessage(), ['exception' => $e]); } - $statement = $this->pdo->prepare('DELETE FROM queue WHERE id = :id AND code = :code'); + $statement = $this->pdo->prepare('DELETE FROM ' . $this->dbPrefix . 'queue WHERE id = :id AND code = :code'); $statement->execute([ 'id' => $job['id'], 'code' => $job['code'], diff --git a/_include/src/Queue/QueueHandlerInterface.php b/_include/src/Queue/QueueHandlerInterface.php index ca420a46..0ae21d06 100644 --- a/_include/src/Queue/QueueHandlerInterface.php +++ b/_include/src/Queue/QueueHandlerInterface.php @@ -1,10 +1,12 @@ -pdo = $pdo; } public function publish(string $id, string $code, array $payload = []): void @@ -40,13 +39,13 @@ public function publish(string $id, string $code, array $payload = []): void // for releasing a lock even if the row was just locked with SELECT ... FOR UPDATE and not modified yet. // Moreover, there is no INSERT ... NOWAIT operator. Let's make it by hands. $this->pdo->exec('SET innodb_lock_wait_timeout = 0;'); - $statement = $this->pdo->prepare('INSERT IGNORE INTO queue (id, code, payload) VALUES (:id, :code, :payload)'); + $statement = $this->pdo->prepare('INSERT IGNORE INTO ' . $this->dbPrefix . 'queue (id, code, payload) VALUES (:id, :code, :payload)'); break; case 'sqlite': - $this->pdo->setAttribute(\PDO::ATTR_TIMEOUT, 0); + $this->pdo->setAttribute(\PDO::ATTR_TIMEOUT, 1); case 'pgsql': - $statement = $this->pdo->prepare('INSERT INTO queue (id, code, payload) VALUES (:id, :code, :payload) ON CONFLICT DO NOTHING'); + $statement = $this->pdo->prepare('INSERT INTO ' . $this->dbPrefix . 'queue (id, code, payload) VALUES (:id, :code, :payload) ON CONFLICT DO NOTHING'); break; default: diff --git a/_tests/integration/QueueCest.php b/_tests/integration/QueueCest.php index ffdce46f..14988159 100644 --- a/_tests/integration/QueueCest.php +++ b/_tests/integration/QueueCest.php @@ -39,22 +39,34 @@ public function testQueue(IntegrationTester $I) // Test duplication $queuePublisher->publish('test_id', 'code', ['data']); + // Test another write + $queuePublisher->publish('test_id0', 'code', ['data']); + $consumerApplication = $I->createApplication(); - $queueConsumer = $consumerApplication->container->get(QueueConsumer::class); + /** @var QueueConsumer $queueConsumer */ + $queueConsumer = $consumerApplication->container->get(QueueConsumer::class); + $I->assertTrue($queueConsumer->runQueue(), 'Job was processed'); + $I->assertTrue($queueConsumer->runQueue(), 'Job was processed'); + $I->assertFalse($queueConsumer->runQueue(), 'No more jobs'); + + // Test serial run + $queuePublisher->publish('test_id', 'code', ['data']); $I->assertTrue($queueConsumer->runQueue(), 'Job was processed'); $I->assertFalse($queueConsumer->runQueue(), 'No more jobs'); $queuePublisher->publish('test_id2', 'code', ['data']); - // Some copy-paste from QueuePublisher::publish() to simulate a parallel run + /** + * Some copy-paste to simulate a parallel run + * @see QueueConsumer::runQueue() + */ /** @var PDO $consumerPdo */ $consumerPdo = $consumerApplication->container->get(\PDO::class); $consumerPdo->beginTransaction(); $driverName = $consumerPdo->getAttribute(\PDO::ATTR_DRIVER_NAME); $sql = match ($driverName) { - 'mysql' => 'SELECT * FROM queue LIMIT 1 FOR UPDATE', // TODO figure out how to detect support for SKIP LOCKED to make a fallback - 'pgsql' => 'SELECT * FROM queue LIMIT 1 FOR UPDATE SKIP LOCKED', + 'mysql', 'pgsql' => 'SELECT * FROM queue LIMIT 1 FOR UPDATE NOWAIT', 'sqlite' => 'SELECT * FROM queue LIMIT 1', default => throw new InvalidEnvironmentException(sprintf('Driver "%s" is not supported.', $driverName)), }; @@ -67,7 +79,19 @@ public function testQueue(IntegrationTester $I) // Test no lock wait when the parallel transaction is running. $queuePublisher->publish('test_id2', 'code', ['data']); + $queuePublisher->publish('test_id3', 'code', ['data']); $consumerPdo->rollBack(); + $statement = null; + + $queuePublisher->publish('test_id4', 'code', ['data']); + + if ($driverName !== 'sqlite') { + // Sqlite loses test_id3 being written during parallel consumer transaction. + $I->assertTrue($queueConsumer->runQueue(), 'Job was processed'); + } + $I->assertTrue($queueConsumer->runQueue(), 'Job was processed'); + $I->assertTrue($queueConsumer->runQueue(), 'Job was processed'); + $I->assertFalse($queueConsumer->runQueue(), 'No more jobs'); } }