Skip to content

Commit

Permalink
check if job notification already has been sent
Browse files Browse the repository at this point in the history
  • Loading branch information
s-aebischer committed Dec 13, 2022
1 parent d686b08 commit 25370fb
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 15 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 4.0.15
**Maintainer**: Sandro Aebischer <[email protected]>\
**Date**: Tue Dez 13 12:00:00 CET 2022

### Bugfix
* check if job notification already has been sent

## 4.0.14
**Maintainer**: Sandro Aebischer <[email protected]>\
**Date**: Fri Nov 25 11:15:00 CET 2022
Expand Down
16 changes: 14 additions & 2 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -373,17 +373,29 @@ protected function rescheduleOrphanedJobs(): self

protected function failJobAndNotifyJobClass(Process $job): UpdateResult
{
$job_id = $job->getId();

$result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
'_id' => $job->getId(),
'_id' => $job_id,
], [
'$set' => ['status' => JobInterface::STATUS_FAILED],
]);

if ($this->container !== null) {
$instance = $this->container->get($job->getClass());
sleep(rand(1, 5));
$job = $this->scheduler->getJob($job_id)->toArray();

if (method_exists($instance, 'notification')) {
$instance->notification(JobInterface::STATUS_FAILED, $job->toArray());
if (!isset($job['notification_sent'])) {
$instance->notification(JobInterface::STATUS_FAILED, $job);

$this->db->{$this->scheduler->getJobQueue()}->updateMany([
'_id' => $job_id,
], [
'$set' => ['notification_sent' => true],
]);
}
} else {
$this->logger->info('method notification() does not exists on instance', [
'category' => get_class($this),
Expand Down
6 changes: 5 additions & 1 deletion src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,13 @@ protected function updateJob(array $job, int $status): bool

if ($this->container !== null) {
$instance = $this->container->get($job['class']);
$live_job = $this->scheduler->getJob($job['_id'])->toArray();

if (method_exists($instance, 'notification')) {
$instance->notification($status, $this->scheduler->getJob($job['_id'])->toArray());
if ($job['status'] !== $status && !isset($job['notification_sent'])) {
$instance->notification($status, $live_job);
$set['notification_sent'] = true;
}
} else {
$this->logger->info('method notification() does not exists on instance', [
'category' => get_class($this),
Expand Down
46 changes: 34 additions & 12 deletions src/WorkerManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ class WorkerManager
*/
protected $factory;

/**
* Sent notification JobIds
*
* @var array
*/
protected $sent_notifications = [];

/**
* Init queue.
*/
Expand Down Expand Up @@ -400,18 +407,6 @@ protected function handleJob(array $event): self
return $this;

case JobInterface::STATUS_CANCELED:
if ($this->container !== null) {
$instance = $this->container->get($event['class']);

if (method_exists($instance, 'notification')) {
$instance->notification($event['status'], $this->scheduler->getJob($event['_id'])->toArray());
} else {
$this->logger->info('method notification() does not exists on instance', [
'category' => get_class($this),
]);
}
}

case JobInterface::STATUS_FAILED:
case JobInterface::STATUS_TIMEOUT:
$worker = array_search($event['_id'], $this->job_map, false);
Expand All @@ -432,6 +427,33 @@ protected function handleJob(array $event): self
unset($this->job_map[(string) $worker]);
posix_kill($this->forks[(string) $worker], SIGKILL);
}
if ($event['status'] === JobInterface::STATUS_CANCELED) {
if ($this->container !== null) {
$instance = $this->container->get($event['class']);

if (method_exists($instance, 'notification')) {
sleep(rand(1, 5));
$job = $this->scheduler->getJob($event['_id'])->toArray();

if (!isset($job['notification_sent']) && !in_array($event['_id'], $this->sent_notifications)) {
$this->sent_notifications[] = $event['_id'];
$instance->notification($event['status'], $job);

$this->db->{$this->scheduler->getJobQueue()}->updateOne([
'_id' => $event['_id'],
], [
'$set' => [
'notification_sent' => true,
],
]);
}
} else {
$this->logger->info('method notification() does not exists on instance', [
'category' => get_class($this),
]);
}
}
}

return $this;
default:
Expand Down

0 comments on commit 25370fb

Please sign in to comment.