I've written a sample test that replicates the issue. This is not my actual code, I've tried to write a small repro. If you increase the bounding capacity to the number of iterations effectively giving it no bounding it does not deadlock and if you put the max parallelism to a small number like 1 it does not deadlock.
Again, I know the code below is not great but the code I actually found this in was much larger and hard to understand. Basically there was a blocking object pool of connections to a remote resource and several of the blocks in the flow used the connection.
Any ideas on how to solve this? At first glance it appears to be a problem with dataflow. When I break to take a look at the threads I see many threads blocked on Add and 0 threads blocked on take. There are several items in the addBlocks outbound queue that have not yet propagated to the takeblock so it's stuck or deadlocked.
    var blockingCollection = new BlockingCollection<int>(10000);
    var takeBlock = new ActionBlock<int>((i) =>
    {
        int j = blockingCollection.Take();
    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20,
              SingleProducerConstrained = true
           });
    var addBlock = new TransformBlock<int, int>((i) => 
    {
        blockingCollection.Add(i);
        return i;
    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20
           });
    addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
          {
             PropagateCompletion = true
          });
    for (int i = 0; i < 100000; i++)
    {
        addBlock.Post(i);
    }
    addBlock.Complete();
    await addBlock.Completion;
    await takeBlock.Completion;
TPL Dataflow wasn't meant to be used with code that is blocking a lot, and I think this issue stems from that.
I couldn't figure out what exactly is going on, but I think a solution would be to use a non-blocking collection. Conveniently, Dataflow provides you with one in the form of BufferBlock. With that, your code would look like this:
var bufferBlock = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10000 });
var takeBlock = new ActionBlock<int>(
    async i =>
    {
        int j = await bufferBlock.ReceiveAsync();
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 20,
        SingleProducerConstrained = true
    });
var addBlock = new TransformBlock<int, int>(
    async i =>
    {
        await bufferBlock.SendAsync(i);
        return i;
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 20
    });
Although I find the whole design of your code suspicious. If you want to send some additional data along with the normal result of a block, change the type of the output of that block to a type that includes that additional data.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With