Skip to content
This repository was archived by the owner on Oct 24, 2025. It is now read-only.

Commit 46ab5d2

Browse files
Switch to Laravels own queue worker (#13)
* Switch to Laravels own queue worker * cleanup * cleanup * formatting * drop extra method * rename method * Add job logging * Use pcntl * Supply the queue handler with a real timeout value * Utilise native shouldFailOnTimeout for timeouts * Remove unused import * Ensure job is deleted after successful invocation * Fix name * formatting * Reverse job timeout behavior * some formatting Co-authored-by: Till Krüss <tillkruss@users.noreply.github.com>
1 parent 3c5119c commit 46ab5d2

File tree

4 files changed

+204
-63
lines changed

4 files changed

+204
-63
lines changed

src/BrefServiceProvider.php

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,17 @@
55
use Monolog\Formatter\JsonFormatter;
66

77
use Illuminate\Log\LogManager;
8-
use Illuminate\Contracts\Http\Kernel;
8+
99
use Illuminate\Support\Facades\Config;
1010
use Illuminate\Support\ServiceProvider;
1111

12+
use Illuminate\Contracts\Http\Kernel;
13+
use Illuminate\Contracts\Events\Dispatcher;
14+
15+
use Illuminate\Queue\Events\JobProcessed;
16+
use Illuminate\Queue\Events\JobProcessing;
17+
use Illuminate\Queue\Events\JobExceptionOccurred;
18+
1219
class BrefServiceProvider extends ServiceProvider
1320
{
1421
/**
@@ -60,7 +67,7 @@ public function register()
6067
*
6168
* @return void
6269
*/
63-
public function boot()
70+
public function boot(Dispatcher $dispatcher, LogManager $logManager)
6471
{
6572
if ($this->app->runningInConsole()) {
6673
$this->publishes([
@@ -71,6 +78,27 @@ public function boot()
7178
__DIR__ . '/../stubs/runtime.php' => base_path('php/runtime.php'),
7279
], 'bref-runtime');
7380
}
81+
82+
$dispatcher->listen(
83+
fn (JobProcessing $event) => $logManager->info(
84+
"Processing job {$event->job->getJobId()}",
85+
['name' => $event->job->resolveName()]
86+
)
87+
);
88+
89+
$dispatcher->listen(
90+
fn (JobProcessed $event) => $logManager->info(
91+
"Processed job {$event->job->getJobId()}",
92+
['name' => $event->job->resolveName()]
93+
)
94+
);
95+
96+
$dispatcher->listen(
97+
fn (JobExceptionOccurred $event) => $logManager->info(
98+
"Job failed {$event->job->getJobId()}",
99+
['name' => $event->job->resolveName()]
100+
)
101+
);
74102
}
75103

76104
/**

src/Queue/QueueHandler.php

Lines changed: 81 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,41 @@
22

33
namespace CacheWerk\BrefLaravelBridge\Queue;
44

5-
use Throwable;
65
use RuntimeException;
76

7+
use Aws\Sqs\SqsClient;
8+
89
use Bref\Context\Context;
910
use Bref\Event\Sqs\SqsEvent;
1011
use Bref\Event\Sqs\SqsHandler;
12+
use Bref\Event\Sqs\SqsRecord;
1113

14+
use Illuminate\Queue\SqsQueue;
15+
use Illuminate\Queue\Jobs\SqsJob;
16+
use Illuminate\Queue\QueueManager;
17+
use Illuminate\Queue\WorkerOptions;
1218
use Illuminate\Container\Container;
1319
use Illuminate\Contracts\Events\Dispatcher;
1420
use Illuminate\Contracts\Debug\ExceptionHandler;
1521

16-
use Illuminate\Log\LogManager;
17-
18-
use Illuminate\Queue\SqsQueue;
19-
use Illuminate\Queue\Jobs\SqsJob;
20-
use Illuminate\Queue\QueueManager;
21-
use Illuminate\Queue\Events\JobProcessed;
22-
use Illuminate\Queue\Events\JobProcessing;
23-
use Illuminate\Queue\Events\JobExceptionOccurred;
22+
use CacheWerk\BrefLaravelBridge\MaintenanceMode;
2423

2524
class QueueHandler extends SqsHandler
2625
{
26+
/**
27+
* The AWS SQS client.
28+
*
29+
* @var \Aws\Sqs\SqsClient
30+
*/
31+
protected SqsClient $sqs;
32+
33+
/**
34+
* Number of seconds before Lambda invocation deadline to timeout the job.
35+
*
36+
* @var float
37+
*/
38+
protected const JOB_TIMEOUT_SAFETY_MARGIN = 1.0;
39+
2740
/**
2841
* Creates a new SQS queue handler instance.
2942
*
@@ -60,78 +73,85 @@ public function __construct(
6073
*/
6174
public function handleSqs(SqsEvent $event, Context $context): void
6275
{
76+
$worker = $this->container->makeWith(Worker::class, [
77+
'isDownForMaintenance' => fn () => MaintenanceMode::active(),
78+
]);
79+
6380
foreach ($event->getRecords() as $sqsRecord) {
64-
$recordData = $sqsRecord->toArray();
65-
66-
$jobData = [
67-
'MessageId' => $recordData['messageId'],
68-
'ReceiptHandle' => $recordData['receiptHandle'],
69-
'Attributes' => $recordData['attributes'],
70-
'Body' => $recordData['body'],
71-
];
72-
73-
$job = new SqsJob(
74-
$this->container,
75-
$this->sqs,
76-
$jobData,
81+
$timeout = $this->calculateJobTimeout($context->getRemainingTimeInMillis());
82+
83+
$worker->runSqsJob(
84+
$job = $this->marshalJob($sqsRecord),
7785
$this->connection,
78-
$this->queue,
86+
$this->gatherWorkerOptions($timeout),
7987
);
8088

81-
$this->process($this->connection, $job);
89+
if (! $job->hasFailed() && ! $job->isDeleted()) {
90+
$job->delete();
91+
}
8292
}
8393
}
8494

8595
/**
86-
* @see \Illuminate\Queue\Worker::process()
87-
*/
88-
protected function process(string $connectionName, SqsJob $job): void
89-
{
90-
try {
91-
$this->raiseBeforeJobEvent($connectionName, $job);
92-
93-
$job->fire();
94-
95-
$this->raiseAfterJobEvent($connectionName, $job);
96-
} catch (Throwable $exception) {
97-
$this->raiseExceptionOccurredJobEvent($connectionName, $job, $exception);
98-
99-
$this->exceptions->report($exception);
100-
101-
throw $exception;
102-
}
103-
}
104-
105-
/**
106-
* @see \Illuminate\Queue\Worker::raiseBeforeJobEvent()
96+
* Marshal the job with the given Bref SQS record.
97+
*
98+
* @param \Bref\Event\Sqs\SqsRecord $sqsRecord
99+
* @return \Illuminate\Queue\Jobs\SqsJob
107100
*/
108-
protected function raiseBeforeJobEvent(string $connectionName, SqsJob $job): void
101+
protected function marshalJob(SqsRecord $sqsRecord): SqsJob
109102
{
110-
$this->container->make(LogManager::class)
111-
->info("Processing job {$job->getJobId()}", ['name' => $job->resolveName()]);
112-
113-
$this->events->dispatch(new JobProcessing($connectionName, $job));
103+
$message = [
104+
'MessageId' => $sqsRecord->getMessageId(),
105+
'ReceiptHandle' => $sqsRecord->getReceiptHandle(),
106+
'Body' => $sqsRecord->getBody(),
107+
'Attributes' => $sqsRecord->toArray()['attributes'],
108+
'MessageAttributes' => $sqsRecord->getMessageAttributes(),
109+
];
110+
111+
return new SqsJob(
112+
$this->container,
113+
$this->sqs,
114+
$message,
115+
$this->connection,
116+
$this->queue,
117+
);
114118
}
115119

116120
/**
117-
* @see \Illuminate\Queue\Worker::raiseAfterJobEvent()
121+
* Gather all of the queue worker options as a single object.
122+
*
123+
* @param int $timeout
124+
* @return \Illuminate\Queue\WorkerOptions
118125
*/
119-
protected function raiseAfterJobEvent(string $connectionName, SqsJob $job): void
126+
protected function gatherWorkerOptions(int $timeout): WorkerOptions
120127
{
121-
$this->container->make(LogManager::class)
122-
->info("Processed job {$job->getJobId()}", ['name' => $job->resolveName()]);
128+
$options = [
129+
0, // backoff
130+
512, // memory
131+
$timeout, // timeout
132+
0, // sleep
133+
3, // maxTries
134+
false, // force
135+
false, // stopWhenEmpty
136+
0, // maxJobs
137+
0, // maxTime
138+
];
139+
140+
if (property_exists(WorkerOptions::class, 'name')) {
141+
$options = array_merge(['default'], $options);
142+
}
123143

124-
$this->events->dispatch(new JobProcessed($connectionName, $job));
144+
return new WorkerOptions(...$options);
125145
}
126146

127147
/**
128-
* @see \Illuminate\Queue\Worker::raiseExceptionOccurredJobEvent()
148+
* Calculate the timeout for a job
149+
*
150+
* @param int $remainingInvocationTimeInMs
151+
* @return int
129152
*/
130-
protected function raiseExceptionOccurredJobEvent(string $connectionName, SqsJob $job, Throwable $th): void
153+
protected function calculateJobTimeout(int $remainingInvocationTimeInMs): int
131154
{
132-
$this->container->make(LogManager::class)
133-
->error("Job failed {$job->getJobId()}", ['name' => $job->resolveName()]);
134-
135-
$this->events->dispatch(new JobExceptionOccurred($connectionName, $job, $th));
155+
return max((int) (($remainingInvocationTimeInMs - self::JOB_TIMEOUT_SAFETY_MARGIN) / 1000), 0);
136156
}
137157
}

src/Queue/SqsJob.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace CacheWerk\BrefLaravelBridge\Queue;
4+
5+
use Illuminate\Queue\Jobs\SqsJob as LaravelSqsJob;
6+
7+
class SqsJob extends LaravelSqsJob
8+
{
9+
/**
10+
* {@inheritDoc}
11+
*/
12+
public function release($delay = 0)
13+
{
14+
$this->released = true;
15+
16+
$payload = $this->payload();
17+
$payload['attempts'] = ($payload['attempts'] ?? 0) + 1;
18+
19+
$this->sqs->deleteMessage([
20+
'QueueUrl' => $this->queue,
21+
'ReceiptHandle' => $this->job['ReceiptHandle'],
22+
]);
23+
24+
$this->sqs->sendMessage([
25+
'QueueUrl' => $this->queue,
26+
'MessageBody' => json_encode($payload),
27+
'DelaySeconds' => $this->secondsUntil($delay),
28+
]);
29+
}
30+
31+
/**
32+
* {@inheritDoc}
33+
*/
34+
public function attempts()
35+
{
36+
return ($this->payload()['attempts'] ?? 0) + 1;
37+
}
38+
}

src/Queue/Worker.php

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
namespace CacheWerk\BrefLaravelBridge\Queue;
4+
5+
use Illuminate\Contracts\Queue\Job;
6+
use Illuminate\Queue\WorkerOptions;
7+
use Illuminate\Queue\Worker as LaravelWorker;
8+
use Throwable;
9+
10+
class Worker extends LaravelWorker
11+
{
12+
/**
13+
* Creates a new SQS queue handler instance.
14+
*
15+
* @param \Illuminate\Contracts\Queue\Job $job
16+
* @param string $connectionName
17+
* @param \Illuminate\Queue\WorkerOptions $options
18+
* @return void
19+
*/
20+
public function runSqsJob(Job $job, string $connectionName, WorkerOptions $options): void
21+
{
22+
pcntl_async_signals(true);
23+
24+
pcntl_signal(SIGALRM, function () use ($job) {
25+
$this->markJobAsFailedIfItShouldFailOnTimeout(
26+
$job->getConnectionName(),
27+
$job,
28+
$this->maxAttemptsExceededException($job),
29+
);
30+
});
31+
32+
pcntl_alarm(
33+
max($this->timeoutForJob($job, $options), 0)
34+
);
35+
36+
$this->runJob($job, $connectionName, $options);
37+
38+
pcntl_alarm(0); // cancel the previous alarm
39+
}
40+
41+
/**
42+
* Mark the given job as failed if it should fail on timeouts.
43+
*
44+
* @param string $connectionName
45+
* @param \Illuminate\Contracts\Queue\Job $job
46+
* @param \Throwable $e
47+
* @return void
48+
*/
49+
protected function markJobAsFailedIfItShouldFailOnTimeout($connectionName, $job, Throwable $e)
50+
{
51+
if (method_exists($job, 'shouldFailOnTimeout') ? $job->shouldFailOnTimeout() : true) {
52+
$this->failJob($job, $e);
53+
}
54+
}
55+
}

0 commit comments

Comments
 (0)