Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Beanstalk driver to version 5 #498

Merged
merged 9 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"enqueue/amqp-bunny": "^0.10.0",
"enqueue/amqp-ext": "^0.10.8",
"enqueue/stomp": "^0.10.0",
"pda/pheanstalk": "3.2.1",
"pda/pheanstalk": "^v5.0.0",
"aws/aws-sdk-php": ">=2.4",
"vimeo/psalm": "^5.10.0"
},
Expand Down
1 change: 0 additions & 1 deletion psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
<directory name="src/debug" />
<directory name="src/gii" />
<directory name="src/drivers/db/migrations" />
<directory name="src/drivers/beanstalk" />
</ignoreFiles>
</projectFiles>
<extraFiles>
Expand Down
3 changes: 0 additions & 3 deletions src/drivers/beanstalk/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ public function actionRun(): ?int
*/
public function actionListen(int $timeout = 3): ?int
{
if (!is_numeric($timeout)) {
throw new Exception('Timeout must be numeric.');
}
if ($timeout < 1) {
throw new Exception('Timeout must be greater than zero.');
}
Expand Down
139 changes: 90 additions & 49 deletions src/drivers/beanstalk/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@

namespace yii\queue\beanstalk;

use Pheanstalk\Exception\ServerException;
use Pheanstalk\Job;
use Exception;
use Pheanstalk\Contract\PheanstalkPublisherInterface;
use Pheanstalk\Contract\SocketFactoryInterface;
use Pheanstalk\Pheanstalk;
use Pheanstalk\PheanstalkInterface;
use Pheanstalk\Response;
use Pheanstalk\Values\JobId;
use Pheanstalk\Values\Timeout;
use Pheanstalk\Values\TubeName;
use Pheanstalk\Values\TubeStats;
use yii\base\InvalidArgumentException;
use yii\queue\cli\Queue as CliQueue;

/**
* Beanstalk Queue.
*
* @property-read TubeName $tubeName
* @property-read object $statsTube Tube statistics.
*
* @author Roman Zhuravlev <[email protected]>
Expand All @@ -34,7 +38,15 @@ class Queue extends CliQueue
/**
* @var int connection port
*/
public int $port = PheanstalkInterface::DEFAULT_PORT;
public int $port = SocketFactoryInterface::DEFAULT_PORT;
/**
* @var int|null connection timeout in seconds
*/
public ?int $connectTimeout = null;
/**
* @var int|null receive timeout in seconds
*/
public ?int $receiveTimeout = null;
/**
* @var string beanstalk tube
*/
Expand All @@ -44,11 +56,13 @@ class Queue extends CliQueue
*/
public string $commandClass = Command::class;

private ?Pheanstalk $pheanstalk = null;

/**
* Listens queue and runs each job.
*
* @param bool $repeat whether to continue listening when queue is empty.
* @param int $timeout number of seconds to wait for next message.
* @param int<0, max> $timeout number of seconds to wait for next message.
* @return null|int exit code.
* @internal for worker command only.
* @since 2.0.2
Expand All @@ -57,15 +71,24 @@ public function run(bool $repeat, int $timeout = 0): ?int
{
return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
while ($canContinue()) {
if ($payload = $this->getPheanstalk()->reserveFromTube($this->tube, $timeout)) {
$info = $this->getPheanstalk()->statsJob($payload);
if ($this->handleMessage(
$payload->getId(),
$payload->getData(),
(int)$info->ttr,
(int)$info->reserves
)) {
$this->getPheanstalk()->delete($payload);
$pheanstalk = $this->getPheanstalk();
$pheanstalk->watch($this->getTubeName());

$job = $pheanstalk->reserveWithTimeout($timeout);
if (null !== $job) {
try {
$info = $pheanstalk->statsJob($job);

if ($this->handleMessage(
$job->getId(),
$job->getData(),
$info->timeToRelease,
$info->reserves
)) {
$pheanstalk->delete($job);
}
} catch (Exception) {
$pheanstalk->release($job);
}
} elseif (!$repeat) {
break;
Expand All @@ -84,39 +107,32 @@ public function status($id): int
}

try {
$stats = $this->getPheanstalk()->statsJob($id);
if ($stats['state'] === 'reserved') {
$stats = $this->getPheanstalk()->statsJob(new JobId($id));

if ($stats->state->value === 'reserved') {
return self::STATUS_RESERVED;
}

return self::STATUS_WAITING;
} catch (ServerException $e) {
if ($e->getMessage() === 'Server reported NOT_FOUND') {
return self::STATUS_DONE;
}

throw $e;
} catch (Exception) {
s1lver marked this conversation as resolved.
Show resolved Hide resolved
return self::STATUS_DONE;
}
}

/**
* Removes a job by ID.
*
* @param int $id of a job
* @param int|string $id of a job
* @return bool
* @since 2.0.1
*/
public function remove(int $id): bool
public function remove(int|string $id): bool
{
try {
$this->getPheanstalk()->delete(new Job($id, null));
$this->getPheanstalk()->delete(new JobId($id));
return true;
} catch (ServerException $e) {
if (str_starts_with($e->getMessage(), 'NOT_FOUND')) {
return false;
}

throw $e;
} catch (Exception) {
s1lver marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
}

Expand All @@ -125,33 +141,58 @@ public function remove(int $id): bool
*/
protected function pushMessage(string $payload, int $ttr, int $delay, mixed $priority): int|string|null
{
return $this->getPheanstalk()->putInTube(
$this->tube,
$payload,
$priority ?: PheanstalkInterface::DEFAULT_PRIORITY,
$delay,
$ttr
);
$pheanstalk = $this->getPheanstalk();
$pheanstalk->useTube($this->getTubeName());

$result = $pheanstalk
->put(
$payload,
$priority ?: PheanstalkPublisherInterface::DEFAULT_PRIORITY,
$delay, // Seconds to wait before job becomes ready
$ttr // Time To Run: seconds a job can be reserved for
);
return $result->getId();
}

/**
* @return object tube statistics
* @return TubeStats tube statistics
*/
public function getStatsTube(): object
public function getStatsTube(): TubeStats
{
return $this->getPheanstalk()->statsTube($this->tube);
return $this->getPheanstalk()->statsTube($this->getTubeName());
}

/**
* @return Pheanstalk
*/
protected function getPheanstalk(): Pheanstalk
{
if (!$this->_pheanstalk) {
$this->_pheanstalk = new Pheanstalk($this->host, $this->port);
if (null === $this->pheanstalk) {
$this->pheanstalk = Pheanstalk::create(
$this->host,
$this->port,
$this->getConnectTimeout(),
$this->getReceiveTimeout()
);
}
return $this->_pheanstalk;
return $this->pheanstalk;
}

protected function getTubeName(): TubeName
{
return new TubeName($this->tube);
}

private $_pheanstalk;
private function getConnectTimeout(): ?Timeout
{
if (null === $this->connectTimeout) {
return null;
}
return new Timeout($this->connectTimeout);
}

private function getReceiveTimeout(): ?Timeout
{
if (null === $this->receiveTimeout) {
return null;
}
return new Timeout($this->receiveTimeout);
}
}
2 changes: 1 addition & 1 deletion tests/app/PriorityJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PriorityJob extends BaseObject implements JobInterface
{
public int $number;

public function execute(Queue $queue)
public function execute(Queue $queue): void
{
file_put_contents(self::getFileName(), $this->number, FILE_APPEND);
}
Expand Down
9 changes: 5 additions & 4 deletions tests/app/RetryJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

namespace tests\app;

use Exception;
use Yii;
use yii\base\BaseObject;
use yii\queue\Queue;
Expand All @@ -22,17 +23,17 @@
*/
class RetryJob extends BaseObject implements RetryableJobInterface
{
public $uid;
public string $uid;

public function execute(Queue $queue)
public function execute(Queue $queue): void
{
file_put_contents($this->getFileName(), 'a', FILE_APPEND);
throw new \Exception('Planned error.');
throw new Exception('Planned error.');
}

public function getFileName(): bool|string
{
return Yii::getAlias("@runtime/job-{$this->uid}.lock");
return Yii::getAlias("@runtime/job-$this->uid.lock");
}

public function getTtr(): int
Expand Down
2 changes: 1 addition & 1 deletion tests/app/SimpleJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SimpleJob extends BaseObject implements JobInterface
{
public string $uid;

public function execute(Queue $queue)
public function execute(Queue $queue): void
{
file_put_contents($this->getFileName(), '');
}
Expand Down
16 changes: 13 additions & 3 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ services:
- 8.8.8.8
- 4.4.4.4
environment:
COMPOSER_ALLOW_SUPERUSER: 1
XDEBUG_MODE: ${XDEBUG_MODE:-off} # Setup "debug" to enable debugging
XDEBUG_CONFIG: "client_host=host.docker.internal"
XDEBUG_TRIGGER: ${XDEBUG_TRIGGER:-yes}
PHP_IDE_CONFIG: "serverName=yii2-queue"
MYSQL_HOST: mysql
MYSQL_USER: yii2_queue_test
MYSQL_PASSWORD: yii2_queue_test
Expand All @@ -31,7 +36,6 @@ services:
RABBITMQ_PASSWORD: guest
BEANSTALK_HOST: beanstalk
GEARMAN_HOST: gearmand
COMPOSER_ALLOW_SUPERUSER: 1
ACTIVEMQ_HOST: activemq
AWS_KEY: ${AWS_KEY:-admin}
AWS_SECRET: ${AWS_SECRET:-admin}
Expand All @@ -50,6 +54,8 @@ services:
- localstack
networks:
net: {}
extra_hosts:
- host.docker.internal:${HOST_IP:-host-gateway}

# https://hub.docker.com/_/mysql/
mysql:
Expand Down Expand Up @@ -93,9 +99,9 @@ services:
networks:
net: {}

# https://hub.docker.com/r/schickling/beanstalkd/
# https://hub.docker.com/r/rayyounghong/beanstalkd/
beanstalk:
image: schickling/beanstalkd
image: rayyounghong/beanstalkd
ports:
- "11301:11300"
networks:
Expand Down Expand Up @@ -135,4 +141,8 @@ services:

networks:
net:
driver: bridge
name: yii2_queue_net
ipam:
config:
- subnet: 172.18.0.0/16
1 change: 0 additions & 1 deletion tests/drivers/CliTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ private function prepareCmd(array $cmd): array
{
$class = new ReflectionClass($this->getQueue());
$method = $class->getMethod('getCommandId');
$method->setAccessible(true);

$replace = [
'php' => PHP_BINARY,
Expand Down
Loading
Loading