Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python - checking if all tasks from a Queue are completed

Is there an option to check if a Queue is empty AND all Threads have already finished processing their tasks (i.e. task_done() was run)? I want to add additional tasks only if both conditions are met.

As I want to be able to add more tasks I cannot just quit unused threads and use activeCount(). Also I don't want to join() the Queue as I want to be able to actively monitor progress of the execution.

Here is an example code:

from Queue import Queue
from threading import Thread
import time

queue = Queue()

def my_method(queue):
    while True:
        task = queue.get()

        time.sleep((task + 2) * 3)

        queue.task_done()

num_queue_threads = 2
queue_threads = [None] * num_queue_threads

for i in range(num_queue_threads):
    queue_threads[i] = Thread(target=my_method, args=(queue,))
    queue_threads[i].setDaemon(True)
    queue_threads[i].start()

for task in range(3):
    queue.put(task)

#queue.join() #need to wait actively

while True:
    print("queue.qsize(): {}, queue.empty(): {}".format(queue.qsize(), queue.empty()))

    time.sleep(1)

Queue is empty as soon as the execution of the last task STARTS.

like image 938
Michał Malus Avatar asked Oct 18 '25 14:10

Michał Malus


1 Answers

There's no public interface for this. Someone's probably going to post a fragile solution poking at the internal attribute the queue uses to track unfinished tasks, but seriously, don't do that. That attribute isn't part of the documented API, and it could be renamed or redesigned in a future version.

Just track task completion yourself. One option would be to have a separate queue where the workers can send "task complete" messages to the orchestrator, and the orchestrator waits until it's received a number of messages equal to the number of tasks it assigned:

from Queue import Queue
from threading import Thread
import time

task_queue = Queue()
completion_queue = Queue()

def my_method(in_queue, out_queue):
    while True:
        task = in_queue.get()
        time.sleep((task + 2) * 3)
        in_queue.task_done()

        # Send completion message
        out_queue.put(task)

num_queue_threads = 2
queue_threads = [None] * num_queue_threads

for i in range(num_queue_threads):
    queue_threads[i] = Thread(target=my_method, args=(task_queue, completion_queue))
    queue_threads[i].setDaemon(True)
    queue_threads[i].start()

for task in range(3):
    task_queue.put(task)

for _ in range(3):
    completion_queue.get()
    completion_queue.task_done()
    print("One task done!")

print("All done!")
like image 141
user2357112 supports Monica Avatar answered Oct 21 '25 05:10

user2357112 supports Monica