I am not sure how to set up the Android side to deserialize this response.
This is my definition on the server:
@GetMapping("/quotes-reactive-paged")
    fun getQuoteFlux(@RequestParam(name = "page") page: Int,
                     @RequestParam(name = "size") size: Int): Flux<Quote> {
        return quoteMongoReactiveRepository.retrieveAllQuotesPaged(PageRequest.of(page, size))
                .delayElements(Duration.ofMillis(DELAY_PER_ITEM_MS.toLong()))
    }
On the Android side, I use this to get the response:
@GET("quotes-reactive-paged")
Observable<Quote> queryReactivePaging(@Query("page") int page,
                                              @Query("size") int size);
And here is what I use to process it:
mReactiveQuoteService.queryReactivePaging(page, size)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<Quote>() {
                @Override
                public void onNext(Quote quote) {
                    // Log each quote and append it to the on-screen list.
                    System.out.println(quote.content);
                    reactiveList.setText(reactiveList.getText() + "\n" + quote.content);
                }
                @Override
                public void onError(Throwable e) {
                    System.out.println(e);
                }
                @Override
                public void onComplete() {
                    System.out.println("done");
                }
            });
This is an example of a response, printed out by OkHttp3:
data:{"id":"1025","book":"El Quijote","content":"-En efecto -dijo Sancho-, ¿qué es lo que vuestra merced quiere hacer en este tan remoto lugar?"}
And here is what I get in logcat (I cut out some of the data elements):
05-04 21:41:58.056 13277-13338/reactive.android.cigna.com.reactiveexample D/OkHttp: <-- 200 OK http://192.168.1.104:9094/quotes-reactive-paged?page=3&size=20&username=demo (149ms)
    transfer-encoding: chunked
    Access-Control-Allow-Origin: *
    Access-Control-Allow-Methods: GET, PUT, POST, DELETE, OPTIONS
    Access-Control-Allow-Headers: DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Content-Range,Range
    Access-Control-Expose-Headers: DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Content-Range,Range
    Content-Type: text/event-stream
05-04 21:42:00.040 13277-13338/reactive.android.cigna.com.reactiveexample D/OkHttp: data:{"id":"1052","book":"El Quijote","content":"-¡Ta, ta! -dijo Sancho-. ¿Que la hija de Lorenzo Corchuelo es la señora Dulcinea del Toboso, llamada por otro nombre Aldonza Lorenzo?"}
    data:{"id":"1053","book":"El Quijote","content":"-Ésa es -dijo don Quijote-, y es la que merece ser señora de todo el universo."}
    data:{"id":"1068","book":"El Quijote","content":"Y, habiéndola escrito,se la leyó; que decía ansí:"}
    <-- END HTTP (10363-byte body)
05-04 21:42:00.047 13277-13277/reactive.android.cigna.com.reactiveexample I/System.out: com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 1 column 1 path $
05-04 21:42:09.672 13277-13282/reactive.android.cigna.com.reactiveexample I/zygote: Do partial code cache collection, code=48KB, data=47KB
This is the model on the Android side:
public class Quote {
    public String id;
    public String content;
    public String book;
}
On the server side this is the model:
data class Quote(val id: String, val book: String, val content: String)
It appears that the JSON isn't being turned into an object, but I don't see why.
UPDATE
So I added @Streaming to the @GET method and tried the following, but I can't get okhttp3.ResponseBody to work, as it won't compile.
           .subscribe(new DisposableObserver<ResponseBody>() {
                @Override
                public void onNext(ResponseBody responseBody) {
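I have not worked with Flux, but the response you are sending is not valid JSON. Gson expects a Quote object, so the body should start with {, but yours starts with data: (the Content-Type is text/event-stream). That is why it reports "Expected BEGIN_OBJECT but was STRING at line 1 column 1".
You are also sending a list of quotes rather than a single one, so you should change your interface to something like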
@GET("quotes-reactive-paged")
Observable<List<Quote>> queryReactivePaging(@Query("page") int page, @Query("size") int size);
I can't remember whether Rx works with List<>.
In any case, this change alone will not work, because a JSON list starts with [. Change your server to format the response correctly, and adapt your Rx interface to whatever you expect to receive.
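If changing the server is not an option, the client could instead strip the data: prefixes itself before handing each payload to Gson. The following is only a minimal sketch of that idea, using plain Java with no Retrofit or Gson dependency. The class name SseDataExtractor and the method extractData are hypothetical, and it assumes you can get at the raw response body as a String.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: pulls the JSON payloads out of a text/event-stream body.
final class SseDataExtractor {

    private SseDataExtractor() {}

    /** Returns one string per event, built from that event's "data:" lines. */
    static List<String> extractData(String body) {
        List<String> payloads = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean hasData = false;

        // Events are separated by blank lines; accept \r\n, \r or \n endings.
        for (String line : body.split("\r\n|\r|\n", -1)) {
            if (line.isEmpty()) {
                // A blank line closes the current event.
                if (hasData) {
                    payloads.add(current.toString());
                }
                current.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single space after the colon is optional and not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data lines in one event are joined with a newline.
                if (hasData) {
                    current.append('\n');
                }
                current.append(value);
                hasData = true;
            }
            // Other fields (event:, id:, retry:) and comments are ignored here.
        }

        // Lenient choice in this sketch: keep a trailing event with no closing blank line.
        if (hasData) {
            payloads.add(current.toString());
        }
        return payloads;
    }
}
```

Each returned string should then be a plain JSON object such as {"id":"1053",...}, which could be passed to something like new Gson().fromJson(json, Quote.class). Note that this reads the whole body first, so quotes would not arrive one by one as the server emits them.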