
Thread-safe queue with tasks

I have a work manager in C# that receives tasks and executes them. Tasks arrive from different threads, but they must be executed one at a time, in the order they were received. I don't want a while loop that runs all the time, checking whether there are new tasks in the queue. Is there a built-in queue, or an easy way to implement one, that waits for tasks and executes them sequentially without busy-waiting?

user3126358 asked Mar 05 '26 02:03



2 Answers

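As suggested in the comments, look into ConcurrentQueue, and more specifically BlockingCollection. Consuming it with GetConsumingEnumerable() replaces the unwanted while loop: the enumeration blocks while the collection is empty instead of busy-waiting.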

BlockingCollection<YourClass> _collection =
            new BlockingCollection<YourClass>(new ConcurrentQueue<YourClass>());

_collection.Add() can be called safely from multiple threads.

On a separate thread, consume the items with:

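foreach (var message in _collection.GetConsumingEnumerable())
{
    // Process message here. The loop blocks while the collection is empty and
    // ends once CompleteAdding() has been called and the collection is drained.
}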
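To make the snippets above concrete, here is a minimal sketch of a worker built around them. The names SerialWorker, Enqueue, Consume and Run are hypothetical and chosen only for illustration; the sketch assumes each task can be represented as an Action and that a single dedicated consumer thread is acceptable.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical worker: accepts work from any thread and runs it
// one item at a time, in the order the items were added.
public sealed class SerialWorker : IDisposable
{
    private readonly BlockingCollection<Action> _queue =
        new BlockingCollection<Action>(new ConcurrentQueue<Action>());
    private readonly Task _consumer;

    public SerialWorker()
    {
        // A single long-running consumer guarantees one-at-a-time execution.
        _consumer = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }

    // Safe to call from multiple threads.
    public void Enqueue(Action work) => _queue.Add(work);

    private void Consume()
    {
        // Blocks while the queue is empty; exits after CompleteAdding() once drained.
        foreach (var work in _queue.GetConsumingEnumerable())
        {
            try { work(); }
            catch (Exception ex) { Console.Error.WriteLine($"Task failed: {ex.Message}"); }
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // no more items; lets the consumer loop finish
        _consumer.Wait();        // wait until the remaining items have run
        _queue.Dispose();
    }
}

public static class Program
{
    // Enqueues 30 items from parallel producers and returns what the consumer executed.
    public static List<int> Run()
    {
        var seen = new List<int>(); // only touched by the consumer thread
        using (var worker = new SerialWorker())
        {
            Parallel.For(0, 30, i => worker.Enqueue(() => seen.Add(i)));
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine($"Executed {Run().Count} tasks");
    }
}
```

Because only the consumer thread ever runs the queued actions, the actions themselves need no locking against each other. Disposing the worker drains the queue before returning, which is one possible shutdown policy among several.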
Janus Pienaar answered Mar 06 '26 14:03



You can combine a SemaphoreSlim (https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim(v=vs.110).aspx) with a ConcurrentQueue.

Example:

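    // Needs: using System; using System.Collections.Concurrent; using System.Linq;
    //        using System.Threading; using System.Threading.Tasks;
    // The members below are assumed to live inside a containing class (e.g. Program).
    private delegate void TaskBody();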

    private class TaskManager
    {
        private ConcurrentQueue<TaskBody>
            TaskBodyQueue = new ConcurrentQueue<TaskBody>();

        private readonly SemaphoreSlim 
            TaskBodySemaphoreSlim = new SemaphoreSlim(1, 1);

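        // Returns a Task rather than using async void, so callers can await it
        // and observe exceptions thrown by the task body.
        public async Task Enqueue(TaskBody body)
        {
            TaskBodyQueue.Enqueue(body);

            // Only one caller at a time gets past this point; the wait is
            // asynchronous, not a busy loop.
            await TaskBodySemaphoreSlim.WaitAsync();

            try {
                Console.WriteLine("Cycle ...");

                // Every caller enqueues one item before waiting, so the queue
                // should never be empty here.
                if (TaskBodyQueue.TryDequeue(out body) == false) {
                    throw new InvalidOperationException("TaskBodyQueue is empty!");
                }

                body();

                Console.WriteLine($"Cycle ... done ({TaskBodyQueue.Count} left)");
            }
            finally {
                // Release even if the body throws; otherwise later tasks wait forever.
                TaskBodySemaphoreSlim.Release();
            }
        }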
    }

    public static void Main(string[] args)
    {
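        var tm = new TaskManager();

        // Parallel.ForEach does not await async lambdas: each one returns at its
        // first await and finishes later, which is why Main waits with Task.Delay below.
        Parallel.ForEach(Enumerable.Range(0, 30), async number => {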
            await Task.Delay(100 * number);

            tm.Enqueue(delegate {
                Console.WriteLine($"Print {number}");
            });
        });

        Task
            .Delay(4000)
            .Wait();

        WaitFor(action: "exit");
    }

    public static void WaitFor(ConsoleKey consoleKey = ConsoleKey.Escape, string action = "continue")
    {
        Console.Write($"Press {consoleKey} to {action} ...");

        var consoleKeyInfo = default(ConsoleKeyInfo);

        do {
            consoleKeyInfo = Console.ReadKey(true);
        }
        while (Equals(consoleKeyInfo.Key, consoleKey) == false);

        Console.WriteLine();
    }
incureforce answered Mar 06 '26 15:03
