My task is to simply make controller that gives me results immediately when they are ready (simple example below)
I want to get the exact number of Strings (for example 1000 Strings that are somehow made for 1 second) (actually I need to get result of func but to simplify the task just Strings)
So when I get some request in my controller I want it to give answers as soon as they are ready (without buffering results) in that way:
1 second
"some string" -> (send response to my frontend)
1 second
"another one" -> (send response to my frontend)
1 second
"third one" -> (send response to my frontend) ....
1000 seconds
"some string"
.....
"thousand strings"
Here is my code:
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> get3() {
        System.out.println("get3 start");
        Flux<String> result = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "flux data--" + i;
        }));
        System.out.println("get3 end");
        return result;
    }
Actually in my console I get
"get3 start" and "get3 end" immediately but response only goes after all strings are ready
My actual service for this task is similar (but I merge 2 Flux here) and I get Flux which is formed by interval so I want it to give me results as soon as they appear
public Flux<AnswerCalculationDto> calculate(CalculationDto calculationDto){
        String checkMsg = checkCalculationDto(calculationDto);
        if(checkMsg.equals("Success")){//valid
            Long quantity = Long.parseLong(calculationDto.getQuantity());
            Flux<AnswerCalculationDto> firstFunc =  Flux.interval(interval)//func 1
                    .onBackpressureDrop()
                    .takeWhile((i)-> i < quantity)
                    .map((i)->new AnswerCalculationDto(i,1,translateToJava(calculationDto.getFunc1(),i)))
                    ;
            Flux<AnswerCalculationDto> secondFunc = Flux.interval(interval) //func 2
                    .onBackpressureDrop()
                    .takeUntil((i)-> i > quantity-2)
                    .map((i)->new AnswerCalculationDto(i,2,translateToJava(calculationDto.getFunc2(),i)) )
                    ;
            return Flux.merge(firstFunc,secondFunc);
        }
        else {//invalid data from client
            return Flux.just(new AnswerCalculationDto("",checkMsg));
        }
    }
There are several options to stream data from the server using WebFlux:
text/event-stream)application/x-ndjson)Here is a complete example that exposes both text/event-stream & application/x-ndjson endpoints and returns data in json format.
If you need plain text content - use text/event-stream.
@RestController
public class StreamingController {
    @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
    Flux<DataEntry> sse() {
        return stream();
    }
    @GetMapping(produces = APPLICATION_NDJSON_VALUE)
    Flux<DataEntry> ndjson() {
        return stream();
    }
    private Flux<DataEntry> stream() {
        return Flux.range(1, 1000)
                .delayElements(Duration.ofSeconds(1))
                .map(i -> new DataEntry(i, Instant.now()));
    }
    @Value
    @Builder
    private static class DataEntry {
        long index;
        Instant timestamp;
    }
}
To test text/event-stream use:
curl -v -H "Accept: text/event-stream" http://localhost:8080
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: text/event-stream
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
< 
data:{"index":1,"timestamp":"2022-04-08T14:41:06.513352Z"}
data:{"index":2,"timestamp":"2022-04-08T14:41:07.527817Z"}
data:{"index":3,"timestamp":"2022-04-08T14:41:08.541706Z"}
data:{"index":4,"timestamp":"2022-04-08T14:41:09.553329Z"}
To test application/x-ndjson use:
curl -v -H "Accept: application/x-ndjson" http://localhost:8080
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: application/x-ndjson
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/x-ndjson
< 
{"index":1,"timestamp":"2022-04-08T14:42:36.081269Z"}
{"index":2,"timestamp":"2022-04-08T14:42:37.094928Z"}
{"index":3,"timestamp":"2022-04-08T14:42:38.109378Z"}
{"index":4,"timestamp":"2022-04-08T14:42:39.121315Z"}
The above example will produce 1000 records with 1 seconds interval. You could also generate unbounded streams using something like
private Flux<DataEntry> stream() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new DataEntry(i, Instant.now()));
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With