I am running some tests on my Ubuntu workstation. These benchmarks start with populating a queue, which runs very slowly:
import pika
import datetime
if __name__ == '__main__':
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='hello_durable', durable=True)
        started_at = datetime.datetime.now()
        properties = pika.BasicProperties(delivery_mode=2)
        for i in range(0, 100000):
            channel.basic_publish(exchange='',
                                  routing_key='hello',
                                  body='Hello World!',
                                  properties=properties)
            if i%10000 == 0:
                duration = datetime.datetime.now() - started_at
                print(i, duration.total_seconds())
        print(" [x] Sent 'Hello World!'")
        connection.close()
        now = datetime.datetime.now()
        duration = now - started_at
        print(duration.total_seconds())
    except Exception as e:
        print(e)
It takes more than 30 seconds to send 10K messages. The workstation has 12 cores, which are not busy, according to top command. There are over 8Gb of free memory. It does not matter much whether the queue is durable.
How can we speed up sending messages?
Switching from BlockingConnection to SelectConnection made a huge difference, speeding up the process almost fifty times. All I needed to do is modify the example from the following tutorial:, publishing messages in a loop:
import pika
# Step #3
def on_open(connection):
    connection.channel(on_channel_open)
# Step #4
def on_channel_open(channel):
    channel.basic_publish('test_exchange',
                            'test_routing_key',
                            'message body value',
                            pika.BasicProperties(content_type='text/plain',
                                                 delivery_mode=1))
    connection.close()
# Step #1: Connect to RabbitMQ
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)
try:
    # Step #2 - Block on the IOLoop
    connection.ioloop.start()
# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
    connection.ioloop.start()
Am assuming that you don't run any consumers These benchmarks start with populating a queue.
Since you are only publishing messages, the rabbitmq switches to flow state. To be more precise, your exchanges and/or queues go to flow state. 
Quote from rabbitmq blog
This (roughly) means that the client is being rate-limited; it would like to publish faster but the server can't keep up
I'm sure that if you look close enough, you will see that the first portion of the messages (on initial setup, with empty queue) goes fast, but the sending rate drops drastically at some point.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With