I have created a class that allows me to run multiple operations concurrently with an option to set a max concurrency limit. I.e., if I have 100 operations to do, and I set maxCurrency to 10, at any given time, maximum 10 operations should be running concurrently. Eventually, all of the operations should be executed.
Here's the code:
public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
var results = new ConcurrentBag<T>();
var tasks = new List<Task>();
foreach (var operation in operations)
{
await semaphore.WaitAsync(ct).ConfigureAwait(false);
var task = Task.Factory.StartNew(async () =>
{
try
{
Debug.WriteLine($"Adding new result");
var singleResult = await operation(ct).ConfigureAwait(false);
results.Add(singleResult);
Debug.WriteLine($"Added {singleResult}");
}
finally
{
semaphore.Release();
}
}, ct);
tasks.Add(task);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
Debug.WriteLine($"Completed tasks: {tasks.Count(t => t.IsCompleted)}");
Debug.WriteLine($"Calculated results: {results.Count}");
return results.ToList().AsReadOnly();
}
Here's an example of how I use it:
var operations = Enumerable.Range(1, 10)
.Select<int, Func<CancellationToken, Task<int>>>(n => async ct =>
{
await Task.Delay(100, ct);
return n;
});
var data = await _sut.Run(operations, 2, CancellationToken.None);
Every time I execute this, the data collection has just 8 results. I'd expect to have 10 results.
Here's the Debug log:
Adding new
Adding new
Added 1
Added 2
Adding new
Adding new
Added 3
Added 4
Adding new
Adding new
Added 5
Adding new
Added 6
Adding new
Added 7
Adding new
Added 8
Adding new
Completed tasks: 10
Calculated results: 8
As you can see:
I do not understand why the 2 last operations are not finished. All tasks have IsComplete set as true, which, as I understand, should mean that all of them got executed to an end.
The issue here is that Task.Factory.StartNew returns a task that when awaited returns the inner task.
It does not give you a task that will wait for this inner task, hence your problem.
The easiest way to fix this is to call Unwrap on the tasks you create, which will unwrap the inner task and allow you to wait for that.
This should work:
var task = ....
....
}, ct).Unwrap();
with this small change you get this output:
...
Added 9
Added 10
Completed tasks: 10
Calculated results: 10
Note that my comments on your question still stands:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With