
ObserveOnDispatcher does not invoke the handler on the UI thread

I have a message bus class that uses Rx to publish events from multiple threads in a WPF application. My problem is that ObserveOnDispatcher does not invoke the event handler on the UI thread.

Code:

private void button_Click(object sender, RoutedEventArgs e)
{
  var messageBus = new MessageBus();
  messageBus.GetMessages<Message>().ObserveOnDispatcher().Subscribe(x => TestHanlder(x));

  Trace.WriteLine("Main Thread Id:" + Thread.CurrentThread.ManagedThreadId);
  var deviceManager = new DeviceManager(messageBus);
  deviceManager.Start();
}

private void TestHanlder(Message message)
{
  Trace.WriteLine("UI Handler ThreadId:" + Thread.CurrentThread.ManagedThreadId);
}

public class DeviceManager
{
  private readonly MessageBus _messageBus;

  public DeviceManager(MessageBus messageBus)
  {
    _messageBus = messageBus;
  }

  public void Start()
  {
    for (;;)
    {
      var t = Task.Factory.StartNew(() => BackGroundTask(), TaskCreationOptions.LongRunning);
      t.Wait();
    }
  }

  private void BackGroundTask()
  {
    Thread.Sleep(1000);
    Trace.WriteLine("Push ThreadId:" + Thread.CurrentThread.ManagedThreadId);
    var message = new Message();
    _messageBus.Publish(message);
  }
}

public class MessageBus
{
  readonly ISubject<object> _messages;

  public MessageBus()
  {
    _messages = new Subject<object>();
  }
  public void Publish<TMessage>(TMessage message)
  {
    _messages.OnNext(message);
  }

  public IObservable<TMessage> GetMessages<TMessage>()
  {
    return _messages.OfType<TMessage>();
  }
}

public class Message
{
  public Message()
  {

  }
}

Without ObserveOnDispatcher:
messageBus.GetMessages<Message>().Subscribe(x => TestHanlder(x));
..........................Output.................................
Main Thread Id:8
Push ThreadId:9
UI Handler ThreadId:9
But I need TestHanlder to run on the main (UI) thread. In the run above, that is thread 8.

When I use ObserveOnDispatcher:
messageBus.GetMessages<Message>().ObserveOnDispatcher().Subscribe(x => TestHanlder(x));
..........................Output.................................

Main Thread Id:9
Push ThreadId:10
The "UI Handler ThreadId:" line never appears in the output.
What am I doing wrong here?
Bassam Alugili asked Jan 26 '26 08:01


1 Answer

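The for (;;) { t.Wait(); } loop in Start() runs on the UI thread, because button_Click calls Start() directly. The loop never returns, so the UI thread cannot execute anything else dispatched to it, including the handler calls that ObserveOnDispatcher queues. ObserveOnDispatcher is working fine; your dispatcher thread is blocked.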

If you introduce async/await, which releases the UI thread while each background task runs, the scenario works:

    private async void button_Click(object sender, RoutedEventArgs e)
    {
        var messageBus = new MessageBus();
        messageBus.GetMessages<Message>().ObserveOnDispatcher().Subscribe(x => TestHanlder(x));

        Trace.WriteLine("Main Thread Id:" + Thread.CurrentThread.ManagedThreadId);
        var deviceManager = new DeviceManager(messageBus);
        await deviceManager.Start();
    }

    ...

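        // Returns a plain Task: the loop never completes, so there is no result to carry.
        public async Task Start()
        {
            for (;;)
            {
                // await yields the UI thread until the background task finishes,
                // so the dispatcher can run the queued handler in between.
                await Task.Factory.StartNew(() => BackGroundTask(), TaskCreationOptions.LongRunning);
            }
        }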
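
To make the blocking visible outside WPF, here is a minimal console sketch. It does not use the real Dispatcher or Rx: MiniDispatcher is a hypothetical stand-in that, like a UI thread, drains a queue of actions on a single thread. The point it illustrates is that work posted to that thread cannot start until the action currently running on it returns.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a UI dispatcher: one thread draining a queue of actions.
sealed class MiniDispatcher
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();

    // Queues an action to run later on the loop thread (roughly what ObserveOnDispatcher relies on).
    public void Post(Action action) => _queue.Add(action);

    public void Shutdown() => _queue.CompleteAdding();

    // Runs queued actions one at a time on the calling thread until Shutdown() is called.
    public void Run()
    {
        foreach (var action in _queue.GetConsumingEnumerable())
            action();
    }
}

static class Program
{
    static void Main()
    {
        var dispatcher = new MiniDispatcher();
        int loopThreadId = Thread.CurrentThread.ManagedThreadId;

        // Simulates button_Click: it runs on the loop thread.
        dispatcher.Post(() =>
        {
            var producer = Task.Factory.StartNew(() =>
            {
                // Background "Publish": the handler is marshalled to the loop thread.
                dispatcher.Post(() =>
                {
                    bool onLoopThread = Thread.CurrentThread.ManagedThreadId == loopThreadId;
                    Console.WriteLine("Handler on loop thread: " + onLoopThread);
                    dispatcher.Shutdown();
                });
            }, TaskCreationOptions.LongRunning);

            // Blocking the loop thread, as t.Wait() does in the question.
            // The handler stays queued until this action returns.
            producer.Wait();
            Console.WriteLine("Click handler finished");
        });

        dispatcher.Run();
    }
}
```

This prints "Click handler finished" first and "Handler on loop thread: True" second: the handler is queued during the Wait() but only runs once the click action has returned. In the question, the for (;;) loop means the click handler never returns, so the queued handler never gets its turn.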
Gluck answered Jan 28 '26 22:01


