I've come up with this solution. ( Not tested yet ) via a lot of bouncing around on the web.
Private Function ObserveUDP() As IObservable(Of bytes())
Dim f = Function(observer)
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Dim client = New UdpClient(endpoint)
Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
( Nothing _
, Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
, Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
, Function(task As Task(Of UdpReceiveResult)) task.Result)
Dim observable = obs.Select(Function(r) r.Buffer)
dim handle = observable.Subscribe(observer)
Dim df = Sub()
client.Close()
handle.Dispose()
End Sub
Return Disposable.Create(df)
End Function
Return observable.Create(f)
End Function
My requirement was to make sure the UDP client is closed when the subscription is droppped. I'm pretty sure the above code is close but I think it's not quite right. Any input would be appreciated.
* EDIT *
Actually the above example is totally wrong and will just create a large number of task objects synchronously but not await them. After a bit of trial and error I've come up with the following generic function for unfolding an awaitable which is called over and over again. Any comments?
''' initializer - a function that initializes and returns the state object
''' generator - a function that asynchronously using await generates each value
''' finalizer - a function for cleaning up the state object when the sequence is unsubscribed
Private Function ObservableAsyncSeq(Of T, I)( _
initializer As Func(Of I), _
generator As Func(Of I, Task(Of T)), _
finalizer As Action(Of I)) As IObservable(Of T)
Dim q = Function(observer As IObserver(Of T))
Dim go = True
Try
Dim r = Async Sub()
Dim ii As I = initializer()
While go
Dim result = Await generator(ii)
observer.OnNext(result)
End While
finalizer(ii)
observer.OnCompleted()
End Sub
Task.Run(r)
Catch ex As Exception
observer.OnError(ex)
End Try
' Disposable for stopping the sequence as per
' the observable contract
Return Sub() go = False
End Function
Return Observable.Create(q)
End Function
And example of use with UDP
Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
Dim initializer = Function()
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Return New UdpClient(endpoint)
End Function
Dim finalizer = Function(client As UdpClient)
client.Close()
End Function
Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
Return client.ReceiveAsync()
End Function
Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))
End Function
You can use either Observable.Using as Enigmativity mentioned or simply use the regular Observable.Create method which accepts an IDisposable as a return argument - this is enough for safe disposal.
Using iterators or async is perfectly fine. I've listed a more Rx-ish way to do this:
Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
Return Observable.Using(Of T, UdpClient)(
Function() New UdpClient(endpoint),
Function(udpClient) _
Observable.Defer(Function() udpClient.ReceiveAsync().ToObservable()) _
.Repeat() _
.Select(Function(result) processor(result.Buffer))
)
End Function
Legacy way:
Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
Return Observable.Using(
Function() New UdpClient(endpoint),
Function(udpClient) Observable.Defer( _
Observable.FromAsyncPattern(
AddressOf udpClient.BeginReceive,
Function(iar)
Dim remoteEp = TryCast(iar.AsyncState, IPEndPoint)
Return udpClient.EndReceive(iar, remoteEp)
End Function)
).Repeat() _
.Select(processor)
)
End Function
Test:
Shared Sub Main()
Using UdpStream(New IPEndPoint(IPAddress.Loopback, 13200),
Function(bytes) String.Join(",", bytes)
).Subscribe(AddressOf Console.WriteLine)
Console.ReadLine()
End Using
Console.WriteLine("Done")
Console.ReadKey()
End Sub
Take a look at Observable.Using - it is specifically used to create an observable that uses a disposable resource to generate its values and when completed it automatically disposes the resource.
You'll find that the UdpClient has identical Close & Dispose method implementations so you don't need to call Close if you call Dispose.
From reflector:
void IDisposable.Dispose()
{
this.Dispose(true);
}
public void Close()
{
this.Dispose(true);
}
Here's the signature for Using:
Public Shared Function Using(Of TSource, TResource As IDisposable)(
ByVal resourceFactory As Func(Of TResource),
ByVal observableFactory As Func(Of TResource, IObservable(Of TSource)))
As IObservable(Of TSource)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With