The Magento Message Queue Framework became available in Magento Open Source with version 2.3. It provides a standards-based system for modules to publish messages to queues, and it defines the consumers that receive those messages asynchronously.
What are Asynchronous Message Queues?
The normal way for a module to process data generated by a frontend or backend action is to create an observer that listens for a specific Magento event and then processes the event data. For example, if you want to process order data whenever an order is placed, you would create a custom module with an order event observer. When a customer places an order, the event is fired and your module can process the order data.
This type of event processing occurs synchronously with the event. That can be a problem if your process is intensive and takes some time to complete, or if it causes an error. Synchronous processing of events may cause delays for the customer in frontend processes.
Asynchronous message queues allow your event to be placed in a queuing system and processed by your module's consumer as soon as possible.
The Magento Message Queue Framework consists of the following components:
- Publisher
A publisher is a component that sends messages to an exchange.
- Exchange
An exchange receives messages from publishers and sends them to queues.
- Queue
A queue is a buffer that stores messages.
- Consumer
A consumer receives messages. It knows which queue to consume.
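To make these roles concrete, here is a toy shell sketch in which a plain text file stands in for the queue. It is not how Magento implements anything (there is no exchange, locking or retry here), and the names QUEUE_FILE, publish and consume_one are invented for the illustration. The point is only that the publisher returns at once and the consumer does the work later.

```shell
#!/usr/bin/env bash
# Toy model only: a plain file stands in for the queue.
# QUEUE_FILE, publish and consume_one are hypothetical names for this sketch.
QUEUE_FILE="${QUEUE_FILE:-/tmp/demo_queue.txt}"

# Publisher: append the message to the queue and return immediately.
publish() {
  printf '%s\n' "$1" >> "$QUEUE_FILE"
}

# Consumer: take the oldest message off the queue, if there is one.
consume_one() {
  [ -s "$QUEUE_FILE" ] || return 1          # nothing queued
  local msg
  msg=$(head -n 1 "$QUEUE_FILE")
  # Drop the first line so the next call sees the next message.
  tail -n +2 "$QUEUE_FILE" > "$QUEUE_FILE.tmp" && mv "$QUEUE_FILE.tmp" "$QUEUE_FILE"
  printf 'processed: %s\n' "$msg"
}

# Example:
#   publish "order 1001"      # returns at once, the customer is not kept waiting
#   consume_one               # run later, by a separate process
```

In Magento the publisher and consumer are PHP classes wired together through the module's queue configuration, but the division of labour is the same.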
By default Magento 2 uses the MySQL database as the exchange and queue system for messages, through a MySQL adapter. Message data is stored in the queue, queue_message and queue_message_status tables. Magento uses the cron job consumers_runner to manage queued messages by starting (or restarting) message consumers.
The fastest this can happen with the Magento cron system is once every 60 seconds. The job is configured in the MessageQueue module:
<job name="consumers_runner" instance="Magento\MessageQueue\Model\Cron\ConsumersRunner" method="run">
    <schedule>* * * * *</schedule>
</job>
I recommend looking at Magento\MessageQueue\Model\Cron\ConsumersRunner to understand how the default Magento consumers are managed.
If you want to list all the default consumers available in Magento 2, use the command bin/magento queue:consumers:list:
product_action_attribute.website.update
exportProcessor
inventory.source.items.cleanup
inventory.mass.update
inventory.reservations.cleanup
inventory.reservations.update
codegeneratorProcessor
media.storage.catalog.image.resize
inventory.reservations.updateSalabilityStatus
inventory.indexer.sourceItem
inventory.indexer.stock
media.content.synchronization
media.gallery.renditions.update
media.gallery.synchronization
async.operations.all
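Any consumer in this list can also be started by hand, which is handy when you want to watch a single queue being processed. The following is a minimal shell sketch, assuming it is run from the Magento root. The helper name start_consumer and the MAGENTO_BIN variable are my own for illustration, and the default limit of 100 messages is arbitrary.

```shell
#!/usr/bin/env bash
# Hypothetical helper: start one consumer and stop after a fixed number of messages.
# MAGENTO_BIN is an assumption of this sketch so the command path can be overridden.
MAGENTO_BIN="${MAGENTO_BIN:-bin/magento}"

start_consumer() {
  local consumer="$1"
  local max_messages="${2:-100}"   # arbitrary default for illustration
  if [ -z "$consumer" ]; then
    echo "usage: start_consumer <consumer_name> [max_messages]" >&2
    return 2
  fi
  "$MAGENTO_BIN" queue:consumers:start "$consumer" --max-messages="$max_messages"
}

# Example: start_consumer async.operations.all 500
```

Limiting the number of messages makes the process exit on its own, which is convenient for a quick manual test.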
Using the Magento database for messaging is not very scalable. RabbitMQ should be used whenever possible.
Converting Magento 2 Message Queues to RabbitMQ AMQP
RabbitMQ is an open source message broker. The Magento Message Queue Framework has built-in support for RabbitMQ as a scalable platform for sending and receiving messages. RabbitMQ is based on the Advanced Message Queuing Protocol (AMQP) 0.9.1 specification.
Configuring Magento 2 to use RabbitMQ requires the addition of a new queue node in app/etc/env.php:
'queue' => [
    'amqp' => [
        'host' => '<rabbitmq_host>',
        'port' => '<rabbitmq_port>',
        'user' => '<rabbitmq_user>',
        'password' => '<rabbitmq_pass>',
        'virtualhost' => '/',
        'ssl' => false
    ],
]
Enabling message queues for RabbitMQ AMQP is simply a case of changing the connection attribute in the queue configuration from db to amqp:
connection="amqp"
One built-in message queue is configured to use AMQP by default: the async.operations.all queue. If you have successfully configured RabbitMQ, you should see this queue in the RabbitMQ admin interface.
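If you prefer the command line to the admin interface, the same check can be sketched with rabbitmqctl. This assumes rabbitmqctl is available where the script runs (in a Docker setup that usually means inside the RabbitMQ container). The helper name queue_exists and the RABBITMQCTL variable are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if a queue with the given name exists in RabbitMQ.
# RABBITMQCTL is an assumption of this sketch so the command can be overridden.
RABBITMQCTL="${RABBITMQCTL:-rabbitmqctl}"

queue_exists() {
  local queue="$1"
  # list_queues prints one queue per line; compare the first column exactly
  # so that informational lines and partial names do not match.
  "$RABBITMQCTL" list_queues name \
    | awk -v q="$queue" '$1 == q { found = 1 } END { exit found ? 0 : 1 }'
}

# Example: queue_exists async.operations.all && echo "queue is present"
```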
When creating message queues for new modules you should configure them to use AMQP. Check out this module for an example that uses a product save event message.
If you want to convert existing MySQL DB message queues to use AMQP, you can accomplish this using extra nodes in the env.php queue configuration. This example is taken from the official documentation for the product_action_attribute.update queue:
'queue' => [
    'topics' => [
        'product_action_attribute.update' => [
            'publisher' => 'amqp-magento'
        ]
    ],
    'config' => [
        'publishers' => [
            'product_action_attribute.update' => [
                'connections' => [
                    'amqp' => [
                        'name' => 'amqp',
                        'exchange' => 'magento',
                        'disabled' => false
                    ],
                    'db' => [
                        'name' => 'db',
                        'disabled' => true
                    ]
                ]
            ]
        ]
    ],
    'consumers' => [
        'product_action_attribute.update' => [
            'connection' => 'amqp',
        ],
    ],
],
In theory you can try to convert all Magento 2 MySQL message queues to RabbitMQ AMQP message queues. If you do this you will probably also want to create your own external consumers to process RabbitMQ message queues more efficiently.
Keep in mind that if you create external message queue consumers you must ensure that the consumer processes are always running. There are a few ways to accomplish this, for example by using the supervisord process control system. If you are working with a Docker environment, I recommend creating one or more consumer containers to manage Magento 2 AMQP messages.
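The idea of keeping a consumer alive can be sketched as a simple restart loop. This is a minimal stand-in for a real supervisor such as supervisord, not a replacement for one. The function name keep_consumer_running and the variables MAGENTO_BIN, RESTART_DELAY and MAX_RUNS are assumptions of the sketch.

```shell
#!/usr/bin/env bash
# Minimal stand-in for a process supervisor: restart a consumer whenever it exits.
# MAGENTO_BIN, RESTART_DELAY and MAX_RUNS are assumptions of this sketch.
MAGENTO_BIN="${MAGENTO_BIN:-bin/magento}"
RESTART_DELAY="${RESTART_DELAY:-5}"   # seconds to wait before starting again
MAX_RUNS="${MAX_RUNS:-0}"             # 0 means no limit, keep restarting

keep_consumer_running() {
  local consumer="$1"
  local runs=0
  while true; do
    "$MAGENTO_BIN" queue:consumers:start "$consumer"
    echo "consumer $consumer exited with status $?" >&2
    runs=$((runs + 1))
    # Stop only when a run limit was requested and has been reached.
    if [ "$MAX_RUNS" -gt 0 ] && [ "$runs" -ge "$MAX_RUNS" ]; then
      return 0
    fi
    sleep "$RESTART_DELAY"
  done
}

# Example (e.g. as the command of a consumer container):
#   keep_consumer_running async.operations.all
```

A short delay between restarts avoids a tight loop if the consumer fails immediately, for example while RabbitMQ is still starting.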
You can configure the default Magento 2 consumers_runner cron job via the cron_consumers_runner settings in env.php:
'cron_consumers_runner' => [
    'cron_run' => true,
    'max_messages' => 20000,
    'consumers' => [
        'consumer1',
        'consumer2',
    ]
],
Note that if you set cron_run here to false you are completely disabling the default consumer cron job in Magento. If you are using external consumers for some or all message queues, think carefully before completely disabling this cron job.
Note: during testing I was unable to convert some existing Magento 2.4.2 MySQL queues to AMQP.
Docker Consumers and Message Management
I like to use dedicated Docker containers to manage message queue consumer processes in my Magento 2 Docker environment. In theory a container should manage a single consumer; in practice it can be easier to run multiple consumers in one container. The container needs to know which consumers to start, and it also needs to monitor the consumer processes to ensure that they remain running.
To manage this process and make it easier to convert multiple message queues from MySQL db to AMQP I created a Message Manager module.
This module allows you to convert some or all queues to AMQP, automatically updating the env.php settings to configure the changes required for each module. The module also feeds data to my Docker container telling the container which consumer processes it should manage.
After installing the module you can display all existing message consumers with:
bin/magento messagemanager:getconsumers
This will return an array of consumers showing the queue name and the current connection method: db for MySQL or amqp for RabbitMQ AMQP.
To convert specific queues to AMQP you can define a whitelist in Gaiterjones\MessageManager\Helper\Data:
public function whitelist(): array
{
    return array(
        'gaiterjones_message_manager',
        'product_action_attribute.update'
    );
}
Here I am specifying two queues that I want to use with RabbitMQ AMQP. I know gaiterjones_message_manager is already configured for AMQP and I want to convert product_action_attribute.update from MySQL to AMQP.
To do this I can use the getconfig command. First check your current AMQP config with bin/magento messagemanager:getconfig and ensure that your RabbitMQ server is configured:
[amqp] => Array
    (
        [host] => magento2_rabbitmq_1
        [port] => 5672
        [user] => blah
        [password] => blah
        [virtualhost] => /
    )
Next display the configuration required to convert your whitelisted queues to AMQP:
bin/magento messagemanager:getconfig --buildconfig --whitelist
(To display the configuration for all MySQL configured queues use bin/magento messagemanager:getconfig --buildconfig)
After reviewing the configuration, use the --saveconfig switch to save this configuration to env.php:
bin/magento messagemanager:getconfig --buildconfig --whitelist --saveconfig
Make sure you back up env.php before running this command!
After changing the config, run setup:upgrade to reconfigure the module queues. If the queue config creates an error, restore your env.php settings and run setup:upgrade again to restore the configuration.
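The backup, save, upgrade and restore steps can be strung together in one small script. This is only a sketch of the procedure described here, assuming it is run from the Magento root. The function name convert_whitelisted_queues and the variables MAGENTO_BIN and ENV_FILE are my own for illustration.

```shell
#!/usr/bin/env bash
# Sketch of the steps above: back up env.php, save the new queue config,
# run setup:upgrade, and restore the backup if either command fails.
# MAGENTO_BIN and ENV_FILE are assumptions of this sketch.
MAGENTO_BIN="${MAGENTO_BIN:-bin/magento}"
ENV_FILE="${ENV_FILE:-app/etc/env.php}"

convert_whitelisted_queues() {
  local backup
  backup="$ENV_FILE.bak.$(date +%Y%m%d%H%M%S)"
  cp -p "$ENV_FILE" "$backup" || return 1   # never continue without a backup

  if "$MAGENTO_BIN" messagemanager:getconfig --buildconfig --whitelist --saveconfig \
     && "$MAGENTO_BIN" setup:upgrade; then
    echo "queues converted, backup kept at $backup"
    return 0
  fi

  # Something went wrong: put the old settings back and reconfigure again.
  echo "conversion failed, restoring $ENV_FILE from $backup" >&2
  cp -p "$backup" "$ENV_FILE"
  "$MAGENTO_BIN" setup:upgrade
  return 1
}

# Example: convert_whitelisted_queues
```

The backup is kept even on success, so you can still compare the old and new env.php afterwards.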
Now you should see the converted queues in RabbitMQ admin.
The gaiterjones_message_manager queue is installed with the module. To test that RabbitMQ is working, use the test queue command bin/magento messagemanager:testqueue
You should see a new message created in RabbitMQ.
Summary
It is very likely that future versions of Magento 2 will stop using MySQL as a messaging broker. In the same way that search was moved to Elasticsearch, we may have to use RabbitMQ in the future for all Magento async messaging.
Message queues are cool! If you are building production Magento 2 sites in a Docker environment it makes sense to start using RabbitMQ and external consumers now.
More Reading
In conclusion I recommend reading through the official documentation for more information on this subject.
Credits
Thanks to Diana Botean for her helpful comments.