diff --git a/src/Builders/Traits/MessageQueueMethod.php b/src/Builders/Traits/MessageQueueMethod.php index 8015ad6..415c716 100644 --- a/src/Builders/Traits/MessageQueueMethod.php +++ b/src/Builders/Traits/MessageQueueMethod.php @@ -7,6 +7,7 @@ use Workbunny\WebmanRqueue\Exceptions\WebmanRqueueException; use Workbunny\WebmanRqueue\Headers; use Workerman\Worker; +use function Workbunny\WebmanRqueue\config; trait MessageQueueMethod { @@ -242,7 +243,7 @@ public function publishGetIds(string $body, array $headers = [], null|string $qu '_header' => $header->toString(), '_body' => $body, ])) { - $ids[] = $id; + $ids[$queue] = $id; } } return $ids; @@ -429,15 +430,30 @@ public function consume(Worker $worker, bool $del = true): bool } catch (\Throwable $throwable) { // republish $header->_count = $header->_count + 1; - $header->_error = $throwable->getMessage(); + $header->_error = "{$throwable->getMessage()} [{$throwable->getFile()}:{$throwable->getLine()}]"; // republish都将刷新使用redis stream自身的id,自定义id无效 $header->_id = '*'; - if ($this->requeue($body, $header->toArray())) { - // blocking-retry ack - $this->ack($queueName, $groupName, $this->idsAdd($ids, $id), true); + // 如果错误超过error max count,则存入本地error表 + if ( + ($errorMaxCount = config('plugin.workbunny.webman-rqueue.app.error_max_count', 0)) > 0 and + $header->_count >= $errorMaxCount + ) { + // 存入 + if ($this->tempInsert('error', $queueName, [ + '_header' => $header->toArray(), + '_body' => $body + ])) { + // blocking-retry ack + $this->ack($queueName, $groupName, $this->idsAdd($ids, $id), true); + } + } else { + if ($this->requeue($body, $header->toArray())) { + // blocking-retry ack + $this->ack($queueName, $groupName, $this->idsAdd($ids, $id), true); // if (!$this->ack($queueName, $groupName, $this->idsAdd($ids, $id))) { // $this->idsDel($ids, $id); // } + } } } } diff --git a/src/Builders/Traits/MessageTempMethod.php b/src/Builders/Traits/MessageTempMethod.php index 34938d3..ba7c3f6 100644 --- a/src/Builders/Traits/MessageTempMethod.php +++ b/src/Builders/Traits/MessageTempMethod.php @@ -17,9 +17,23 @@ trait MessageTempMethod protected ?int $_requeueTimer = null; protected static array $_tables = [ - 'requeue', 'pending' + 'requeue', 'pending', 'error' ]; + /** + * 注册本地table + * @param string $table + * @return bool + */ + public function tempTableRegister(string $table): bool + { + if (!in_array($table, self::$_tables)) { + self::$_tables[] = $table; + return true; + } + return false; + } + /** * 初始化temp @@ -43,13 +57,15 @@ public function tempInit(): void $builder = Db::schema('plugin.workbunny.webman-rqueue.local-storage'); foreach (self::$_tables as $table) { if (!$builder->hasTable($table)) { - $builder->create($table, function (Blueprint $table) { - $table->id(); - $table->string('queue'); - $table->json('data'); - $table->integer('create_at'); - }); - echo "local-storage db $table-table created. " . PHP_EOL; + try { + $builder->create($table, function (Blueprint $table) { + $table->id(); + $table->string('queue'); + $table->json('data'); + $table->integer('create_at'); + }); + echo "local-storage db $table-table created. " . PHP_EOL; + } catch (\Throwable) {} } } } diff --git a/src/config/plugin/workbunny/webman-rqueue/app.php b/src/config/plugin/workbunny/webman-rqueue/app.php index e94418b..d347755 100644 --- a/src/config/plugin/workbunny/webman-rqueue/app.php +++ b/src/config/plugin/workbunny/webman-rqueue/app.php @@ -1,7 +1,10 @@ true, - + // 默认redis连接 'connection' => 'default', - 'requeue_interval' => 0 + // 本地消息重拾定时器间隔 + 'requeue_interval' => 0, + // 最大错误计数 + 'error_max_count' => 0 ]; \ No newline at end of file