The Laravel Queue
Table of Contents
To do this, I started my journey with a test - this is not for testing anything, it’s just a tool to lure the way through the magical forest of Laravel.
namespace Tests\Feature;
use App\Models\Game;
use App\Queue\Events\TestEvent;
use Illuminate\Support\Facades\Artisan;
use Tests\TestCase;
class EventTest extends TestCase
{
public function testEvent()
{
/** @var Game $game */
$game = Game::first();
TestEvent::dispatch($game);
Artisan::call('queue:work --queue=game-queue');
$this->assertTrue($game->refresh()->token === 'Tested by Event');
}
}
Dispatching the Event #
The Event that is dispatched in the first step is simple enough to be interesting inside the queue.
use App\Models\Game;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class TestEvent implements ShouldQueue
{
use Dispatchable;
use InteractsWithSockets;
use SerializesModels;
public function __construct(public Game $game) {}
}
Mind the Dispatchable
trait, which will lead (using some magic helper function)
to Illuminate\Events\Dispatcher::dispatch()
. Ignoring Broadcasts here, the Dispatcher will determine the Listeners. To
emphasize this - it is not the Event that is pushed to the Queue, it is the Listener.
use App\Queue\Events\TestEvent;
use Illuminate\Contracts\Queue\ShouldQueue;
class TestListener implements ShouldQueue
{
public $connection = 'database'; // can be in the env
public $queue = 'game-queue'; // was fun testing
public function handle(TestEvent $event)
{
$event->game->update(['token' => 'Tested by Event']);
}
}
Each Listener determines if it is queueable (which means if it implements the ShouldQueue Interface). If not, the
current container resolves the Listener and calls the handle
function.
If yes, the Listener is handed to the queueHandler
.
In this step, the Listener (and the Event it receives in its method parameters) are deconstructed. The Handle Method
Arguments are mapped into an Array including ALL its attributes (including private or protected ones). If the Event
contains complex properties, this might cause problems. An Event should be as simple as possible.
$arguments = array_map(function ($a) {
return is_object($a) ? clone $a : $a;
}, func_get_args())
After mapping the arguments the Listener will receive, the Listener is packed in
a new CallQueuedListener($class, $method, $arguments)
. This object is also filled with the Queue Name and the
Connection, both usually set in the environment, but can be overwritten in the Listener.
Pushing and serializing the Listener #
Then, corresponding again to the connection, the Queue is called via Illuminate/Queue/DatabaseQueue::push
. The
Queue is responsible for defining the Payload. For this the Job (which holds the Listener) is
Serialized serialize(clone $job);
2 and then pushed to the Queue.
The goal of Serialization is to convert a complex data structure into a storable representation of a value (in this case a JSON String). This should not be used for storing objects in the Database in a normal case, but in this case, it’s used to store the Listener in some kind of queue, as well as its current state. The Constructor is not serialized, so dependency Injection is still possible in the Listener.
Small loop back the Event and the SerializesModels Trait. This Trait contains implements a __serialize()
function,
which overwrites the normal serialization, so the object stored on the queue is way smaller and only contains the
identifier, a list of eager loaded relations and changed attributes.
With all that the payload looks something like:
{
"uuid": "66b9777e-221d-4c8e-9b4e-8870d7d6aec2",
"displayName": "App\\Queue\\Listeners\\TestListener",
"job": "Illuminate\\Queue\\CallQueuedHandler@call",
"maxTries": null,
"maxExceptions": null,
"failOnTimeout": false,
"backoff": null,
"timeout": null,
"retryUntil": null,
"data": {
"commandName": "Illuminate\\Events\\CallQueuedListener",
"command": {
"class": "App\\Queue\\Listeners\\TestListener",
"method": "handle",
"data": [
{
"arePublicsGettingSerialized": true,
"game": {
"class": "App\\Models\\Game",
"id": 1,
"relations": {}
},
"socket": null
}
],
"tries": null,
"maxExceptions": null,
"backoff": null,
"retryUntil": null,
"timeout": null,
"shouldBeEncrypted": false,
"job": null,
"connection": null,
"queue": null,
"chainConnection": null,
"chainQueue": null,
"chainCatchCallbacks": null,
"delay": null,
"afterCommit": null,
"middleware": [],
"chained": []
}
}
}
Notes on Method Locations:
While most of the above happen inside the specific Queue (like DatabaseQueue
), all Queues I found are using
the createPayload()
inside the Illuminate\Queue\Queue
. I assume this is to make sure the Payload looks the same for
every Driver and can be read by every worker. But here is one weakness of the Laravel Queue:
In complex systems it might be useful to use Events to communicate between different Services throughout the System,
writing and listening to the same queue. Implementing something like that in Laravel would require writing a new Queue,
maybe not using the createPayload()
function, which sounds like a slippery slope; but if you went it down, I would be
curious to hear about it.
Now, what’s left is building the Database entry. After this the container might get deconstructed, the PHP thread
changes, everything after this step happens at a different time - maybe on a different server!
What feels like implementing an “empty” Interface like ShouldQueue
without a great impact on local Development
with QUEUE_CONNECTION=sync
hides the code and context switches happening on production.
protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
{
return [
'queue' => $queue,
'attempts' => $attempts,
'reserved_at' => null,
'available_at' => $availableAt,
'created_at' => $this->currentTime(),
'payload' => $payload,
];
}
Working the Queue #
Working the queue is triggered by a Command, the Illuminate\Queue\Console\WorkCommand::handle()
. This command triggers
a worker to run as daemon, which means it uses a while(true)
loop to run until its break conditions (e.g. Maintenance
Mode) are triggered 3.
To emphasize this: The Worker is not a daemon in the sense of a Linux Daemon, it’s php code running inside a
Laravel application. Which for me raised the question if all Jobs processed by the Worker are processed in the same
context / Container. Short answer is Yes, or as it’s stated in the Laravel Docs:
Remember, queue workers are long-lived processes and store the booted application state in memory. As a result, they will not notice changes in your code base after they have been started. 4
Not so short answer is: in production, no. A line of code I even put in my deployment script includes a Supervisor which will restart the worker every hour or after a certain number of jobs 5. Again emphasizing, because I did not expect it to work like this: Laravel uses a non Laravel Superviser task to control a Laravel Container which would otherwise just keep on running on the server, which is working all the jobs in the same context in its queue with no build in capability of sharing that queue with other applications. I mean it works.
In this loop, it will call Illuminate/Queue/DatabaseQueue::pop
to fetch the next Job and unpack it into a Job,
in this case, an Illuminate/Queue/Jobs/DatabaseJob
. The Worker will raise an Event before the Job starts and after the
Job finished (or failed). Finally, the Jobs fire()
method can be called, which instantiates the Listener and calls the
specified method with the Event as a parameter.
Conclusion #
I wrote this article to understand the Laravel Queue better, what parts can be dangerous and what hooks I can use if I need a different behavior than the Laravel default.
Happy Coding :)
Serialise function, how to overload it, and more. As so often, the best comments are better than the manual. ↩︎
Laravel Docs to explain how the worker is a long-lived process ↩︎
Supervisor is a process control system that allows you to monitor and control a worker ↩︎