Skip to content

Commit e6f9c3c

Browse files
authored
Merge pull request #6 from adm-bome/fix-laravel-job-attempts
Fix laravel job attempts, when removing DLX
2 parents 1cf0d53 + 1008703 commit e6f9c3c

21 files changed

+739
-413
lines changed

README.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,94 @@ Add connection to `config/queue.php`:
7373
],
7474
```
7575

76+
### Optional Config
77+
78+
Optionally add queue options to the config of a connection.
79+
Every queue created for this connection, get's the properties.
80+
81+
When you want to prioritize messages when they were delayed, then this is possible by adding extra options.
82+
- When max-priority is omitted, the max priority is set with 2 when used.
83+
84+
```php
85+
'connections' => [
86+
// ...
87+
88+
'rabbitmq' => [
89+
// ...
90+
91+
'options' => [
92+
'queue' => [
93+
// ...
94+
95+
'prioritize_delayed_messages' => false,
96+
'queue_max_priority' => 10,
97+
],
98+
],
99+
],
100+
101+
// ...
102+
],
103+
```
104+
105+
When you want to publish messages against an exchange with routing-key's, then this is possible by adding extra options.
106+
- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key
107+
- When routing-key is omitted the routing-key by default is the `queue` name.
108+
- When using `%s` in the routing-key the queue_name will be substituted.
109+
110+
> Note: when using exchange with routing-key, u probably create your queues with bindings yourself.
111+
112+
```php
113+
'connections' => [
114+
// ...
115+
116+
'rabbitmq' => [
117+
// ...
118+
119+
'options' => [
120+
'queue' => [
121+
// ...
122+
123+
'exchange' => 'application-x',
124+
'exchange_type' => 'topic',
125+
'exchange_routing_key' => '',
126+
],
127+
],
128+
],
129+
130+
// ...
131+
],
132+
```
133+
134+
In Laravel failed jobs are stored into the database. But maybe you want to instruct some other process to also do something with the message.
135+
When you want to instruct RabbitMQ to reroute failed messages to a exchange or a specific queue, then this is possible by adding extra options.
136+
- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key
137+
- When routing-key is omitted, the routing-key by default the `queue` name is substituted with `'.failed'`.
138+
- When using `%s` in the routing-key the queue_name will be substituted.
139+
140+
> Note: When using failed_job exchange with routing-key, u probably need to create your exchange/queue with bindings yourself.
141+
142+
```php
143+
'connections' => [
144+
// ...
145+
146+
'rabbitmq' => [
147+
// ...
148+
149+
'options' => [
150+
'queue' => [
151+
// ...
152+
153+
'reroute_failed' => true,
154+
'failed_exchange' => 'failed-exchange',
155+
'failed_routing_key' => 'application-x.%s',
156+
],
157+
],
158+
],
159+
160+
// ...
161+
],
162+
```
163+
76164
## Laravel Usage
77165

78166
Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues

config/rabbitmq.php

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,44 +36,4 @@
3636
*/
3737
'worker' => env('RABBITMQ_WORKER', 'default'),
3838

39-
/*
40-
* ## Manage the delay strategy from the config.
41-
*
42-
* The delay strategy can be set to:
43-
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\DlxDelayStrategy::class
44-
*
45-
* ### Backoff Strategy
46-
*
47-
* The `DlxDelayStrategy` is BackoffAware and by default a ConstantBackoffStrategy is assigned.
48-
* This ensures the same behavior as if the `RabbitMqDlxDelayStrategy` was assigned.
49-
*
50-
* You can assign different backoffStrategies with extra options, for example:
51-
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ConstantBackoffStrategy::class
52-
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\LinearBackoffStrategy::class
53-
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ExponentialBackoffStrategy::class
54-
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\PolynomialBackoffStrategy::class
55-
*
56-
* The options must be an array of key -> value.
57-
*
58-
* For reference about RabbitMQ backoff strategy, see the following article:
59-
* https://m.alphasights.com/exponential-backoff-with-rabbitmq-78386b9bec81
60-
*
61-
* ### First-in First-out concept
62-
*
63-
* U can easily prioritize delayed messages. When set to `true` a message will be set with a higher priority.
64-
* This means that delayed messages are handled first when returning to the queue.
65-
*
66-
* This is useful when your queue has allot of jobs, and you want to make sure, a job will be handled
67-
* as soon as possible. This way RabbitMq handles the jobs and the way they are consumed by workers.
68-
*
69-
*/
70-
'delay' => [
71-
'strategy' => env('RABBITMQ_DELAY_STRATEGY', \Enqueue\AmqpTools\RabbitMqDlxDelayStrategy::class),
72-
'backoff' => [
73-
'strategy' => env('RABBITMQ_DELAY_BACKOFF_STRATEGY', \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ConstantBackoffStrategy::class),
74-
'options' => [],
75-
],
76-
'prioritize'=> env('RABBITMQ_DELAY_PRIORITIZE'),
77-
],
78-
7939
];

src/Console/ExchangeDeclareCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ public function handle(RabbitMQConnector $connector): void
4040
(bool) $this->option('auto-delete')
4141
);
4242

43-
$this->warn('Exchange declared successfully.');
43+
$this->info('Exchange declared successfully.');
4444
}
4545
}

src/Queue/Connectors/RabbitMQConnector.php

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,13 @@ public function __construct(Dispatcher $dispatcher)
3838
*/
3939
public function connect(array $config): Queue
4040
{
41-
$connection = $this->createConnection($config);
41+
$connection = $this->createConnection(Arr::except($config, 'options.queue'));
4242

4343
$queue = $this->createQueue(
4444
Arr::get($config, 'worker', 'default'),
4545
$connection,
46-
$config['queue']
46+
$config['queue'],
47+
Arr::get($config, 'options.queue', [])
4748
);
4849

4950
if (! $queue instanceof RabbitMQQueue) {
@@ -71,29 +72,42 @@ protected function createConnection(array $config): AbstractConnection
7172
/** @var AbstractConnection $connection */
7273
$connection = Arr::get($config, 'connection', AMQPLazyConnection::class);
7374

74-
$hosts = Arr::shuffle(Arr::get($config, 'hosts', []));
75-
7675
// manually disable heartbeat so long-running tasks will not fail
77-
$config['options']['heartbeat'] = 0;
76+
Arr::set($config, 'options.heartbeat', 0);
7877

7978
return $connection::create_connection(
80-
$hosts,
79+
Arr::shuffle(Arr::get($config, 'hosts', [])),
8180
$this->filter(Arr::get($config, 'options', []))
8281
);
8382
}
8483

85-
protected function createQueue(string $worker, AbstractConnection $connection, string $queue)
84+
/**
85+
* Create a queue for the worker.
86+
*
87+
* @param string $worker
88+
* @param AbstractConnection $connection
89+
* @param string $queue
90+
* @param array $options
91+
* @return HorizonRabbitMQQueue|RabbitMQQueue|Queue
92+
*/
93+
protected function createQueue(string $worker, AbstractConnection $connection, string $queue, array $options = [])
8694
{
8795
switch ($worker) {
8896
case 'default':
89-
return new RabbitMQQueue($connection, $queue);
97+
return new RabbitMQQueue($connection, $queue, $options);
9098
case 'horizon':
91-
return new HorizonRabbitMQQueue($connection, $queue);
99+
return new HorizonRabbitMQQueue($connection, $queue, $options);
92100
default:
93-
return new $worker($connection, $queue);
101+
return new $worker($connection, $queue, $options);
94102
}
95103
}
96104

105+
/**
106+
* Recursively filter only null values.
107+
*
108+
* @param array $array
109+
* @return array
110+
*/
97111
private function filter(array $array): array
98112
{
99113
foreach ($array as $index => &$value) {

src/Queue/Jobs/RabbitMQJob.php

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,25 @@ public function attempts(): int
7575
$headers = Arr::get($this->message->get_properties(), 'application_headers');
7676

7777
if (! $headers) {
78-
return 0;
78+
return 1;
7979
}
8080

8181
$data = $headers->getNativeData();
8282

8383
$laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0);
8484
$xDeathCount = (int) Arr::get($headers->getNativeData(), 'x-death.0.count', 0);
8585

86-
return $laravelAttempts + $xDeathCount;
86+
return ($laravelAttempts) + 1;
87+
}
88+
89+
public function fail($e = null): void
90+
{
91+
parent::fail($e);
92+
93+
// We must tel rabbitMQ this Job is failed
94+
// The message must be rejected when the Job marked as failed, in case rabbitMQ wants to do some extra magic.
95+
// like: Death lettering the message to an other exchange/routing-key.
96+
$this->rabbitmq->reject($this);
8797
}
8898

8999
/**
@@ -95,7 +105,11 @@ public function delete(): void
95105
{
96106
parent::delete();
97107

98-
$this->rabbitmq->ack($this);
108+
// When delete is called and the Job was not failed, the message must be acknowledged.
109+
// This is because this is a controlled call by a developer. So the message was handled correct.
110+
if (! $this->failed) {
111+
$this->rabbitmq->ack($this);
112+
}
99113

100114
// required for Laravel Horizon
101115
if ($this->rabbitmq instanceof HorizonRabbitMQQueue) {
@@ -108,17 +122,15 @@ public function delete(): void
108122
*/
109123
public function release($delay = 0): void
110124
{
111-
parent::release($delay);
112-
113-
if ($delay > 0) {
114-
$this->rabbitmq->ack($this);
125+
parent::release();
115126

116-
$this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts());
127+
// Always create a new message when this Job is released
128+
$this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts());
117129

118-
return;
119-
}
120-
121-
$this->rabbitmq->reject($this);
130+
// Releasing a Job means the message was failed to process.
131+
// Because this Job is always recreated and pushed as new message, this Job is correctly handled.
132+
// We must tell rabbitMQ this fact.
133+
$this->rabbitmq->ack($this);
122134
}
123135

124136
/**

0 commit comments

Comments
 (0)