Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

CoreWCF stream continues to be written to after client disconnects

Tags:

c#

.net

wcf

corewcf

I have a very simple service class:

using System.Diagnostics;

// Server entry point: configures Kestrel and exposes StreamingService
// through a streamed BasicHttpBinding endpoint on https://localhost:7151.
var builder = WebApplication.CreateBuilder();

// Register the service-model and metadata services, plus the behavior that
// builds metadata addresses from the incoming request headers.
builder.Services.AddServiceModelServices();
builder.Services.AddServiceModelMetadata();
builder.Services.AddSingleton<IServiceBehavior, UseRequestHeadersForMetadataAddressBehavior>();
builder.WebHost.UseKestrel(options =>
{
    // NOTE(review): presumably needed because the returned stream only
    // implements synchronous Read overloads — confirm.
    options.AllowSynchronousIO = true;
    options.ListenLocalhost(7151, listenOptions =>
    {
        listenOptions.UseHttps();
        // Connection-level logging is only enabled while a debugger is attached.
        if (Debugger.IsAttached)
        {
            listenOptions.UseConnectionLogging();
        }
    });
});

var app = builder.Build();

app.UseServiceModel(serviceBuilder =>
{
    // Transport security (HTTPS) with streamed transfer, so the response
    // body is sent as a stream instead of being buffered.
    serviceBuilder.AddService<StreamingService>();
    serviceBuilder.AddServiceEndpoint<StreamingService, IStreamingService>(new BasicHttpBinding(BasicHttpSecurityMode.Transport){TransferMode = TransferMode.Streamed}, $"https://localhost:7151/StreamingService.svc");
 
    // Allow service metadata to be retrieved over HTTPS GET.
    var serviceMetadataBehavior = app.Services.GetRequiredService<ServiceMetadataBehavior>();
    serviceMetadataBehavior.HttpsGetEnabled = true;
});

app.Run();

Where RandomStream is a stream that returns random bytes:

namespace RandomNumberCore;

/// <summary>
/// A read-only, non-seekable stream that never ends: every read fills the
/// caller's buffer completely with bytes from the supplied <see cref="Random"/>.
/// </summary>
public class RandomStream : Stream
{
    /// <summary>Creates a stream that draws its bytes from <paramref name="random"/>.</summary>
    /// <param name="random">Source of the random bytes returned by the read methods.</param>
    public RandomStream(Random random)
    {
        _random = random;
    }

    // Running total of bytes handed out. This is a long because the stream is
    // endless: an int counter silently wraps to a negative value after 2 GiB.
    private long _sequence;
    private readonly Random _random;

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => false;

    /// <summary>Not supported; the stream has no end.</summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override long Length => throw new NotSupportedException();

    /// <summary>Number of bytes read so far. Setting the position is not supported.</summary>
    /// <exception cref="NotSupportedException">When set.</exception>
    // ReSharper disable once ValueParameterNotUsed
    public override long Position { get => _sequence; set => throw new NotSupportedException(); }

    /// <summary>No-op; the stream is read-only.</summary>
    public override void Flush()
    {}

    /// <summary>Fills <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at <paramref name="offset"/>, with random data.</summary>
    /// <returns>Always <paramref name="count"/>; the stream never reports end-of-stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // The Span constructor validates offset/count against the array; the
        // span overload then does the actual work, keeping one code path.
        return Read(new Span<byte>(buffer, offset, count));
    }

    /// <summary>Fills the whole of <paramref name="buffer"/> with random data.</summary>
    /// <returns>Always the length of <paramref name="buffer"/>.</returns>
    public override int Read(Span<byte> buffer)
    {
        _random.NextBytes(buffer);
        _sequence += buffer.Length;
        return buffer.Length;
    }

    /// <exception cref="NotSupportedException">Always; the stream is not seekable.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <exception cref="NotSupportedException">Always; the stream has no length.</exception>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <exception cref="NotSupportedException">Always; the stream is read-only.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
}

And Streaming Service is:

namespace RandomNumberCore;


