Following Pika's timed_receive example, I would like my client to handle more concurrent requests. My question is: can handle_delivery be called each time a new message is received, without waiting for the previous handle_delivery call to return?
RabbitMQ includes a wide variety of features that make it useful for building distributed systems that communicate via asynchronous messaging. The RabbitMQ "Getting started" guide is a good introduction to message queue architecture.
Pika is a Python implementation of the AMQP 0-9-1 protocol for RabbitMQ. This tutorial guides you through installing Pika, declaring a queue, setting up a publisher to send messages to the broker's default exchange, and setting up a consumer to receive messages from the queue.
The basic pipe is unidirectional. The consumer cannot send messages back to the producer through the same queue it consumes from. If you want to send messages the other way, use a second queue: your consumer will need to be a producer as well, and your producer will need to be a consumer as well.
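The two-way arrangement described above can be sketched without a broker. This is a minimal illustration only: it uses in-process `queue.Queue` objects as stand-ins for two RabbitMQ queues, and the names `requests`, `replies`, and `worker` are hypothetical, not part of Pika.

```python
import queue
import threading

# Two one-way pipes: each queue carries messages in a single direction.
requests = queue.Queue()  # client -> worker
replies = queue.Queue()   # worker -> client


def worker():
    """Consumes from `requests` and produces to `replies`."""
    while True:
        body = requests.get()
        if body is None:  # sentinel: no more work
            break
        replies.put(body.upper())


thread = threading.Thread(target=worker)
thread.start()

# The client is a producer on one queue...
for body in ["ping", "hello"]:
    requests.put(body)
requests.put(None)
thread.join()

# ...and a consumer on the other.
answers = [replies.get() for _ in range(2)]
print(answers)
```

With a real broker, each side would publish to one queue and consume from the other in the same way.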
It looks like the call to handle_delivery is blocking, but you could have it add a secondary handler to the I/O event loop using add_timeout, so that handle_delivery itself returns right away. I think this is what you are looking to do:
"""
Asynchronous AMQP consumer; do our processing via an ioloop timeout
"""
import sys
import time

from pika.adapters import SelectConnection
from pika.connection import ConnectionParameters

# Written against the older (pre-1.0) Pika API: connection.add_timeout and
# basic_consume(callback, queue=..., no_ack=...).

# Module-level handles shared by the callbacks below
connection = None
channel = None


def on_connected(connection):
    print("timed_receive: Connected to RabbitMQ")
    connection.channel(on_channel_open)


def on_channel_open(channel_):
    global channel
    channel = channel_
    print("timed_receive: Received our Channel")
    channel.queue_declare(queue="test", durable=True,
                          exclusive=False, auto_delete=False,
                          callback=on_queue_declared)


class TimingHandler(object):
    count = 0
    last_count = 0

    def __init__(self, delay=0):
        self.start_time = time.time()
        self.delay = delay

    def handle_delivery(self, channel, method, header, body):
        # Only schedule the work and return, so the next delivery is not held up
        connection.add_timeout(self.delay, self)

    def __call__(self):
        # Invoked by the ioloop when the timeout fires
        self.count += 1
        if self.count % 1000 == 0:
            now = time.time()
            duration = now - self.start_time
            sent = self.count - self.last_count
            rate = sent / duration
            self.last_count = self.count
            self.start_time = now
            print("timed_receive: %i Messages Received, %.4f per second" %
                  (self.count, rate))


def on_queue_declared(frame):
    print("timed_receive: Queue Declared")
    channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True)


if __name__ == '__main__':
    # Connect to RabbitMQ; the host may be given as the first argument
    host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
    connection = SelectConnection(ConnectionParameters(host),
                                  on_connected)
    # Loop until CTRL-C
    try:
        # Start our blocking loop
        connection.ioloop.start()
    except KeyboardInterrupt:
        # Close the connection
        connection.close()
        # Loop until the connection is closed
        connection.ioloop.start()
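The deferral idea in the code above can be tried without a broker. The following is a rough sketch, not Pika code: it assumes the standard library's `asyncio` loop and its `call_later` as a stand-in for the connection's `add_timeout`, and the names `DeferringHandler` and `run_demo` are made up for illustration.

```python
import asyncio


class DeferringHandler:
    """Accepts each delivery at once and defers the work to the event loop."""

    def __init__(self, loop, delay=0.0):
        self.loop = loop
        self.delay = delay
        self.events = []  # log of ("received" | "processed", body) tuples

    def handle_delivery(self, body):
        # Record the delivery, schedule the processing, and return immediately.
        self.events.append(("received", body))
        self.loop.call_later(self.delay, self.process, body)

    def process(self, body):
        # Runs later, from the event loop, on the same thread.
        self.events.append(("processed", body))


def run_demo(messages, delay=0.01):
    loop = asyncio.new_event_loop()
    try:
        handler = DeferringHandler(loop, delay)
        for body in messages:
            handler.handle_delivery(body)
        # Stop the loop shortly after the last deferred call is due.
        loop.call_later(delay + 0.05, loop.stop)
        loop.run_forever()
        return handler.events
    finally:
        loop.close()


events = run_demo(["a", "b", "c"])
print(events)
```

All three deliveries are logged as received before any of them is processed, which is the effect the answer is after. The deferred callbacks still run one at a time on the loop's single thread, so this keeps the delivery handler from holding things up but does not make the processing itself parallel.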