Parallel receiving data from several IAsyncEnumerable streams

I need to receive data from more than one IAsyncEnumerable source. For performance reasons, the sources should be consumed in parallel.

I wrote the following code to do this, using the AsyncAwaitBestPractices, System.Threading.Tasks.Dataflow and System.Linq.Async NuGet packages:

public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
        this IEnumerable<IAsyncEnumerable<T>> sources,
        int outputQueueCapacity = 1,
        TaskScheduler scheduler = null)
    {
        var sourcesCount = sources.Count();

        var channel = outputQueueCapacity > 0 
            ? Channel.CreateBounded<T>(sourcesCount)
            : Channel.CreateUnbounded<T>();

        sources.AsyncParallelForEach(
                async body => 
                {
                    await foreach (var item in body)
                    {
                        await channel.Writer.WaitToWriteAsync();
                        await channel.Writer.WriteAsync(item);
                    }
                },
                maxDegreeOfParallelism: sourcesCount,
                scheduler: scheduler)
            .ContinueWith(_ => channel.Writer.Complete())
            .SafeFireAndForget();

        while (await channel.Reader.WaitToReadAsync())
            yield return await channel.Reader.ReadAsync();
    }

public static async Task AsyncParallelForEach<T>(
    this IEnumerable<T> source,
    Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    await block.Completion;
}

This code works fine until two or more sources throw exceptions. The second exception then cannot be handled and, in some cases, crashes the application.

Is there a better way to consume data from several IAsyncEnumerable sources in parallel?

Alexander Zhyshkevich asked Nov 01 '25 00:11


2 Answers

Keeping a pipeline running in case of exceptions is extremely difficult whether it's a functional or CSP pipeline. In most cases a pipeline will need to keep working even in case of individual message failures. A failing message doesn't mean the entire pipeline has failed.

That's why Railway-oriented programming is used to wrap messages and errors into Result<TOk,TError> wrappers and "redirect" or ignore error messages. Such a class makes programming Dataflow, Channels and IAsyncEnumerable pipelines a lot easier.

In F#, using discriminated unions, one could define a Result type just with

type Result<'T,'TError> =
    | Ok of ResultValue:'T
    | Error of ErrorValue:'TError

DUs aren't in C# yet, so various alternatives have been proposed, some using inheritance from an IResult<> base, some using classes/Records which allow exhaustive pattern matching, something not available with the IResult<> techniques.

Let's assume the Result<> here is :

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) =>
        new(default, exception);

    public static implicit operator Result<T> (T value) 
        =>  Result<T>.Ok(value);
    public static implicit operator Result<T>(Exception err) 
        => Result<T>.Fail(err);
}

The first step is to create a CopyToAsync helper that copies all data from the input IAsyncEnumerable<Result<T>> to an output ChannelWriter<Result<T>>:

public static async Task CopyToAsync<T>(
           this IAsyncEnumerable<Result<T>> input, 
           ChannelWriter<Result<T>> output,
           CancellationToken token=default)
{
    try
    {
        // WithCancellation passes the token to the source's enumerator
        await foreach(var msg in input.WithCancellation(token).ConfigureAwait(false))
        {
            await output.WriteAsync(msg, token).ConfigureAwait(false);
        }
    }
    catch(Exception exc)
    {
        // Emit the failure as a message instead of letting it escape
        await output.WriteAsync(Result<T>.Fail(exc)).ConfigureAwait(false);
    }
}

This way, even if an exception is thrown, a Failure message will be emitted instead of aborting the pipeline.

With that, you can merge multiple sources by copying input messages to an output channel :

public static ChannelReader<Result<T>> Merge<T>(
        this IEnumerable<IAsyncEnumerable<Result<T>>> inputs,
        CancellationToken token=default)
{
    var channel=Channel.CreateBounded<Result<T>>(1);

    // One copy task per input, all writing to the same channel
    var tasks = inputs.Select(inp=>inp.CopyToAsync(channel.Writer,token));

    // Complete the channel once every input has finished
    _ = Task.WhenAll(tasks)
            .ContinueWith(t=>channel.Writer.TryComplete(t.Exception));

    return channel.Reader;
}

Using a bounded capacity of 1 preserves the backpressure behavior of downstream channels or consumers.

You can read all messages in a ChannelReader through ChannelReader<T>.ReadAllAsync(CancellationToken) :

IEnumerable<IAsyncEnumerable<Result<T>>> sources = ...;
var merged=sources.Merge();
await foreach(var msg in merged.ReadAllAsync())
{
    //Positional pattern: matches only good results (non-null value, null error)
    if(msg is ({} value,null))
    {
        //Work with value
    }
}
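
To tie the pieces so far together, here is a minimal end-to-end sketch. It is a variation rather than the exact code above: the copy helper wraps plain values into Result<T> itself, and the names Numbers, Faulty and CopyAsync are hypothetical, introduced only for this illustration. It assumes .NET Core 3.0 or later with C# 9 top-level statements.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Two hypothetical sources: one completes normally, one throws midway.
var sources = new[] { Numbers(1, 3), Faulty() };

// Capacity 1 keeps backpressure: writers wait until the reader catches up.
var channel = Channel.CreateBounded<Result<int>>(1);

// ToArray forces every copy task to start immediately.
var copies = sources.Select(s => CopyAsync(s, channel.Writer)).ToArray();

// Complete the writer only after all sources have finished or failed.
_ = Task.WhenAll(copies)
        .ContinueWith(t => channel.Writer.TryComplete(t.Exception));

// The interleaving of the two sources is not deterministic.
await foreach (var msg in channel.Reader.ReadAllAsync())
{
    Console.WriteLine(msg.IsOk
        ? $"value: {msg.result}"
        : $"error: {msg.error!.Message}");
}

// Copies one source to the channel, turning an exception into a message.
static async Task CopyAsync(
    IAsyncEnumerable<int> input, ChannelWriter<Result<int>> output)
{
    try
    {
        await foreach (var item in input.ConfigureAwait(false))
            await output.WriteAsync(Result<int>.Ok(item)).ConfigureAwait(false);
    }
    catch (Exception exc)
    {
        await output.WriteAsync(Result<int>.Fail(exc)).ConfigureAwait(false);
    }
}

static async IAsyncEnumerable<int> Numbers(int start, int count)
{
    for (var i = start; i < start + count; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

static async IAsyncEnumerable<int> Faulty()
{
    yield return 100;
    await Task.Yield();
    throw new InvalidOperationException("source failed");
}

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, null);
    public static Result<T> Fail(Exception exception) => new(default, exception);
}
```

The failing source contributes one value and one error message, while the other source keeps delivering its values. Because each source catches its own exception, no exception is left unobserved when several sources fail.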

You can avoid exposing the channel by returning IAsyncEnumerable<> :

public static IAsyncEnumerable<Result<T>> MergeAsync<T>(
        this IEnumerable<IAsyncEnumerable<Result<T>>> inputs,
        CancellationToken token=default)
{
    return inputs.Merge(token).ReadAllAsync(token);
}

You can use System.Linq.Async to work on an IAsyncEnumerable<> using LINQ methods, e.g. to convert an IAsyncEnumerable<T> to an IAsyncEnumerable<Result<T>> :

source.Select(msg=>Result<T>.Ok(msg))

Or filter failed messages before processing them :

source.Where(msg=>msg.IsOk)

You could create a method that applies a Func<T1,Task<T2>> to an input and propagates results or errors as results :

public static async Task<Result<T2>> ApplyAsync<T1,T2>(this Result<T1> msg,
                                                      Func<T1,Task<T2>> func)
{            
    if (msg is (_, { } err))
    {
        return err;
    }
    try
    {
        var result = await func(msg.result).ConfigureAwait(false);
        return result;
    }
    catch(Exception exc)
    {
        return exc;
    }
}
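
As a usage sketch for ApplyAsync, the program below shows the three paths: a successful call, an upstream error that skips the function, and a function that throws. The static class name ResultExtensions and the sample values are assumptions made for this illustration; the Result<T> record is the one assumed earlier in this answer.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// The implicit operators build Results from a value or from an exception.
Result<int> ok = 21;
Result<int> failed = new InvalidOperationException("upstream failure");

var doubled = await ok.ApplyAsync(x => Task.FromResult(x * 2));
var skipped = await failed.ApplyAsync(x => Task.FromResult(x * 2));
var thrown = await ok.ApplyAsync<int, int>(_ => throw new FormatException("bad input"));

Console.WriteLine($"{doubled.IsOk} {doubled.result}");        // True 42
Console.WriteLine($"{skipped.IsOk} {skipped.error!.Message}"); // False upstream failure
Console.WriteLine($"{thrown.IsOk} {thrown.error!.Message}");   // False bad input

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);

    public static implicit operator Result<T>(T value) => Ok(value);
    public static implicit operator Result<T>(Exception err) => Fail(err);
}

// Hypothetical container class; extension methods need a static class.
public static class ResultExtensions
{
    public static async Task<Result<T2>> ApplyAsync<T1, T2>(
        this Result<T1> msg, Func<T1, Task<T2>> func)
    {
        // An upstream error is passed along without calling func.
        if (msg is (_, { } err))
        {
            return err;
        }
        try
        {
            return await func(msg.result!).ConfigureAwait(false);
        }
        catch (Exception exc)
        {
            // A failure inside func becomes an error Result.
            return exc;
        }
    }
}
```

Once a message carries an error, every later ApplyAsync step passes it through untouched, which is the "railway" behavior described at the start of this answer.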

This is a ... bit ... easier in F#

Panagiotis Kanavos answered Nov 02 '25 15:11


Inspired by this answer, I decided to update my own code (see the dependencies in the question):

public record Result<T>(T Data = default, Exception error = null)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) =>
        new(default, exception);

    public static implicit operator Result<T>(T value)
        => Ok(value);

    public static implicit operator Result<T>(Exception err)
        => Fail(err);
}

public static async ValueTask AsyncParallelForEach<T>(
    this IEnumerable<T> source,
    Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    await block.Completion;
}

public static async IAsyncEnumerable<Result<T>> ExecuteParallelAsync<T>(
    this IEnumerable<IAsyncEnumerable<T>> sources,
    int outputQueueCapacity = 1,
    TaskScheduler scheduler = null)
{
    var sourcesCount = sources.Count();

    var channel = outputQueueCapacity > 0
        ? Channel.CreateBounded<Result<T>>(sourcesCount)
        : Channel.CreateUnbounded<Result<T>>();

    sources.AsyncParallelForEach(
            async body =>
            {
                try
                {
                    await foreach (var item in body)
                    {
                        if (await channel.Writer.WaitToWriteAsync().ConfigureAwait(false))
                            await channel.Writer.WriteAsync(item).ConfigureAwait(false);
                    }
                }
                catch (Exception ex)
                {
                    if (await channel.Writer.WaitToWriteAsync().ConfigureAwait(false))
                        await channel.Writer.WriteAsync(Result<T>.Fail(ex)).ConfigureAwait(false);
                }
            },
            maxDegreeOfParallelism: sourcesCount,
            scheduler: scheduler)
        .AsTask()
        .ContinueWith(_ => channel.Writer.Complete())
        .SafeFireAndForget();

    while (await channel.Reader.WaitToReadAsync().ConfigureAwait(false))
        yield return await channel.Reader.ReadAsync().ConfigureAwait(false);
}

Some readers may find this code easier to follow than the code in the original answer.

N.B. You need C# 9 or later to use records. Also, if you are using .NET Framework 4.x (as I have to), you need the workaround described in this article. In short, add the code below somewhere in your project:

namespace System.Runtime.CompilerServices
{
    using System.ComponentModel;
    /// <summary>
    /// Reserved to be used by the compiler for tracking metadata.
    /// This class should not be used by developers in source code.
    /// </summary>
    [EditorBrowsable(EditorBrowsableState.Never)]
    internal static class IsExternalInit { }
}

Alexander Zhyshkevich answered Nov 02 '25 15:11