Replay timestamped event stream with Reactive Extensions

I have a collection of items of the following class:

public class Event
{
    public DateTimeOffset Timestamp;
    public object Data;
}

I want to create an IObservable<Event> in which each item is published when its Timestamp, which lies in the future, is reached. Is this possible with Observable.Delay, or do I have to write my own IObservable<T> implementation?

For context, the data resembles a log file: there can be tens of thousands of Event items, but only 1-2 are published per second.

ghord asked Sep 13 '25 12:09


1 Answer

While my first answer works as intended, creating the observable sequence performs poorly with hundreds of thousands of events: you pay a substantial initialization cost (on the order of 10 seconds on my machine).

To improve performance, I took advantage of the fact that my data is already sorted and implemented a custom IEnumerable<Event> that loops through the events, sleeping until each one is due and then yielding it. Calling ToObservable() on this IEnumerable produces the desired sequence:

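using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

IObservable<Event> CreateSimulation(IEnumerable<Event> events)
{
    // Lazily walks the (already sorted) events, pausing until each one is due.
    IEnumerable<Event> Simulation()
    {
        foreach (var ev in events)
        {
            // DateTimeOffset matches the type of Event.Timestamp.
            var now = DateTimeOffset.UtcNow;

            if (ev.Timestamp > now)
            {
                // Blocks the enumerating thread until the event's timestamp.
                Thread.Sleep(ev.Timestamp - now);
            }

            yield return ev;
        }
    }

    return Simulation().ToObservable();
}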
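
Because Thread.Sleep keeps a thread blocked between events, a variant that leaves the waiting to an Rx scheduler may be worth considering. The following is only a sketch of that idea, not the code I actually run: it assumes the events are sorted by Timestamp, it uses the Observable.Generate overload that takes a DateTimeOffset time selector, and the names ReplaySketch and Replay are made up for illustration. Events whose timestamp is already in the past should be emitted right away.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public class Event
{
    public DateTimeOffset Timestamp;
    public object Data;
}

// Hypothetical helper class; the name is an assumption for this sketch.
public static class ReplaySketch
{
    // Assumes 'events' is already sorted by Timestamp.
    public static IObservable<Event> Replay(IEnumerable<Event> events)
    {
        // Using creates one enumerator per subscription and disposes it
        // when the sequence completes or the subscriber unsubscribes.
        return Observable.Using(
            () => events.GetEnumerator(),
            e => Observable.Generate(
                // State: the enumerator plus whether it currently points at an item.
                (Cursor: e, HasCurrent: e.MoveNext()),
                // Keep going while there is a current item.
                s => s.HasCurrent,
                // Advance to the next item after each emission.
                s => (Cursor: s.Cursor, HasCurrent: s.Cursor.MoveNext()),
                // The value to publish.
                s => s.Cursor.Current,
                // The absolute time at which to publish it.
                s => s.Cursor.Current.Timestamp));
    }
}
```

Usage would look like ReplaySketch.Replay(events).Subscribe(ev => Console.WriteLine(ev.Data)). Like the enumerable version, this reads the source lazily, so there is no per-event setup cost up front.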

ghord answered Sep 15 '25 02:09