Magento 2 RABBITMQ

Magento 2 Asynchronous Message Queue Management with RabbitMQ

created February 13, 2021, last updated February 15, 2021.

The Magento Message Queue Framework was introduced with Magento 2. It provides a standards-based system that lets modules publish messages to queues, and it defines the consumers that receive those messages asynchronously.

What are Asynchronous Message Queues?

The normal way for a module to process data produced by a frontend or backend action is to create an observer that listens for a specific Magento event and then processes the event data. For example, if you want to process order data whenever an order is placed, you would create a custom module with an order event listener. When a customer places an order, the event is fired and your module processes the data.

This kind of event processing runs synchronously with the event. That can be a problem if your process is intensive and takes some time to complete, or if it causes an error. Synchronous processing of events may cause delays in frontend processes.

Asynchronous Message Queues allow your event to be placed in a queuing system and processed by your module's consumer as soon as possible.

The Magento Message Queue Framework consists of the following components:

  • Publisher
    A publisher is a component that sends messages to an exchange.
  • Exchange
    An exchange receives messages from publishers and sends them to queues.
  • Queue
    A queue is a buffer that stores messages.
  • Consumer
    A consumer receives messages. It knows which queue to consume.

By default, Magento 2 uses the MySQL database as the exchange and queue system for messages, via a MySQL adapter. Message data is stored in the queue, queue_message and queue_message_status tables. Magento uses the consumers_runner cron job to manage queued messages by starting (or restarting) message consumers.

The fastest the Magento cron system can run this job is once every 60 seconds. The job is configured in the MessageQueue module:

<job name="consumers_runner" instance="Magento\MessageQueue\Model\Cron\ConsumersRunner" method="run">
    <schedule>* * * * *</schedule>
</job>

I recommend looking at Magento\MessageQueue\Model\Cron\ConsumersRunner to understand how the default Magento consumers are managed.

To list all the default consumers available in Magento 2, use the command bin/magento queue:consumers:list

Default message consumers/queues in Magento 2.4.2
product_action_attribute.update
product_action_attribute.website.update
exportProcessor
inventory.source.items.cleanup
inventory.mass.update
inventory.reservations.cleanup
inventory.reservations.update
codegeneratorProcessor
media.storage.catalog.image.resize
inventory.reservations.updateSalabilityStatus
inventory.indexer.sourceItem
inventory.indexer.stock
media.content.synchronization
media.gallery.renditions.update
media.gallery.synchronization
async.operations.all
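
Any consumer in this list can also be started by hand with the queue:consumers:start command, which is useful for testing. The sketch below wraps that command in a small shell function. The start_consumer function and the MAGENTO_BIN variable are hypothetical names used for illustration only, and the script assumes it is run from the Magento root directory.

```shell
#!/bin/sh
# Path to the Magento CLI (assumption: run from the Magento root).
# Override MAGENTO_BIN if your installation lives elsewhere.
MAGENTO_BIN="${MAGENTO_BIN:-bin/magento}"

# Start a single consumer and let it stop after a fixed number of messages.
#   $1 = consumer name as shown by queue:consumers:list
#   $2 = maximum number of messages to process (defaults to 100)
start_consumer() {
    consumer="$1"
    max_messages="${2:-100}"
    "$MAGENTO_BIN" queue:consumers:start "$consumer" --max-messages="$max_messages"
}
```

For example, start_consumer async.operations.all 50 starts that consumer and lets it terminate once it has handled 50 messages.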

 

Using the Magento database for messaging is not very scalable. RabbitMQ should be used whenever possible.

Converting Magento 2 Message Queues to RabbitMQ AMQP

RabbitMQ is an open source message broker, and the Magento Message Queue Framework has built-in support for it as a scalable platform for sending and receiving messages. RabbitMQ is based on the Advanced Message Queuing Protocol (AMQP) 0.9.1 specification.

Configuring Magento 2 to use RabbitMQ requires the addition of a new queue node in app/etc/env.php:

    'queue' => [
        'amqp' => [
            'host' => '<rabbitmq_host>',
            'port' => '<rabbitmq_port>',
            'user' => '<rabbitmq_user>',
            'password' => '<rabbitmq_pass>',
            'virtualhost' => '/',
            'ssl' => false
        ],
    ],

Enabling AMQP for a message queue is simply a case of changing the connection attribute in the queue's XML configuration (for example in queue_consumer.xml and queue_topology.xml) from db to amqp:

connection="amqp"

One built-in message queue, async.operations.all, is configured to use AMQP by default. If you have successfully configured RabbitMQ, you should see this queue in the RabbitMQ admin interface.

Default AMQP Message Queue

When creating message queues for new modules you should configure them to use AMQP. Check out this module for an example that uses a product save event message.

If you want to convert existing MySQL DB message queues to AMQP, you can do so with extra nodes in the env.php queue configuration. This example is taken from the official documentation for the product_action_attribute.update queue:

'queue' => [
    'topics' => [
        'product_action_attribute.update' => [
            'publisher' => 'amqp-magento'
        ]
    ],
    'config' => [
        'publishers' => [
            'product_action_attribute.update' => [
                'connections' => [
                    'amqp' => [
                        'name' => 'amqp',
                        'exchange' => 'magento',
                        'disabled' => false
                    ],
                    'db' => [
                        'name' => 'db',
                        'disabled' => true
                    ]
                ]
            ]
        ]
    ],
    'consumers' => [
        'product_action_attribute.update' => [
            'connection' => 'amqp',
        ],
    ],
],

In theory you can try to convert all Magento 2 MySQL message queues to RabbitMQ AMQP message queues. If you do this, you will probably also want to create your own external consumers to process the RabbitMQ queues more efficiently.

Note: during testing I was unable to convert some existing Magento 2.4.2 MySQL queues to AMQP.

Keep in mind that if you create external message queue consumers, you must ensure that the consumer processes are always running. There are a few ways to accomplish this, for example with a process control system such as supervisord. If you are working in a Docker environment, I recommend creating one or more consumer containers to manage Magento 2 AMQP messages.
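
As a rough illustration of what "always running" means, the shell function below restarts a command each time it exits. It is a simplified stand-in for a real process manager such as supervisord, not a replacement for one. The keep_running function and its arguments are hypothetical names chosen for this sketch.

```shell
#!/bin/sh
# Run a command repeatedly, restarting it whenever it exits.
#   $1 = maximum number of runs (0 means restart forever)
#   $2 = seconds to wait between runs
#   remaining arguments = the command to run
keep_running() {
    max_runs="$1"
    delay="$2"
    shift 2
    runs=0
    while :; do
        "$@"
        status=$?
        runs=$((runs + 1))
        # Log each exit to stderr so restarts are visible in container logs.
        echo "command exited with status $status (run $runs)" >&2
        if [ "$max_runs" -gt 0 ] && [ "$runs" -ge "$max_runs" ]; then
            return 0
        fi
        sleep "$delay"
    done
}
```

A consumer container could then use something like keep_running 0 5 bin/magento queue:consumers:start product_action_attribute.update --max-messages=1000 as its main process, so the consumer is started again five seconds after it reaches its message limit or exits unexpectedly.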

You can configure the default Magento 2 consumers_runner cron job via the cron_consumers_runner settings in env.php:

    'cron_consumers_runner' => [
        'cron_run' => true,
        'max_messages' => 20000,
        'consumers' => [
            'consumer1',
            'consumer2',
        ]
    ],

Note that setting cron_run to false here completely disables the default consumer cron job in Magento. If you are using external consumers for only some message queues, think carefully before disabling this cron job.

Docker Consumers and Message Management

I like to use dedicated Docker containers to manage message queue consumer processes in my Magento 2 Docker environment. In theory a container should manage a single consumer, but in practice it can be easier to run multiple consumers in one container. The container needs to know which consumers to start, and it also needs to monitor the consumer processes to ensure that they remain running.

To manage this process and make it easier to convert message queues from MySQL to AMQP I created a Message Manager module.

This module allows you to convert some or all queues to AMQP, automatically updating the env.php settings to configure the changes required for each module. The module also feeds data to my Docker container telling the container which consumer processes it should manage.

After installing the module you can display all existing message consumers with:

bin/magento messagemanager:getconsumers

This will return an array of consumers showing the queue name and the current connection method: db for MySQL or amqp for RabbitMQ AMQP.

To convert specific queues to AMQP you can define a whitelist in Gaiterjones\MessageManager\Helper\Data:

    public function whitelist(): array
    {
        return array(
            'gaiterjones_message_manager',
            'product_action_attribute.update'
        );
    }

Here I am specifying two queues that I want to use with RabbitMQ AMQP. I know gaiterjones_message_manager is already configured for AMQP and I want to convert product_action_attribute.update from MySQL to AMQP.

To do this I can use the getconfig command. First check your current AMQP config with bin/magento messagemanager:getconfig and ensure that your RabbitMQ server is configured:

    [amqp] => Array
        (
            [host] => magento2_rabbitmq_1
            [port] => 5672
            [user] => blah
            [password] => blah
            [virtualhost] => /
        )

Next, display the configuration required to convert your whitelisted queues to AMQP:

bin/magento messagemanager:getconfig --buildconfig --whitelist

(To display the configuration for all MySQL configured queues use bin/magento messagemanager:getconfig --buildconfig)

After reviewing the configuration, use the --saveconfig switch to save it to env.php:

bin/magento messagemanager:getconfig --buildconfig --whitelist --saveconfig

Make sure you back up env.php before running this command!

After changing the config, run setup:upgrade to reconfigure the module queues. If the new queue config causes an error, restore your env.php settings and run setup:upgrade again to restore the previous configuration.
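
The backup and restore steps can be sketched as two small shell helpers. This is only one way to do it: backup_env, restore_env and the ENV_FILE variable are hypothetical names, and the default path assumes the script is run from the Magento root directory.

```shell
#!/bin/sh
# Location of the Magento environment config (assumption: run from the Magento root).
ENV_FILE="${ENV_FILE:-app/etc/env.php}"

# Copy env.php to a timestamped backup next to it and print the backup path.
backup_env() {
    backup="$ENV_FILE.$(date +%Y%m%d%H%M%S).bak"
    cp -p "$ENV_FILE" "$backup" && echo "$backup"
}

# Put a previously saved backup back in place.
#   $1 = path printed by backup_env
restore_env() {
    cp -p "$1" "$ENV_FILE"
}
```

Call backup_env before running the --saveconfig command and keep the path it prints. If the new queue configuration then causes an error, pass that path to restore_env and run setup:upgrade again.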

You should now see the converted queues in RabbitMQ admin.

MySQL queues converted to AMQP and visible in RabbitMQ

The gaiterjones_message_manager queue is installed with the module. To test that RabbitMQ is working, use the test queue command bin/magento messagemanager:testqueue

You should see a new message created in RabbitMQ.
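
If you prefer the command line to the RabbitMQ admin interface, the message count can also be read from rabbitmqctl. The helper below is a sketch under assumptions: queue_depth is a hypothetical name, and it expects the tab-separated name and messages columns that rabbitmqctl list_queues name messages is assumed to print.

```shell
#!/bin/sh
# Print the message count for one queue.
# Reads "name<TAB>messages" lines on stdin and exits non-zero
# if the queue name is not present in the input.
#   $1 = queue name to look for
queue_depth() {
    awk -F '\t' -v q="$1" '
        $1 == q { print $2; found = 1 }
        END { exit !found }
    '
}
```

On the RabbitMQ host or inside the RabbitMQ container, rabbitmqctl -q list_queues name messages | queue_depth gaiterjones_message_manager should then print the number of messages waiting in the test queue.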

RabbitMQ Message Queue Test

Summary

It is very likely that future versions of Magento 2 will stop using MySQL as a messaging broker. In the same way that Search was moved to Elasticsearch we may have to use RabbitMQ in the future for all Magento async messaging.

Message queues are cool! If you are building production Magento 2 sites in a Docker environment it makes sense to start using RabbitMQ and external consumers now.

More Reading

In conclusion I recommend reading through the official documentation for more information on this subject.

Credits

Thanks to Diana Botean for her helpful comments.
