Skip to main content

Adding real-time processing to QueueWorker plugins

Projects no longer need to rely on unpredictable processing time frames. The SM project can intercept legacy Drupal @QueueWorker items and insert them into the Symfony Messenger message bus, effectively giving existing core and contrib queue workers jobs real-time processing capabilities.

by daniel.phin /

This post is part 5 in a series about Symfony Messenger.

  1. Introducing Symfony Messenger integrations with Drupal
  2. Symfony Messenger’ message and message handlers, and comparison with @QueueWorker
  3. Real-time: Symfony Messenger’ Consume command and prioritised messages
  4. Automatic message scheduling and replacing hook_cron
  5. Adding real-time processing to QueueWorker plugins
  6. Making Symfony Mailer asynchronous: integration with Symfony Messenger
  7. Displaying notifications when Symfony Messenger messages are processed
  8. Future of Symfony Messenger in Drupal

QueueWorker plugins

@QueueWorker plugin implementations require no modifications, including the method of dispatch, data payload, or the processItem . The data payload must of course be serialisable. Fortunately, most QueueWorker plugins already comply since their data is serialised and stored to the queue table. As always, avoid adding complex objects like Drupal entities to payloads.

Runners

With queue interception, the sm command can be solely relied upon. Legacy runners such as Drupal web cron, request termination cron (automated_cron.module), and Drush queue:run will be rendered inoperable since they will no longer have anything to process. Consider decommissioning legacy runners when deploying queue interception.

Setup

Queue interception is a part of the primary SM module. Adding a single line in settings.php is the only action required to to enabling this feature:

$settings['queue_default'] = \Drupal\sm\QueueInterceptor\SmLegacyQueueFactory::class;

SM module will need to be fully installed before this line is added. Consider wrapping the line in a class_exists(SmLegacyQueueFactory::class) to enable in a single deployment.

Existing per-queue backends

Setup may be more complex if projects are utilising per-queue backends or anything other than the default database backend for queues, such as Redis. In that case, carefully evaluate whether to convert all or specific queues to use Symfony Messenger.

Whether per-queue backends are utilised can be determined by looking for queue_service_ or queue_reliable_service_ prefixed items in settings.php.

Routing

@QueueWorker jobs are converted to \Drupal\sm\QueueInterceptor\SmLegacyDrupalQueueItem messages in the backend. Knowing this class name allows you to configure transport routing. If routing for this message is not explicitly configured, it will naturally fall back to the default transport, or execute synchronously if there is no routing configuration.

Running the jobs

As usual, when a transport is configured, all you need to do is run sm messenger:consume to execute the tasks. The worker will either listen or poll for messages, and execute them in a very short amount of time after they are dispatched, in a dedicated thread. More information on the worker can be found in post 3 of this series.


The next post covers how Drupal emails can be dispatched to messages, so the web thread can execute faster.

Related Articles