Can we speed up publishing messages via RabbitMQ?

I am running some tests on my Ubuntu workstation. These benchmarks start with populating a queue, which runs very slowly:

import pika
import datetime

if __name__ == '__main__':
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        channel = connection.channel()

        channel.queue_declare(queue='hello_durable', durable=True)
        started_at = datetime.datetime.now()
        properties = pika.BasicProperties(delivery_mode=2)
        for i in range(0, 100000):
            channel.basic_publish(exchange='',
                                  routing_key='hello_durable',  # must match the declared queue name
                                  body='Hello World!',
                                  properties=properties)
            if i%10000 == 0:
                duration = datetime.datetime.now() - started_at
                print(i, duration.total_seconds())
        print(" [x] Sent 'Hello World!'")
        connection.close()
        now = datetime.datetime.now()
        duration = now - started_at
        print(duration.total_seconds())
    except Exception as e:
        print(e)

It takes more than 30 seconds to send 10K messages. The workstation has 12 cores, which are not busy according to the top command, and there is over 8 GB of free memory. It does not matter much whether the queue is durable.

How can we speed up sending messages?

AlexC asked Oct 22 '25 15:10


2 Answers

Switching from BlockingConnection to SelectConnection made a huge difference, speeding up the process almost fiftyfold. All I needed to do was modify the following tutorial example so that it publishes messages in a loop:

import pika

# Step #3
def on_open(connection):

    # pika 1.x expects the callback as a keyword argument
    connection.channel(on_open_callback=on_channel_open)

# Step #4
def on_channel_open(channel):

    channel.basic_publish('test_exchange',
                            'test_routing_key',
                            'message body value',
                            pika.BasicProperties(content_type='text/plain',
                                                 delivery_mode=1))

    connection.close()

# Step #1: Connect to RabbitMQ
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:

    # Step #2 - Block on the IOLoop
    connection.ioloop.start()

# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:

    # Gracefully close the connection
    connection.close()

    # Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
    connection.ioloop.start()
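
As a minimal sketch of what "publishing messages in a loop" could look like with the example above, the variant below moves the loop into a helper. The names `publish_batch`, `MESSAGE_COUNT`, `main`, and `on_close` are assumptions made for illustration, not the original benchmark code. It assumes pika 1.x, a broker on localhost, and that the `hello_durable` queue from the question already exists.

```python
MESSAGE_COUNT = 100000  # assumed batch size, matching the question's loop


def publish_batch(channel, count, routing_key='hello_durable',
                  body='Hello World!', properties=None):
    """Publish `count` copies of `body` via the default exchange.

    Works with any object exposing pika's basic_publish() signature.
    """
    for _ in range(count):
        channel.basic_publish(exchange='',
                              routing_key=routing_key,
                              body=body,
                              properties=properties)
    return count


def main():
    # Imported here so the helper above can be exercised without pika installed.
    import pika

    def on_channel_open(channel):
        # Transient messages, as in the tutorial example (delivery_mode=1).
        properties = pika.BasicProperties(delivery_mode=1)
        publish_batch(channel, MESSAGE_COUNT, properties=properties)
        # Close once everything has been handed to pika's outbound buffer.
        connection.close()

    def on_open(conn):
        conn.channel(on_open_callback=on_channel_open)

    def on_close(conn, exception):
        # Stop the IOLoop explicitly so main() returns after the close.
        conn.ioloop.stop()

    parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
    connection = pika.SelectConnection(parameters=parameters,
                                       on_open_callback=on_open,
                                       on_close_callback=on_close)
    connection.ioloop.start()
```

Calling `main()` needs a running broker. Publishing the whole batch from a single callback keeps the sketch short, but pika may hold the outgoing frames in memory until the callback returns, so a larger batch might be better split into chunks.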

AlexC answered Oct 25 '25 05:10


I am assuming that you don't run any consumers, since you wrote "These benchmarks start with populating a queue". Because you are only publishing messages, RabbitMQ switches to the flow state. To be more precise, your exchanges and/or queues go into the flow state. To quote the RabbitMQ blog:

"This (roughly) means that the client is being rate-limited; it would like to publish faster but the server can't keep up"

I'm sure that if you look closely enough, you will see that the first portion of the messages (on initial setup, with an empty queue) goes fast, but the sending rate drops drastically at some point.
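
One way to look for such a drop is to turn the checkpoints that the question's script prints (message index, elapsed seconds) into per-interval rates. The sketch below is only illustrative: the function name `interval_rates` is an assumption, and the sample numbers are made up rather than measured.

```python
def interval_rates(checkpoints):
    """Return messages per second for each interval between checkpoints.

    `checkpoints` is a list of (messages_sent, elapsed_seconds) pairs in
    increasing order, like the pairs printed by the question's script.
    """
    rates = []
    for (n0, t0), (n1, t1) in zip(checkpoints, checkpoints[1:]):
        elapsed = t1 - t0
        # Guard against a zero-length interval from a coarse clock.
        rates.append((n1 - n0) / elapsed if elapsed > 0 else float('inf'))
    return rates


# Hypothetical numbers for illustration only, not measured values.
sample = [(0, 0.0), (10000, 2.0), (20000, 4.0), (30000, 14.0)]
print(interval_rates(sample))
```

A sharp fall between consecutive rates would be consistent with the rate limiting described above, while a roughly flat series would point to a different bottleneck.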

cantSleepNow answered Oct 25 '25 05:10