I'm using ZeroMQ to facilitate a publish/subscribe environment I'm needing. I'm running a publish server on machine A using Python (using EventLoop), and right now I have one subscriber running in C++ on machine B and a second subscriber running in Python (using EventLoop) on machine C.
If machine B subscribes to machine A before machine C does, then B gets subscribed messages and C does not. Furthermore, if I look at the established connections on machine A, there only exists a connection for machine B but not for C. If machine C subscribes to A before B does, then it's the other way around.
Here's my publisher code:
import zmq
from zmq.eventloop import ioloop, zmqstream
ioloop.install()
context   = zmq.Context(1)
socket    = context.socket(zmq.PUB)
publisher = zmqstream.ZMQStream(socket)
socket.bind("tcp://*:1337")
def publish():
  publisher.send_multipart(("heartbeat", "OHAI"))
ioloop.PeriodicCallback(publish, 5000).start()
ioloop.IOLoop.instance().start()
Here's my Python subscriber code:
import zmq
from zmq.eventloop import ioloop, zmqstream
ioloop.install()
context    = zmq.Context(1)
socket     = context.socket(zmq.SUB)
subscriber = zmqstream.ZMQStream(socket)
socket.setsockopt(zmq.SUBSCRIBE, "heartbeat")
socket.connect("tcp://pub.local:1337")
def subscription(message):
  print "Message Received: %s" % (message[1])
subscriber.on_recv(subscription)
ioloop.IOLoop.instance().start()
Why isn't my publisher accepting multiple incoming subscriber sockets? It's probably worth noting that multiple subscribers works fine when running them on machine A, but I don't think it's a firewall issue because I tested subscriber connections from B and C to A with the firewall disabled.
Pub/Sub is a pattern where the publisher is not programmed to send a message (payload) to a specific receiver. These messages are sent by publishers to specific channels, and receivers can subscribe to one or more channels to consume those same messages.
ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast.
ZMQ. Context Context is an object serving as a container for all the sockets of a single process. By creating a new context, you start one or more input/output threads: DEFINE context ZMQ.Context. Associated methods: socket()
Thanks to everyone for all the helpful comments in the original posting. This behavior turned out to be due to a mismatch in ZeroMQ versions being used... an oversight on my part.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With