I am new to multithreading concepts. I need to add a certain number of strings to a queue and process them with multiple threads, using ConcurrentQueue, which is thread-safe.
This is what I have tried, but not all of the items added to the concurrent queue are processed. Only the first 4 items are processed.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();

    static void Main(string[] args)
    {
        new Program().run();
    }

    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }
        for (int i = 0; i < 100; i++)
        {
            iQ.Enqueue("Item" + i);
        }
        Task.WaitAll(workers);
        Console.WriteLine("Done.");
        Console.ReadLine();
    }

    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        string op;
        if(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }
        Console.WriteLine("Worker {0} is stopping.", workerId);
    }
}
There are a couple of issues with your implementation. The first and obvious one is that the worker
method only dequeues zero or one item and then stops:
    if(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }
It should be:
    while(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }
That, however, won't be enough to make your program work properly. If your workers dequeue faster than the main thread enqueues, they will find the queue empty and stop while the main thread is still enqueueing. You need to signal the workers that they can stop. You can define a boolean field (called doneEnqueueing here) that is set to true once enqueueing is done:
    for (int i = 0; i < 100; i++)
    {
        iQ.Enqueue("Item" + i);
    }
    Volatile.Write(ref doneEnqueueing, true);
The workers will check the value:
    // Assumes a field declared on the class: bool doneEnqueueing;
    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        do
        {
            string op;
            // Drain whatever is currently in the queue.
            while(iQ.TryDequeue(out op))
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
            }
            // Wait until either more items arrive or the producer has finished.
            SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
        }
        while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
        Console.WriteLine("Worker {0} is stopping.", workerId);
    }
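To see the pieces above working together, here is a minimal sketch of a complete class. The names QueueProcessor, Run and the processed array are illustrative assumptions, not part of the original code. It counts items per worker instead of printing them, so the result can be checked. It also uses iQ.IsEmpty in place of iQ.Count > 0, which expresses the same condition.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch: QueueProcessor and Run are hypothetical names.
public sealed class QueueProcessor
{
    private readonly ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    private bool doneEnqueueing; // set to true once by the producer

    // Returns how many items each worker processed.
    public int[] Run(int itemCount, int threadCount)
    {
        int[] processed = new int[threadCount];
        Task[] workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i; // copy the loop variable for the closure
            workers[i] = Task.Run(() => Worker(workerId, processed));
        }

        for (int i = 0; i < itemCount; i++)
        {
            iQ.Enqueue("Item" + i);
        }
        // Signal the workers that no more items will arrive.
        Volatile.Write(ref doneEnqueueing, true);

        Task.WaitAll(workers);
        return processed;
    }

    private void Worker(int workerId, int[] processed)
    {
        do
        {
            string op;
            while (iQ.TryDequeue(out op))
            {
                processed[workerId]++; // each worker writes only its own slot
            }
            // Wait for more items or for the producer to finish.
            SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || !iQ.IsEmpty);
        }
        while (!Volatile.Read(ref doneEnqueueing) || !iQ.IsEmpty);
    }
}
```

Calling new QueueProcessor().Run(100, 4) returns four counts that add up to 100. How the items are split between the workers varies from run to run.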
Each of your workers takes one item out of the queue and then finishes. Just let them keep working until the queue is empty: replace the if in the worker function with while.
    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        string op;
        while (iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }
        Console.WriteLine("Worker {0} is stopping.", workerId);
    }
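Note that this loop ends as soon as TryDequeue finds the queue empty, so it is only reliable if every item is already in the queue before the workers start. A minimal sketch under that assumption follows. PrefilledQueueDemo and ProcessAll are hypothetical names, and the queue is filled first, unlike in the original run method.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch: PrefilledQueueDemo and ProcessAll are hypothetical names.
public static class PrefilledQueueDemo
{
    // Returns the total number of items processed by all workers.
    public static int ProcessAll(int itemCount, int threadCount)
    {
        var queue = new ConcurrentQueue<string>();

        // Assumption: all items are known up front, so enqueue them
        // BEFORE any worker is started.
        for (int i = 0; i < itemCount; i++)
        {
            queue.Enqueue("Item" + i);
        }

        int processed = 0;
        var workers = new Task[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            workers[i] = Task.Run(() =>
            {
                string op;
                // An empty queue now really means "no more work".
                while (queue.TryDequeue(out op))
                {
                    Interlocked.Increment(ref processed);
                }
            });
        }

        Task.WaitAll(workers);
        return processed;
    }
}
```

With the queue filled first, PrefilledQueueDemo.ProcessAll(100, 4) returns 100. If items are still being added while the workers run, a worker can see an empty queue and stop early, and some form of completion signal is needed.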
When you run it, you will see that nearly all items are processed by only two workers. The likely reason: your CPU has two cores, both are busy, and there is no free time slot in which to start the remaining tasks. If you want all 4 of your tasks to process items, you can add a small delay to give the processor time to start the other tasks, something like:
    while (iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
    }
That gives you the output you want:
...
Worker 0 is processing item Item8
Worker 1 is processing item Item9
Worker 2 is processing item Item10
Worker 3 is processing item Item11
Worker 3 is processing item Item13
Worker 1 is processing item Item12
...