Skip to content

Commit da7d6c5

Browse files
committed
check if job notification already has been sent
1 parent 4b4d5a6 commit da7d6c5

File tree

4 files changed

+60
-15
lines changed

4 files changed

+60
-15
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 4.0.15-beta1
2+
**Maintainer**: Sandro Aebischer <aebischer@gyselroth.com>\
3+
**Date**: Thu Dec 08 18:45:00 CET 2022
4+
5+
### Bugfix
6+
* Check if job notification already has been sent
7+
18
## 4.0.4
29
**Maintainer**: Sandro Aebischer <aebischer@gyselroth.com>\
310
**Date**: Wed Jun 29 10:00:00 CET 2022

src/Queue.php

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,17 +373,29 @@ protected function rescheduleOrphanedJobs(): self
373373

374374
protected function failJobAndNotifyJobClass(Process $job): UpdateResult
375375
{
376+
$job_id = $job->getId();
377+
376378
$result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
377-
'_id' => $job->getId(),
379+
'_id' => $job_id,
378380
], [
379381
'$set' => ['status' => JobInterface::STATUS_FAILED],
380382
]);
381383

382384
if ($this->container !== null) {
383385
$instance = $this->container->get($job->getClass());
386+
sleep(rand(1, 5));
387+
$job = $this->scheduler->getJob($job_id)->toArray();
384388

385389
if (method_exists($instance, 'notification')) {
386-
$instance->notification(JobInterface::STATUS_FAILED, $job->toArray());
390+
if (!isset($job['notification_sent'])) {
391+
$instance->notification(JobInterface::STATUS_FAILED, $job);
392+
393+
$this->db->{$this->scheduler->getJobQueue()}->updateMany([
394+
'_id' => $job_id,
395+
], [
396+
'$set' => ['notification_sent' => true],
397+
]);
398+
}
387399
} else {
388400
$this->logger->info('method notification() does not exists on instance', [
389401
'category' => get_class($this),

src/Worker.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,9 +448,13 @@ protected function updateJob(array $job, int $status): bool
448448

449449
if ($this->container !== null) {
450450
$instance = $this->container->get($job['class']);
451+
$live_job = $this->scheduler->getJob($job['_id'])->toArray();
451452

452453
if (method_exists($instance, 'notification')) {
453-
$instance->notification($status, $this->scheduler->getJob($job['_id'])->toArray());
454+
if ($job['status'] !== $status && !isset($job['notification_sent'])) {
455+
$instance->notification($status, $live_job);
456+
$set['notification_sent'] = true;
457+
}
454458
} else {
455459
$this->logger->info('method notification() does not exists on instance', [
456460
'category' => get_class($this),

src/WorkerManager.php

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ class WorkerManager
129129
*/
130130
protected $factory;
131131

132+
/**
133+
* Sent notification JobIds
134+
*
135+
* @var array
136+
*/
137+
protected $sent_notifications = [];
138+
132139
/**
133140
* Init queue.
134141
*/
@@ -400,18 +407,6 @@ protected function handleJob(array $event): self
400407
return $this;
401408

402409
case JobInterface::STATUS_CANCELED:
403-
if ($this->container !== null) {
404-
$instance = $this->container->get($event['class']);
405-
406-
if (method_exists($instance, 'notification')) {
407-
$instance->notification($event['status'], $this->scheduler->getJob($event['_id'])->toArray());
408-
} else {
409-
$this->logger->info('method notification() does not exists on instance', [
410-
'category' => get_class($this),
411-
]);
412-
}
413-
}
414-
415410
case JobInterface::STATUS_FAILED:
416411
case JobInterface::STATUS_TIMEOUT:
417412
$worker = array_search($event['_id'], $this->job_map, false);
@@ -432,6 +427,33 @@ protected function handleJob(array $event): self
432427
unset($this->job_map[(string) $worker]);
433428
posix_kill($this->forks[(string) $worker], SIGKILL);
434429
}
430+
if ($event['status'] === JobInterface::STATUS_CANCELED) {
431+
if ($this->container !== null) {
432+
$instance = $this->container->get($event['class']);
433+
434+
if (method_exists($instance, 'notification')) {
435+
sleep(rand(1, 5));
436+
$job = $this->scheduler->getJob($event['_id'])->toArray();
437+
438+
if (!isset($job['notification_sent']) && !in_array($event['_id'], $this->sent_notifications)) {
439+
$this->sent_notifications[] = $event['_id'];
440+
$instance->notification($event['status'], $job);
441+
442+
$this->db->{$this->scheduler->getJobQueue()}->updateOne([
443+
'_id' => $event['_id'],
444+
], [
445+
'$set' => [
446+
'notification_sent' => true,
447+
],
448+
]);
449+
}
450+
} else {
451+
$this->logger->info('method notification() does not exists on instance', [
452+
'category' => get_class($this),
453+
]);
454+
}
455+
}
456+
}
435457

436458
return $this;
437459
default:

0 commit comments

Comments
 (0)