How to consume a UDP stream of bytes using Rx (Reactive Extensions) in .NET

I've come up with this solution (not tested yet) after a lot of searching around on the web.

Private Function ObserveUDP() As IObservable(Of Byte())


    Dim f = Function(observer)
                Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                Dim client = New UdpClient(endpoint)

                Dim obs = Observable.Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
                      ( Nothing _
                      , Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
                      , Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
                      , Function(task As Task(Of UdpReceiveResult)) task.Result)

                Dim observable = obs.Select(Function(r) r.Buffer)

                dim handle = observable.Subscribe(observer)

                Dim df = Sub() 
                    client.Close()
                    handle.Dispose()
                End Sub

                Return Disposable.Create(df)

    End Function

    Return observable.Create(f)

End Function

My requirement is to make sure the UDP client is closed when the subscription is dropped. I'm pretty sure the above code is close, but I don't think it's quite right. Any input would be appreciated.

* EDIT *

Actually, the above example is totally wrong: it just creates a large number of task objects synchronously without awaiting them. After a bit of trial and error I've come up with the following generic function, which unfolds a sequence by calling an awaitable over and over again. Any comments?

''' initializer - a function that creates and returns the state object
''' generator   - an asynchronous function that is awaited to produce each value
''' finalizer   - a function that cleans up the state object when the sequence is unsubscribed

Private Function ObservableAsyncSeq(Of T, I)( _
    initializer As Func(Of I), _
    generator As Func(Of I, Task(Of T)), _
    finalizer As Action(Of I))  As IObservable(Of T)

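    Dim q As Func(Of IObserver(Of T), Action) =
            Function(observer As IObserver(Of T)) As Action
                Dim go = True
                ' The Try block lives inside the async lambda: a Try around
                ' Task.Run cannot see exceptions thrown after the first Await.
                Dim r As Func(Of Task) =
                        Async Function() As Task
                            Dim ii As I = initializer()
                            Try
                                ' Note: go is only re-checked after each awaited value arrives.
                                While go
                                    Dim result = Await generator(ii)
                                    observer.OnNext(result)
                                End While
                                observer.OnCompleted()
                            Catch ex As Exception
                                observer.OnError(ex)
                            Finally
                                ' Clean up the state object on every exit path.
                                finalizer(ii)
                            End Try
                        End Function
                Task.Run(r)

                ' Disposable for stopping the sequence as per
                ' the observable contract
                Return Sub() go = False

            End Function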

    Return Observable.Create(q)
End Function

An example of use with UDP:

Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
    Dim initializer = Function()
                          Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                          Return New UdpClient(endpoint)
                      End Function

    Dim finalizer = Sub(client As UdpClient)
                        client.Close()
                    End Sub

    Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
                        Return client.ReceiveAsync()
                    End Function

    Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))

End Function
bradgonesurfing asked Oct 21 '25 07:10



2 Answers

You can use either Observable.Using, as Enigmativity mentioned, or the regular Observable.Create method, whose subscribe function returns an IDisposable that is disposed on unsubscription. Either is enough for safe disposal.
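
For the Observable.Create route, here is a minimal sketch rather than a tested implementation. It assumes the System.Reactive overload of Observable.Create that takes an async function with a CancellationToken, and that closing the UdpClient makes a pending ReceiveAsync fail. The names UdpCreateSketch and ObserveUdpBytes are hypothetical and do not come from the question.

```vbnet
Imports System
Imports System.Net
Imports System.Net.Sockets
Imports System.Reactive.Linq
Imports System.Threading
Imports System.Threading.Tasks

Public Module UdpCreateSketch
    ' Hypothetical helper: emits the payload of each received datagram.
    Public Function ObserveUdpBytes(endpoint As IPEndPoint) As IObservable(Of Byte())
        Return Observable.Create(Of Byte())(
            Async Function(observer As IObserver(Of Byte()), token As CancellationToken) As Task
                Dim client As New UdpClient(endpoint)
                ' The token is cancelled on unsubscribe; closing the socket then
                ' is assumed to unblock a receive that is still pending.
                Dim registration = token.Register(Sub() client.Close())
                Try
                    While Not token.IsCancellationRequested
                        Dim result = Await client.ReceiveAsync()
                        observer.OnNext(result.Buffer)
                    End While
                Catch ex As Exception When token.IsCancellationRequested
                    ' The receive failed because the socket was closed on unsubscribe.
                Finally
                    registration.Dispose()
                    client.Close()
                End Try
            End Function)
    End Function
End Module
```

Any other exception escapes the async function, and Rx reports it to the subscriber through OnError. Unlike a flag that is polled between receives, the token registration closes the socket as soon as the subscription is disposed.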

Using iterators or async is perfectly fine. I've listed a more Rx-ish way to do this:

Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
    Return Observable.Using(Of T, UdpClient)(
        Function() New UdpClient(endpoint),
        Function(udpClient) _
            Observable.Defer(Function() udpClient.ReceiveAsync().ToObservable()) _
            .Repeat() _
            .Select(Function(result) processor(result.Buffer))
    )
End Function

Legacy way:

Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
    Return Observable.Using(
        Function() New UdpClient(endpoint),
        Function(udpClient) Observable.Defer( _
        Observable.FromAsyncPattern(
            AddressOf udpClient.BeginReceive,
            Function(iar)
                Dim remoteEp = TryCast(iar.AsyncState, IPEndPoint)
                Return udpClient.EndReceive(iar, remoteEp)
            End Function)
        ).Repeat() _
         .Select(processor)
    )
End Function

Test:

Shared Sub Main()
    Using UdpStream(New IPEndPoint(IPAddress.Loopback, 13200),
                    Function(bytes) String.Join(",", bytes)
                    ).Subscribe(AddressOf Console.WriteLine)
        Console.ReadLine()
    End Using

    Console.WriteLine("Done")
    Console.ReadKey()
End Sub
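
The test only prints something once datagrams arrive on the port it listens on. Here is a minimal sender sketch, assuming the listener above is already running on loopback port 13200. The module name and the payload are illustrative only.

```vbnet
Imports System.Net
Imports System.Net.Sockets
Imports System.Text

Module UdpSenderSketch
    Sub Main()
        Using sender As New UdpClient()
            ' Illustrative payload; any byte array works.
            Dim payload = Encoding.ASCII.GetBytes("hello")
            ' Send one datagram to the endpoint the test subscribes to.
            sender.Send(payload, payload.Length, New IPEndPoint(IPAddress.Loopback, 13200))
        End Using
    End Sub
End Module
```

Run it as a separate process while the test is waiting at Console.ReadLine.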
Asti answered Oct 23 '25 23:10



Take a look at Observable.Using. It creates an observable that depends on a disposable resource to generate its values, and it disposes the resource automatically when the sequence terminates or the subscription is disposed.

You'll find that UdpClient has identical Close and Dispose implementations, so you don't need to call Close if you call Dispose.

From reflector:

void IDisposable.Dispose()
{
    this.Dispose(true);
}

public void Close()
{
    this.Dispose(true);
}

Here's the signature for Using:

Public Shared Function Using(Of TSource, TResource As IDisposable)(
    ByVal resourceFactory As Func(Of TResource),
    ByVal observableFactory As Func(Of TResource, IObservable(Of TSource)))
        As IObservable(Of TSource)
Enigmativity answered Oct 23 '25 21:10
