Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Running multiple operations with max concurrency - 2 last tasks are not executed

I have created a class that allows me to run multiple operations concurrently with an option to set a max concurrency limit. I.e., if I have 100 operations to do, and I set maxCurrency to 10, at any given time, maximum 10 operations should be running concurrently. Eventually, all of the operations should be executed.

Here's the code:

public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
    using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    var results = new ConcurrentBag<T>();
    var tasks = new List<Task>();
    foreach (var operation in operations)
    {
        await semaphore.WaitAsync(ct).ConfigureAwait(false);

        var task = Task.Factory.StartNew(async () =>
        {
            try
            {
                Debug.WriteLine($"Adding new result");
                var singleResult = await operation(ct).ConfigureAwait(false);
                results.Add(singleResult);
                Debug.WriteLine($"Added {singleResult}");
            }
            finally
            {
                semaphore.Release();
            }
        }, ct);
        tasks.Add(task);
    }
    await Task.WhenAll(tasks).ConfigureAwait(false);

    Debug.WriteLine($"Completed tasks: {tasks.Count(t => t.IsCompleted)}");
    Debug.WriteLine($"Calculated results: {results.Count}");
    
    return results.ToList().AsReadOnly();
}

Here's an example of how I use it:

var operations = Enumerable.Range(1, 10)
    .Select<int, Func<CancellationToken, Task<int>>>(n => async ct =>
    {
        await Task.Delay(100, ct);
        return n;
    });

var data = await _sut.Run(operations, 2, CancellationToken.None);

Every time I execute this, the data collection has just 8 results. I'd expect to have 10 results.

Here's the Debug log:

Adding new
Adding new
Added 1
Added 2
Adding new
Adding new
Added 3
Added 4
Adding new
Adding new
Added 5
Adding new
Added 6
Adding new
Added 7
Adding new
Added 8
Adding new
Completed tasks: 10
Calculated results: 8

As you can see:

  • 10 tasks are Completed
  • "Adding new" is logged 10 times
  • "Added x" is logged 8 times

I do not understand why the 2 last operations are not finished. All tasks have IsComplete set as true, which, as I understand, should mean that all of them got executed to an end.

like image 463
mnj Avatar asked Dec 07 '25 10:12

mnj


1 Answers

The issue here is that Task.Factory.StartNew returns a task that when awaited returns the inner task.

It does not give you a task that will wait for this inner task, hence your problem.

The easiest way to fix this is to call Unwrap on the tasks you create, which will unwrap the inner task and allow you to wait for that.

This should work:

var task = ....
    ....
}, ct).Unwrap();

with this small change you get this output:

...
Added 9
Added 10
Completed tasks: 10
Calculated results: 10

Note that my comments on your question still stands:

  1. You're still working with the illusion that WhenAll will wait for all tasks, when in reality all tasks except the last N have already completed because the loop itself doesn't continue until the previous tasks have completed. You should thus move the synchronization object acquisition into your inner task so that you can queue them all up before you start waiting for them.
  2. I also believe (though I don't 100% know) that using SemaphoreSlim is not a good approach as I believe any thread-related synchronization objects might be unsafe to use in a task-related work. Threads in the threadpool are reused while live tasks are waiting for subtasks to complete which means such a thread might already own the synchronization object from a previous task that has yet to complete and thus allow more than those 2 you wanted to run to run at the "same time". SemaphoreSlim is OK to use, the other synchronization primitives might not be.
like image 53
Lasse V. Karlsen Avatar answered Dec 10 '25 00:12

Lasse V. Karlsen