
Making sure a worker process always terminates in ZeroMQ

Tags:

python

zeromq

I am implementing a pipeline pattern with ZeroMQ using the Python bindings.

Tasks are fanned out to workers, which listen for new tasks in an infinite loop like this:

    while True:
        socks = dict(self.poller.poll())
        if self.receiver in socks and socks[self.receiver] == zmq.POLLIN:
            msg = self.receiver.recv_unicode(encoding='utf-8')
            self.process(msg)
        if self.hear in socks and socks[self.hear] == zmq.POLLIN:
            msg = self.hear.recv()
            print self.pid,":",  msg
            sys.exit(0)

They exit when they get a message from the sink node confirming that it has received all the expected results.

However, a worker may miss that message and never finish. What is the best way to make sure workers always finish, when they have no way of knowing that there are no further tasks to process other than through that message?

Here is the test code I wrote to check the workers' status:

    #-*- coding:utf-8 -*-
    """
    Test module containing tests for all modules of pypln

    """
    import unittest
    from servers.ventilator import Ventilator
    from subprocess import Popen, PIPE
    import time
    class testWorkerModules(unittest.TestCase):
        def setUp(self):
            self.nw = 4
            #spawn 4 workers
            self.ws = [Popen(['python', 'workers/dummy_worker.py'], stdout=None) for i in range(self.nw)]
            #spawn a sink
            self.sink = Popen(['python', 'sinks/dummy_sink.py'], stdout=None)
            #start a ventilator
            self.V = Ventilator()
            # wait for workers and sinks to connect
            time.sleep(1)

        def test_send_unicode(self):
            '''
            Pushing unicode strings through workers to sinks.
            '''

            self.V.push_load([u'são joão' for i in xrange(80)])
            time.sleep(1)
            #[p.wait() for p in self.ws]#wait for the workers to terminate
            wsr = [p.poll() for p in self.ws]
            while None in wsr:
                print wsr, [p.pid for p in self.ws if p.poll() == None] #these are the unfinished workers
                time.sleep(0.5)
                wsr = [p.poll() for p in self.ws]
            self.sink.wait()
            self.sink = self.sink.returncode
            self.assertEqual([0]*self.nw, wsr)
            self.assertEqual(0, self.sink)

    if __name__ == '__main__':
        unittest.main()

fccoelho asked Nov 21 '25 09:11


1 Answer

All messaging setups eventually end up needing heartbeats. If you (as a worker, a sink, or whatever) discover that a component you need to work with is dead, you can basically either try to connect somewhere else or kill yourself. So if you, as a worker, discover that the sink is no longer there, just exit. This also means that you may exit even though the sink is still alive and only the connection is broken. I am not sure you can do much more than that, other than perhaps setting all the timeouts to more reasonable values.
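
A minimal sketch of the heartbeat idea, written for Python 3 and not taken from the question's code base. It assumes the sink publishes a periodic heartbeat on the socket the worker already listens to (`hear` in the question) and a distinct final message when all results are in. The message contents `b"HEARTBEAT"` and `b"FINISHED"`, the `SinkLiveness` helper, and the `worker_loop` signature are all hypothetical. The poller is expected to be a `zmq.Poller` with both sockets already registered for `zmq.POLLIN`.

    import time


    class SinkLiveness:
        """Tracks when the sink was last heard from (hypothetical helper)."""

        def __init__(self, timeout_s, clock=time.monotonic):
            self.timeout_s = timeout_s
            self.clock = clock
            self.last_seen = clock()

        def beat(self):
            # Call whenever any message arrives from the sink.
            self.last_seen = self.clock()

        def expired(self):
            return self.clock() - self.last_seen > self.timeout_s


    def worker_loop(poller, receiver, hear, process, liveness, poll_ms=500,
                    done=b"FINISHED"):
        """Run until the sink reports completion or stays silent too long.

        Returns "finished" or "sink-timeout" so the caller can choose an
        exit code. The `done` payload is an assumed protocol detail.
        """
        while True:
            # A finite timeout makes poll() return even when nothing arrives,
            # so the liveness check below always gets a chance to run.
            socks = dict(poller.poll(poll_ms))
            if receiver in socks:
                process(receiver.recv_string(encoding="utf-8"))
            if hear in socks:
                msg = hear.recv()
                liveness.beat()  # heartbeats and the final message both count
                if msg == done:
                    return "finished"
            if liveness.expired():
                return "sink-timeout"

A worker would call something like `worker_loop(self.poller, self.receiver, self.hear, self.process, SinkLiveness(10))` and then `sys.exit(0)` on `"finished"` or a non-zero code on `"sink-timeout"`. The timeout should be comfortably longer than both the heartbeat interval and the slowest task, since heartbeats are not read while `process` is running.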

tchap answered Nov 22 '25 21:11


