I have a simple asynchronous consumer for AMQP/RabbitMQ, written in Python using the Pika library and based on the Asynchronous consumer example from the Pika docs. The main difference is that I want to run mine in a thread and I want it to close the connection properly then exit (i.e. terminate the thread) after a certain time interval. Here are my methods to open a connection and set a timeout. I also open a channel, create an exchange and bind a queue... all that works fine.
def connect(self):
LOGGER.info('OPEN connection...')
return pika.SelectConnection(self._parameters, self.on_connection_open, stop_ioloop_on_close=False)
def on_connection_open(self, unused_connection):
LOGGER.info('Connection opened')
self.add_on_connection_close_callback()
self._connection.add_timeout(5, self.timer_tick)
self.open_recv_channel()
Here's the timeout callback:
def timer_tick(self):
LOGGER.info('---TICK---')
self._stop()
Here's the _stop method:
def _stop(self):
LOGGER.info('Stopping...')
self._connection.close()
LOGGER.info('Stopped')
time.sleep(5)
self._connection.ioloop.stop()
Here's the run method which launches the thread:
def run(self):
print "-Run Started-"
self._connection = self.connect()
self._connection.ioloop.start()
print "-Run Finished-"
Here's the main bit of main():
client = TestClient()
client.start()
client.join()
LOGGER.info('Returned.')
time.sleep(30)
My problem is that the "self._connection.close()" won't work properly. I added an on_close callback:
self._connection.add_on_close_callback(self.on_connection_closed)
But on_connection_closed() is never called. Also, the connection is NOT closed. I can see it in the RabbitMQ management web interface, and it remains even after the thread finishes. Here's the output:
-Run Started-
2015-01-28 14:39:28,431: OPEN connection...
2015-01-28 14:39:28,491: Queue bound
(...[snipped] various other messages here...)
2015-01-28 14:39:28,491: Issuing consumer related RPC commands
2015-01-28 14:39:28,491: Adding consumer cancellation callback
(Pause here waiting for timeout callback)
2015-01-28 14:39:33,505: ---TICK---
2015-01-28 14:39:33,505: Stopping...
2015-01-28 14:39:33,505: Closing connection (200): Normal shutdown
2015-01-28 14:39:33,505: Stopped
-Run Finished-
2015-01-28 14:39:39,507: Returned.
"Closing connection (200): Normal shutdown" comes from Pika, but none of my on_close or on_cancel callbacks are called, whether I start by closing the channel, or just close the connection. The only thing that DOES work is stopping the consumer with "basic_cancel", which causes my "on_cancel_callback" to be called.
I want to use a loop in the main program to create and destroy consumer threads, but at the moment, every time I run one I end up with an orphaned connection left over so my number of connections goes up indefinitely. The connections DO disappear when the program closes.
Using connection.close() should work: From the Pika Docs:
close(reply_code=200, reply_text='Normal shutdown')
Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.
If you're sharing the connection between your threads this can cause problems. pika is not thread safe and connections shouldn't be used by different threads.
First bit of the FAQ:
Q:
Is Pika thread safe?
A:
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With