/// <summary>
/// Service contract with a single operation that hands a <see cref="Stream"/> back to the caller.
/// </summary>
[ServiceContract]
public interface IStreamingService
{
    /// <summary>Service operation that returns a stream.</summary>
    /// <returns>The stream to be sent as the operation's response.</returns>
    [OperationContract]
    Stream GetRandomStream();
}
    

/// <summary>
/// Implementation of <see cref="IStreamingService"/> that serves a new
/// <see cref="RandomStream"/> for every call.
/// </summary>
public class StreamingService : IStreamingService
{
    /// <summary>Creates a fresh <see cref="RandomStream"/> backed by <see cref="Random.Shared"/>.</summary>
    public Stream GetRandomStream() => new RandomStream(Random.Shared);
}

And a simple client:

using System.ServiceModel;

namespace RandomNumberConsumerNet8
{
    /// <summary>
    /// Client-side declaration of the service contract: one operation that
    /// returns a <see cref="Stream"/>.
    /// </summary>
    [ServiceContract]
    public interface IStreamingService
    {
        /// <summary>Service operation that returns a stream.</summary>
        [OperationContract]
        Stream GetRandomStream();
    }
    /// <summary>
    /// Channel type combining <see cref="IStreamingService"/> with <see cref="IClientChannel"/>,
    /// so a channel created by the factory exposes both the operation and Open/Close.
    /// </summary>
    public interface IStreamingServiceChannel : IStreamingService, IClientChannel;

    /// <summary>
    /// Console client: opens a streamed HTTPS channel to the service, reads the
    /// first four bytes of the returned stream, then closes the channel and factory.
    /// </summary>
    internal class Program
    {
        /// <summary>Entry point; see the class summary for the sequence of calls.</summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static async Task Main(string[] args)
        {
            // CancellationTokenSource is IDisposable; release it when Main exits.
            using var cts = new CancellationTokenSource();
            using var channelFactory = new ChannelFactory<IStreamingServiceChannel>(new BasicHttpBinding(BasicHttpSecurityMode.Transport){TransferMode = TransferMode.Streamed, MaxReceivedMessageSize = 1_000_000_000 }, new EndpointAddress("https://localhost:7151/StreamingService.svc"));
            using var service = channelFactory.CreateChannel();
            service.Open();
            using var randomStream = service.GetRandomStream();

            // Only the first four bytes are consumed; the rest of the response is abandoned.
            byte[] buffer = new byte[4];
            await randomStream.ReadExactlyAsync(buffer, cts.Token);
            service.Close();
            channelFactory.Close();
        }
    }
}

The code appears to work fine, I can read the stream and it appears to contain random bytes.

The problem is that every time I call GetRandomStream, I get very high CPU usage on the server. Profiling shows that the service is still trying to write to the stream under CoreWCF.Dispatcher.ImmutableDispatchRuntime.ProcessMessageAsync, long after the client has disconnected.

like image 795
Eterm Avatar asked Nov 30 '25 02:11

Eterm


1 Answer

I can reproduce the behaviour, and I believe you have found a bug in CoreWCF.

The technical term for this condition is “abandoned request”: a client sends an HTTP request and closes the connection before receiving the complete response body. In your test case the response body has infinite length, which makes it impossible to ever receive the complete response.

It’s a bit tricky, but it is possible to implement a workaround on your side.

Set up the framework to provide HTTP contexts through dependency injection by adding the following two lines to the server’s main function, before the builder.Build() line:

// Register IHttpContextAccessor through the framework's built-in helper,
// which performs the same singleton registration.
builder.Services.AddHttpContextAccessor();
// Register the service type with the container so its constructor can
// receive the IHttpContextAccessor.
builder.Services.AddScoped<StreamingService>();

Consume that object in your service class, and pass the request’s RequestAborted CancellationToken to your output stream.

/// <summary>
/// Streaming service that ties each returned stream to the lifetime of the
/// HTTP request it was created for.
/// </summary>
sealed class StreamingService: IStreamingService
{
    readonly IHttpContextAccessor httpContextAccessor;

