ConcurrentQueue with multithreading

I am new to multithreading concepts. I need to add a certain number of strings to a queue and process them with multiple threads. I am using ConcurrentQueue, which is thread-safe.

This is what I have tried, but not all the items added to the concurrent queue are processed. Only the first 4 items are processed.

class Program
{
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    static void Main(string[] args)
    {
        new Program().run();
    }

    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];

        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }

        for (int i = 0; i < 100; i++)
        {
            iQ.Enqueue("Item" + i);
        }

        Task.WaitAll(workers);
        Console.WriteLine("Done.");

        Console.ReadLine();
    }

    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        string op;
        if(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }

        Console.WriteLine("Worker {0} is stopping.", workerId);
    }


}
Naveen kumar asked Oct 18 '25 07:10



2 Answers

There are a couple of issues with your implementation. The first and most obvious one is that the worker method dequeues at most one item and then stops:

    if(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

It should be:

    while(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

That, however, won't be enough to make your program work properly. The workers are started before anything is enqueued, and if they dequeue faster than the main thread enqueues, they will find the queue empty and stop while the main thread is still enqueueing. You need to signal the workers when they can stop. You can define a boolean field (here called doneEnqueueing) that is set to true once enqueueing is done:

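bool doneEnqueueing = false; // field on Program, read by the workers

// In run(), after the workers have been started:
for (int i = 0; i < 100; i++)
{
    iQ.Enqueue("Item" + i);
}
// Publish the flag only after the last item has been enqueued.
Volatile.Write(ref doneEnqueueing, true);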

The workers will check the value:

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    do {
        string op;
        // Drain whatever is currently in the queue.
        while(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }
        // Wait until there is more work or the producer has finished.
        SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    }
    // Stop only when the producer is done and the queue is empty.
    while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    Console.WriteLine("Worker {0} is stopping.", workerId);
}
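
For reference, the fragments above could be assembled into a complete program roughly as follows. This is only a sketch under some assumptions: the class name QueueDemo, the Run method with its parameters, and the processed counter are hypothetical additions made so that the result can be checked, and the emptiness check uses IsEmpty in place of Count > 0.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class QueueDemo
{
    readonly ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    bool doneEnqueueing;   // written once by the producer, read by the workers
    int processed;         // hypothetical counter, only used to verify the result

    static void Main(string[] args)
    {
        int count = new QueueDemo().Run(4, 100);
        Console.WriteLine("Done. Processed {0} items.", count);
    }

    // Starts the workers, enqueues the items, and returns how many were processed.
    public int Run(int threadCount, int itemCount)
    {
        Task[] workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i; // copy, so each lambda captures its own id
            workers[i] = Task.Run(() => Worker(workerId));
        }

        for (int i = 0; i < itemCount; i++)
        {
            iQ.Enqueue("Item" + i);
        }
        // Signal the workers only after the last item is in the queue.
        Volatile.Write(ref doneEnqueueing, true);

        Task.WaitAll(workers);
        return processed;
    }

    void Worker(int workerId)
    {
        do
        {
            string op;
            while (iQ.TryDequeue(out op))
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
                Interlocked.Increment(ref processed);
            }
            // Wait for more work or for the producer to finish.
            SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || !iQ.IsEmpty);
        }
        while (!Volatile.Read(ref doneEnqueueing) || !iQ.IsEmpty);
    }
}
```

Because the flag is written only after the last Enqueue call, a worker that sees the flag set and the queue empty can safely stop: every item has already been taken by some worker.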
Sefe answered Oct 19 '25 22:10



Each of your workers takes one item out of the queue and then finishes. Just let them keep working until the queue is empty.

Replace the if in the worker function with a while:

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    string op;
    while (iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

    Console.WriteLine("Worker {0} is stopping.", workerId);
}

When you run it, you will see that nearly all items are processed by two workers. A likely reason: your CPU has two cores, both are busy, and there is no "free time slot" in which to start another task. If you want all 4 of your tasks to process items, you could add a small delay to give the processor time to start the other tasks, something like:

while (iQ.TryDequeue(out op))
{
    Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
}

That gives you the output you want:

...
Worker 0 is processing item Item8
Worker 1 is processing item Item9
Worker 2 is processing item Item10
Worker 3 is processing item Item11
Worker 3 is processing item Item13
Worker 1 is processing item Item12
...
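
One caveat with the plain while loop: it ends as soon as the queue is empty, so it relies on items already being queued when a worker starts. Below is a minimal sketch, assuming all items can be enqueued before the tasks are started (unlike the code in the question, which starts the workers first). The names PrefilledQueueDemo, Run, and perWorker are hypothetical, and Thread.Sleep(1) merely stands in for real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class PrefilledQueueDemo
{
    // Fills the queue first, then starts the workers.
    // Returns how many items each worker processed.
    public static int[] Run(int threadCount, int itemCount)
    {
        var queue = new ConcurrentQueue<string>();
        for (int i = 0; i < itemCount; i++)
        {
            queue.Enqueue("Item" + i);
        }

        int[] perWorker = new int[threadCount];
        Task[] workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i; // copy, so each lambda captures its own id
            workers[i] = Task.Run(() =>
            {
                string op;
                while (queue.TryDequeue(out op))
                {
                    perWorker[workerId]++; // each slot is written by one worker only
                    Thread.Sleep(1);       // simulated work
                }
            });
        }

        Task.WaitAll(workers);
        return perWorker;
    }

    static void Main()
    {
        int[] counts = Run(4, 100);
        for (int i = 0; i < counts.Length; i++)
        {
            Console.WriteLine("Worker {0} processed {1} items", i, counts[i]);
        }
        Console.WriteLine("Total: {0}", counts.Sum());
    }
}
```

How the items are split between the workers varies from run to run, but the per-worker counts always add up to the number of items enqueued.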
Maksim Simkin answered Oct 19 '25 22:10



