
Reactive Programming using RxScala

I have an Observable that connects to a service via a socket protocol. The connection to the socket happens through a client library. The client library that I use exposes a java.util.Observer with which I can register for the events that are pushed into it.

final class MyObservable extends Observable[MyEvent] {

  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}

There are two things that I do not understand.

How can I get the result of Step: 3 in my Subscriber?

Every time I receive a MyEvent with a subscriber like the one below, I see that a new connection is created. In effect, Step 1, Step 2 and Step 3 are run again for each incoming event.

val myObservable = new MyObservable()
myObservable.subscribe()
joesan asked Jun 25 '26 13:06


1 Answer

Unless I'm misunderstanding your question, you just call onNext:

def subscribe(subscriber: Subscriber[MyEvent]) = {
  // connect to the Socket (Step: 1)
  // get the responses that are pushed (Step: 2)
  // transform them into MyEvent type (Step: 3)

  // finally notify the subscriber:
  subscriber.onNext(myEventFromStep3)
}

and code that subscribes would do something like:

myObservable.subscribe(onNext = println(_))
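
As a fuller illustration of the same idea, the sketch below bridges a java.util.Observer callback into an RxScala Observable and calls onNext for each pushed message. FakeClient, its connect and push methods, and the payload field on MyEvent are hypothetical stand-ins for the real client library, and the code assumes the RxScala artifact (rx.lang.scala) is on the classpath. It builds the Observable with the Observable.apply factory instead of subclassing, which is a choice made for this sketch and not something taken from the question.

```scala
import java.util.{Observable => JObservable, Observer => JObserver}
import rx.lang.scala.{Observable, Subscription}

// Hypothetical event type; the real MyEvent is not shown in the question.
final case class MyEvent(payload: String)

// Hypothetical stand-in for the socket client library. It only counts
// connections and forwards pushed messages to registered observers.
final class FakeClient extends JObservable {
  var connections = 0
  def connect(): Unit = connections += 1
  def push(raw: String): Unit = {
    setChanged()
    notifyObservers(raw)
  }
}

object BridgeSketch {
  // The function passed to Observable.apply runs once per subscription.
  def events(client: FakeClient): Observable[MyEvent] =
    Observable[MyEvent] { subscriber =>
      val observer = new JObserver {
        // Steps 2 and 3: receive the pushed value and turn it into a MyEvent.
        override def update(o: JObservable, arg: Any): Unit =
          if (!subscriber.isUnsubscribed) subscriber.onNext(MyEvent(arg.toString))
      }
      client.addObserver(observer)
      // Deregister the callback when the subscriber unsubscribes.
      subscriber.add(Subscription { client.deleteObserver(observer) })
      // Step 1: connect (here just a counter on the stand-in client).
      client.connect()
    }
}
```

With this shape, every call to subscribe runs the body again, so each subscriber gets its own connection. If that is the cause of the repeated connections, one option is to multicast a single subscription, for example `val shared = BridgeSketch.events(client).share`, and have all consumers subscribe to `shared`; Step 1 should then run once for as long as at least one subscriber stays attached.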
Brandon answered Jun 28 '26 10:06