    /// <param name="context">Accessor used to reach the current HTTP context.</param>
    public StreamingService( IHttpContextAccessor context )
    {
        httpContextAccessor = context;
    }

    /// <summary>
    /// Returns a random stream bound to the current request's abort token, or
    /// to <see cref="CancellationToken.None"/> when no HTTP context is available.
    /// </summary>
    public Stream GetRandomStream()
    {
        var current = httpContextAccessor.HttpContext;
        CancellationToken aborted = current is null
            ? CancellationToken.None
            : current.RequestAborted;

        return new Server.RandomStream( Random.Shared, aborted );
    }
}

Finally, subscribe to the request-aborted notification in your stream class, and dispose the stream when the request is aborted.

/// <summary>
/// Endless, read-only stream of random bytes that disposes itself when the
/// supplied <see cref="CancellationToken"/> is cancelled. After disposal every
/// read throws <see cref="ObjectDisposedException"/>.
/// </summary>
sealed class RandomStream: Stream
{
    /// <param name="random">Source of the random bytes.</param>
    /// <param name="token">Token whose cancellation disposes this stream.</param>
    public RandomStream( Random random, CancellationToken token )
    {
        this.random = random;
        // If the token is already cancelled, Register runs Dispose right here,
        // so the stream starts out disposed.
        cancelReg = token.Register( Dispose );
    }

    readonly Random random;
    long sequence = 0;
    CancellationTokenRegistration? cancelReg;
    // Volatile: written by the cancellation callback, read by the reading thread.
    volatile bool isDisposed = false;

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    /// <exception cref="NotSupportedException">Always; the stream has no end.</exception>
    public override long Length => throw new NotSupportedException();

    /// <summary>Number of bytes read so far; setting it is not supported.</summary>
    public override long Position
    {
        get => sequence;
        set => throw new NotSupportedException();
    }

    public override void Flush() { }

    /// <summary>Fills the requested window of <paramref name="buffer"/> with random bytes.</summary>
    /// <returns>Always <paramref name="count"/>.</returns>
    /// <exception cref="ObjectDisposedException">The stream was disposed or its token was cancelled.</exception>
    public override int Read( byte[] buffer, int offset, int count )
    {
        // Check first so a disposed stream reports disposal even for bad arguments.
        ObjectDisposedException.ThrowIf( isDisposed, this );
        return Read( new Span<byte>( buffer, offset, count ) );
    }

    /// <summary>Fills all of <paramref name="buffer"/> with random bytes.</summary>
    /// <returns>Always the length of <paramref name="buffer"/>.</returns>
    /// <exception cref="ObjectDisposedException">The stream was disposed or its token was cancelled.</exception>
    public override int Read( Span<byte> buffer )
    {
        // ThrowIf puts this type's name into the exception, unlike
        // new ObjectDisposedException( null ).
        ObjectDisposedException.ThrowIf( isDisposed, this );

        random.NextBytes( buffer );
        sequence += buffer.Length;
        return buffer.Length;
    }

    public override long Seek( long offset, SeekOrigin origin ) =>
        throw new NotSupportedException();

    public override void SetLength( long value ) =>
        throw new NotSupportedException();

    public override void Write( byte[] buffer, int offset, int count ) =>
        throw new NotSupportedException();

    /// <summary>Marks the stream as disposed and drops the cancellation callback.</summary>
    protected override void Dispose( bool disposing )
    {
        base.Dispose( disposing );
        isDisposed = true;
        // Unregister does not wait for a callback that is currently running,
        // which matters because this method can itself be that callback.
        cancelReg?.Unregister();
        cancelReg = null;
    }
}

The ObjectDisposedException thrown from the Read methods is sufficient for the CoreWCF runtime to do the right thing, which is to quit the endless loop that generates gigabytes of random numbers, encodes them to Base64, and sends the text to some black hole deep inside the framework.

like image 76
Soonts Avatar answered Dec 02 '25 16:12

Soonts



Donate For Us

If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!