I am attempting to build a multiple-publisher / multiple-subscriber topology using ZeroMQ. I have created an example by making slight modifications to the espresso.py sample. As I am fairly new to ZeroMQ, I want to make sure what I am doing is right, so please feel free to critique and comment.
I have basically taken a few lessons to heart:
1. Only one ZMQ socket can bind to a given port on a network interface, across all processes (just like regular sockets).
2. Binding does not imply listening, i.e. you can issue a connect() after a bind() on the same socket (very confusing for a BSD-sockets developer, but hey, this is not plain sockets).
3. The proxy with XPUB/XSUB is the pattern to use when subscribers should not have to discover and connect to every publisher themselves.
What I really don't like about the code below is that each publisher binds to its own port, so the proxy has to know about and connect to every one of them. While this may be a necessary evil, somehow I keep thinking it does not look right.
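For what it is worth, the bare bones of lesson 3, stripped of the monitoring parts, are just an XSUB socket that connects out to every known publisher and an XPUB socket that subscribers connect to, with the built-in proxy shuttling data one way and subscriptions the other. A minimal sketch, reusing the same ports as my sample below (treat it as an illustration, not tested code):

import zmq

ctx = zmq.Context.instance()

xsub = ctx.socket(zmq.XSUB)           # faces the publishers
xsub.connect("tcp://localhost:6000")
xsub.connect("tcp://localhost:7000")

xpub = ctx.socket(zmq.XPUB)           # faces the subscribers
xpub.bind("tcp://*:6001")

zmq.proxy(xsub, xpub)                 # blocks, forwarding in both directions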
So here is my sample code.
# Espresso Pattern
# This shows how to capture data using a pub-sub proxy
#
import time
from random import randint
from threading import Thread

import zmq
from zmq.devices import monitored_queue
from zhelpers import zpipe

# The subscriber thread subscribes to messages starting with
# A and B, then reads and counts incoming messages.
def subscriber_thread():
    ctx = zmq.Context.instance()

    # Subscribe to "A" and "B" via the proxy's XPUB side
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:6001")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B")

    count = 0
    while True:
        try:
            msg = subscriber.recv_multipart()
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        count += 1

    print("Subscriber received %d messages" % count)

# .split publisher thread
# Each publisher binds to its own port and sends random messages
# prefixed with its topic character:
def publisher_thread(port, char):
    ctx = zmq.Context.instance()

    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:%d" % port)

    while True:
        string = "%s-%05d" % (char, randint(port, port + 500))
        try:
            publisher.send(string.encode())
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        time.sleep(0.1)         # Wait for 1/10th second

# .split listener thread
# The listener receives all messages flowing through the proxy, on its
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connect
# attached child threads via inproc. In other languages your mileage may vary:
def listener_thread(pipe):
    # Print everything that arrives on the pipe
    while True:
        try:
            print(pipe.recv_multipart())
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted

# .split main thread
# The main task starts the subscriber and publishers, and then sets
# itself up as a listening proxy. The listener runs as a child thread:
def main():
    # Start child threads
    ctx = zmq.Context.instance()
    p_thread1 = Thread(target=publisher_thread, args=(6000, 'A'))
    p_thread2 = Thread(target=publisher_thread, args=(7000, 'B'))
    s_thread = Thread(target=subscriber_thread)
    p_thread1.start()
    p_thread2.start()
    s_thread.start()

    pipe = zpipe(ctx)

    # The proxy's XSUB side connects out to every publisher...
    subscriber = ctx.socket(zmq.XSUB)
    subscriber.connect("tcp://localhost:6000")
    subscriber.connect("tcp://localhost:7000")

    # ...and its XPUB side is the single endpoint subscribers connect to
    publisher = ctx.socket(zmq.XPUB)
    publisher.bind("tcp://*:6001")

    l_thread = Thread(target=listener_thread, args=(pipe[1],))
    l_thread.start()

    try:
        monitored_queue(subscriber, publisher, pipe[0], b'pub', b'sub')
    except KeyboardInterrupt:
        print("Interrupted")

    del subscriber, publisher, pipe
    ctx.term()

if __name__ == '__main__':
    main()
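As an aside, the only non-standard import above is zpipe from zhelpers.py (the guide's helper module). If you do not have it on your path, it is just a small helper that wires two ZMQ_PAIR sockets together over a randomly named inproc endpoint; something roughly like this sketch should behave the same way (details such as the high-water mark may differ from the guide's version):

import binascii
import os
import zmq

def zpipe(ctx):
    # Two PAIR sockets connected over a randomly named inproc endpoint
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.linger = b.linger = 0
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode()
    a.bind(iface)
    b.connect(iface)
    return a, b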
I raised an issue on the ZeroMQ GitHub page and got a response. It is a known bug in ZeroMQ, caused by the fact that publishing and subscribing happen in different threads, so subscription requests can be raised before the receivers of those subscription messages are fully ready. More details can be found here:
https://github.com/zeromq/libzmq/issues/897
I tried to simulate the issue here:
https://gist.github.com/vivekfantain/9021979
Sharing all this for anybody else who stumbles on the same issue.
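In the meantime, a couple of workarounds are possible. The blunt one is a short sleep between connecting/subscribing and relying on traffic. A more targeted one is to switch a publisher from PUB to XPUB, because an XPUB socket delivers subscription messages to the application (a frame whose first byte is 1 for subscribe, 0 for unsubscribe, followed by the topic), so it can hold off publishing until at least one subscription has actually arrived. A rough sketch of that second idea; the port here is just an example and not taken from the code above:

import zmq

ctx = zmq.Context.instance()

# XPUB behaves like PUB but also hands subscription messages to the application
pub = ctx.socket(zmq.XPUB)
pub.bind("tcp://*:5556")            # example endpoint only

# Block until the first subscription arrives
event = pub.recv()
if event.startswith(b'\x01'):
    print("Got a subscription for topic %r" % event[1:])

pub.send(b"A-00001")                # only now start publishing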