Daniel PhinDeveloper
Real-time: Symfony Messenger Consume command and prioritised messages
The greatest advantage of Symfony Messenger is arguably the ability to send and process messages in a different thread almost immediately. This post covers the worker that powers this functionality.
This post is part 3 in a series about Symfony Messenger.
- Introducing Symfony Messenger integrations with Drupal
- Symfony Messenger’ message and message handlers, and comparison with @QueueWorker
- Real-time: Symfony Messenger’ Consume command and prioritised messages
- Automatic message scheduling and replacing hook_cron
- Adding real-time processing to QueueWorker plugins
- Making Symfony Mailer asynchronous: integration with Symfony Messenger
- Displaying notifications when Symfony Messenger messages are processed
- Future of Symfony Messenger in Drupal
The Symfony Messenger integration, including the worker, is provided by the SM project. The worker is tasked with listening for messages ready to be dispatched from an asynchronous transport, such as the Doctrine database transport. The worker then re-dispatches the message onto the bus.
Some messages may be added to a bus with no particular execution time, in which case they are serialised by the original thread. Then unserialised almost immediately by the consume command in a different thread.
Since Messenger has the concept of delaying messages until a particular date, the DelayStamp
can be utilised. The consume command respects this stamp and will not redispatch a message until the time is right.
The worker is found in the sm
console application, rather than Drush. When SM is installed, Composer makes the application available in your bin directory. Typically at /vendor/bin/sm
The command takes one or more transports as the argument. For example if you’re using the Doctrine transport, the command would be:
sm messenger:consume doctrine
Multiple instances of the worker may be run simultaneously to improve throughput.
Prioritised messages
The worker allows you to prioritise the processing of messages by which transport a message was dispatched to. Transport prioritisation is achieved by adding a space separated list of transports as the command argument.
For example, given transports defined in a site-level services.yml
file:
parameters:
sm.transports:
doctrine:
dsn: 'doctrine://default?table_name=messenger_messages'
highpriority:
dsn: 'doctrine://default?table_name=messenger_messages_high'
lowpriority:
dsn: 'doctrine://default?table_name=messenger_messages_low'
In this case, the command would be sm messenger:consume highpriority doctrine lowpriority
Routing from messages to transports must also be configured appropriately. For example, you may decide Email messages are the highest priority. \Symfony\Component\Mailer\Messenger\SendEmailMessage
would be mapped to highpriority
:
parameters:
sm.routing:
Symfony\Component\Mailer\Messenger\SendEmailMessage: highpriority
Drupal\my_module\LessImportantMessage: lowpriority
'*': doctrine
More information on routing can be found in the previous post.
The transport a message is sent to may also be overridden on an individual message basis by utilising the Symfony\Component\Messenger\Stamp\TransportNamesStamp
stamp. Though for simplicity I’d recommend sticking to standard routing.
Running the CLI application
The sm
worker listens and processes messages, and is designed to run forever. A variety of built in flags are included, with the ability to quit when a memory or time limit is reached, or when a certain number of messages are processed or fail. Flags can be combined to process available messages and quit, much like drush queue:run
.
Further information on how to use the worker in production can be found in the Consuming Messages (Running the Worker) documentation.
The next post covers Cron and Scheduled messages, a viable replacement to hook_cron
.