Waking up a specific thread from sleep from another thread in python

I've been chasing my tail on this one for days now, and I'm going crazy. I'm a total amateur and totally new to python, so please excuse my stupidity.

My main thread repeatedly checks a database for an entry and then starts threads for every new entry that it finds in the database.

Each thread that it starts polls the database for a value; if it doesn't find that value, it does some work, sleeps for 60 seconds, and starts over.

Simplified pseudocode for each started thread:

while True:
    stop = _Get_a_Value_From_Database_for_Exit()  # a call to the DBMS
    if stop == 0:
        Do_stuff()
        time.sleep(60)
    else:
        break

There could be many of these threads running at any given time. I'd like the main thread to check another spot in the database for a specific value and then interrupt the sleep in one specific thread, so that thread can exit without waiting out the remainder of its sleep. Each of these threads can be referenced by its shared database id. I've seen references to event.wait() and event.set() and have been trying to figure out how to replace time.sleep() with them, but I have no idea how to wake up one specific thread instead of all of them.

This is where my ignorance shows through: is there a way to do something based on the database id for the event.wait, like 12345.wait(60) in the started thread and 12345.set() in the main thread (all dynamic, based on the ever-changing database id)?

Thanks for your help!!

tonyk4316 asked Mar 15 '26 19:03


1 Answer

The project is a little complex, so here's my version of it:

  • scan database file /tmp/db.dat, prefilled with two words

  • manager: create a thread for each word; default is a "whiskey" thread and a "syrup" thread

  • if a word ends in _stop, like syrup_stop, tell that thread to die by setting its stop event

  • each thread scans the database file and exits if it sees the word stop. It'll also exit if its stop event is set.

  • note that if the manager thread sets a worker's stop event, the worker exits immediately. Each thread does a little bit of work but spends most of its time in the stop_ev.wait() call, so when the event is set it doesn't wait out the rest of the timeout.
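
The last point above rests on how threading.Event.wait(timeout) behaves: it returns True as soon as the event is set and False if the timeout runs out first. Here is a minimal sketch of just that behavior; wait_demo is a hypothetical helper written for illustration, and a threading.Timer stands in for the manager thread calling set().

```python
import threading
import time


def wait_demo(timeout, set_after=None):
    """Return (was_set, elapsed_seconds) for a single Event.wait(timeout) call.

    Hypothetical helper for illustration only.
    """
    ev = threading.Event()
    if set_after is not None:
        # The timer thread plays the role of the manager calling set().
        threading.Timer(set_after, ev.set).start()
    started = time.monotonic()
    was_set = ev.wait(timeout=timeout)
    return was_set, time.monotonic() - started


if __name__ == '__main__':
    # Nobody sets the event: wait() runs the full timeout and returns False.
    print(wait_demo(timeout=0.5))
    # The event is set after 0.2 s: wait() returns True long before the 5 s timeout.
    print(wait_demo(timeout=5, set_after=0.2))
```

The boolean return value is what lets a worker tell "I was asked to stop" apart from "my sleep interval simply ended".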

The server is fun to play around with! Start it, then send commands to it by adding lines to the database. Try each one of the following:

$ echo pie >> /tmp/db.dat  # start new thread

$ echo pie_stop >> /tmp/db.dat # stop thread by event

$ echo whiskey_stop >> /tmp/db.dat # stop another thread

$ echo stop >> /tmp/db.dat # stop all threads

source

import logging, sys, threading, time

STOP_VALUE = 'stop'

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)-4s %(threadName)s %(levelname)s %(message)s", 
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)


class Database(list):
    PATH = '/tmp/db.dat'
    def __init__(self):
        super(Database,self).__init__()
        self._update_lock = threading.Lock()

    def update(self):
        with self._update_lock:
            with open(self.PATH) as dbf:  # close the file handle promptly
                self[:] = [line.strip() for line in dbf]

db = Database()

def spawn(events, key):
    events[key] = threading.Event()
    th = threading.Thread(
        target=search_worker,
        kwargs=dict(stop_ev=events[key]),
        name='thread-{}'.format(key),
    )
    th.daemon = True
    th.start()

def search_worker(stop_ev):
    """
    scan database until "stop" found, or our event is set
    """
    logging.info('start')
    while True:
        logging.debug('scan')
        db.update()
        if STOP_VALUE in db:
            logging.info('stopvalue: done')
            return
        if stop_ev.wait(timeout=10):
            logging.info('event: done')
            return 

def manager():
    """
    scan database
    - word: spawn thread if none already
    - word_stop: tell thread to die by setting its stop event
    """
    logging.info('start')
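    events = dict()
    # names already told to stop; the file is append-only, so never respawn them
    stopped = set()
    while True:
        db.update()
        for key in db:
            if key == STOP_VALUE:
                continue
            if key in events or key in stopped:
                continue
            if key.endswith('_stop'):
                # strip only the suffix, so names may themselves contain '_'
                name = key[:-len('_stop')]
                if name in stopped:
                    continue  # this stop request was already handled
                if name not in events:
                    logging.error('stop: missing key=%s!', name)
                else:
                    # signal thread to stop
                    logging.info('stop: key=%s', name)
                    events.pop(name).set()
                    stopped.add(name)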
            else:
                spawn(events, key)
                logging.info('spawn: key=%s', key)
        time.sleep(2)


if __name__=='__main__':

    with open(Database.PATH, 'w') as dbf:
        dbf.write(
        'whiskey\nsyrup\n'
        )

    db.update()
    logging.info('start: db=%s -- %s', db.PATH, db)

    manager_t = threading.Thread(
        target=manager,
        name='manager',
    )
    manager_t.start()
    manager_t.join()
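
Mapped back onto the loop in the question, the same idea can be sketched as a dictionary that maps each database id to its own Event, which is roughly what "12345.wait(60)" and "12345.set()" were reaching for. All names here (stop_events, start_worker, stop_worker, do_stuff) are hypothetical and the database calls are left out, so treat this as a sketch under those assumptions, not a drop-in implementation.

```python
import threading

# Hypothetical registry: database id -> the Event that wakes that id's worker.
stop_events = {}
registry_lock = threading.Lock()


def do_stuff(db_id):
    """Placeholder for the real per-entry work."""
    pass


def worker(db_id, stop_ev, interval=60):
    # Loop until this worker's own event is set.
    while not stop_ev.is_set():
        do_stuff(db_id)
        # Replaces time.sleep(interval): returns early as soon as set() is called.
        stop_ev.wait(timeout=interval)


def start_worker(db_id, interval=60):
    """Create an Event for db_id, register it, and start the worker thread."""
    stop_ev = threading.Event()
    th = threading.Thread(
        target=worker,
        args=(db_id, stop_ev, interval),
        name='worker-{}'.format(db_id),
        daemon=True,
    )
    with registry_lock:
        stop_events[db_id] = stop_ev
    th.start()
    return th


def stop_worker(db_id):
    """Wake and stop only the worker for db_id; return False if it is unknown."""
    with registry_lock:
        stop_ev = stop_events.pop(db_id, None)
    if stop_ev is None:
        return False
    stop_ev.set()
    return True
```

The main thread would call start_worker(db_id) for each new entry and stop_worker(db_id) when the database says that entry should end; other workers keep waiting on their own events and are not disturbed.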

johntellsall answered Mar 17 '26 08:03


