Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Mar 19, 2024
2 parents c82e8b5 + 2448cd2 commit 5fc159a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 15 deletions.
26 changes: 21 additions & 5 deletions src/Builders/Traits/MessageQueueMethod.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Workbunny\WebmanRqueue\Exceptions\WebmanRqueueException;
use Workbunny\WebmanRqueue\Headers;
use Workerman\Worker;
use function Workbunny\WebmanRqueue\config;

trait MessageQueueMethod
{
Expand Down Expand Up @@ -242,7 +243,7 @@ public function publishGetIds(string $body, array $headers = [], null|string $qu
'_header' => $header->toString(),
'_body' => $body,
])) {
$ids[] = $id;
$ids[$queue] = $id;
}
}
return $ids;
Expand Down Expand Up @@ -429,15 +430,30 @@ public function consume(Worker $worker, bool $del = true): bool
} catch (\Throwable $throwable) {
// republish
$header->_count = $header->_count + 1;
$header->_error = $throwable->getMessage();
$header->_error = "{$throwable->getMessage()} [{$throwable->getFile()}:{$throwable->getLine()}]";
// republish都将刷新使用redis stream自身的id,自定义id无效
$header->_id = '*';
if ($this->requeue($body, $header->toArray())) {
// blocking-retry ack
$this->ack($queueName, $groupName, $this->idsAdd($ids, $id), true);
// 如果错误超过error max count,则存入本地error表
if (
($errorMaxCount = config('plugin.workbunny.webman-rqueue.app.error_max_count', 0)) > 0 and
$header->_count >= $errorMaxCount
) {
// 存入
if ($this->tempInsert('error', $queueName, [
'_header' => $header->toArray(),
'_body' => $body
])) {
// blocking-retry ack
$this->ack($queueName, $groupName, $this->idsAdd($ids, $id), true);
}
} else {
if ($this->requeue($body, $header->toArray())) {
// blocking-retry ack
$this->ack($queueName, $groupName, $this->idsAdd($ids, $id), true);
// if (!$this->ack($queueName, $groupName, $this->idsAdd($ids, $id))) {
// $this->idsDel($ids, $id);
// }
}
}
}
}
Expand Down
32 changes: 24 additions & 8 deletions src/Builders/Traits/MessageTempMethod.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,23 @@ trait MessageTempMethod
protected ?int $_requeueTimer = null;

protected static array $_tables = [
'requeue', 'pending'
'requeue', 'pending', 'error'
];

/**
* 注册本地table
* @param string $table
* @return bool
*/
public function tempTableRegister(string $table): bool
{
if (!in_array($table, self::$_tables)) {
self::$_tables[] = $table;
return true;
}
return false;
}


/**
* 初始化temp
Expand All @@ -43,13 +57,15 @@ public function tempInit(): void
$builder = Db::schema('plugin.workbunny.webman-rqueue.local-storage');
foreach (self::$_tables as $table) {
if (!$builder->hasTable($table)) {
$builder->create($table, function (Blueprint $table) {
$table->id();
$table->string('queue');
$table->json('data');
$table->integer('create_at');
});
echo "local-storage db $table-table created. " . PHP_EOL;
try {
$builder->create($table, function (Blueprint $table) {
$table->id();
$table->string('queue');
$table->json('data');
$table->integer('create_at');
});
echo "local-storage db $table-table created. " . PHP_EOL;
} catch (\Throwable) {}
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/config/plugin/workbunny/webman-rqueue/app.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
<?php
return [
'enable' => true,

// 默认redis连接
'connection' => 'default',
'requeue_interval' => 0
// 本地消息重拾定时器间隔
'requeue_interval' => 0,
// 最大错误计数
'error_max_count' => 0
];

0 comments on commit 5fc159a

Please sign in to comment.