My task is to build a controller that returns results as soon as they are ready (simple example below).
I want to receive an exact number of Strings (for example, 1000 Strings, each taking 1 second to produce). In reality I need the result of a function, but Strings keep the example simple.
When my controller receives a request, I want it to send each answer as soon as it is ready, without buffering the results, like this:
1 second
"some string" -> (send response to my frontend)
1 second
"another one" -> (send response to my frontend)
1 second
"third one" -> (send response to my frontend) ....
1000 seconds
"some string"
.....
"thousand strings"
Here is my code:
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> get3() {
    System.out.println("get3 start");
    Flux<String> result = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "flux data--" + i;
    }));
    System.out.println("get3 end");
    return result;
}
In my console I see "get3 start" and "get3 end" immediately, but the response is only sent after all the strings are ready.
My actual service for this task is similar, except that it merges two Flux instances. Each Flux is driven by an interval, so I want the results delivered as soon as they appear:
public Flux<AnswerCalculationDto> calculate(CalculationDto calculationDto) {
    String checkMsg = checkCalculationDto(calculationDto);
    if (checkMsg.equals("Success")) { // valid
        Long quantity = Long.parseLong(calculationDto.getQuantity());
        Flux<AnswerCalculationDto> firstFunc = Flux.interval(interval) // func 1
                .onBackpressureDrop()
                .takeWhile((i) -> i < quantity)
                .map((i) -> new AnswerCalculationDto(i, 1, translateToJava(calculationDto.getFunc1(), i)));
        Flux<AnswerCalculationDto> secondFunc = Flux.interval(interval) // func 2
                .onBackpressureDrop()
                .takeUntil((i) -> i > quantity - 2)
                .map((i) -> new AnswerCalculationDto(i, 2, translateToJava(calculationDto.getFunc2(), i)));
        return Flux.merge(firstFunc, secondFunc);
    } else { // invalid data from client
        return Flux.just(new AnswerCalculationDto("", checkMsg));
    }
}
There are several options to stream data from the server using WebFlux:
- Server-sent events (text/event-stream)
- Newline-delimited JSON (application/x-ndjson)
Here is a complete example that exposes both text/event-stream and application/x-ndjson endpoints and returns data in JSON format.
If you need plain text content - use text/event-stream.
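import static org.springframework.http.MediaType.APPLICATION_NDJSON_VALUE;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;

import java.time.Duration;
import java.time.Instant;

import lombok.Builder;
import lombok.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class StreamingController {

    // Selected when the client sends "Accept: text/event-stream"
    @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
    Flux<DataEntry> sse() {
        return stream();
    }

    // Selected when the client sends "Accept: application/x-ndjson"
    @GetMapping(produces = APPLICATION_NDJSON_VALUE)
    Flux<DataEntry> ndjson() {
        return stream();
    }

    // Emits one element per second without blocking a thread
    private Flux<DataEntry> stream() {
        return Flux.range(1, 1000)
                .delayElements(Duration.ofSeconds(1))
                .map(i -> new DataEntry(i, Instant.now()));
    }

    // Lombok-generated immutable value class
    @Value
    @Builder
    private static class DataEntry {
        long index;
        Instant timestamp;
    }
}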
To test text/event-stream use:
curl -v -H "Accept: text/event-stream" http://localhost:8080
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: text/event-stream
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:{"index":1,"timestamp":"2022-04-08T14:41:06.513352Z"}
data:{"index":2,"timestamp":"2022-04-08T14:41:07.527817Z"}
data:{"index":3,"timestamp":"2022-04-08T14:41:08.541706Z"}
data:{"index":4,"timestamp":"2022-04-08T14:41:09.553329Z"}
To test application/x-ndjson use:
curl -v -H "Accept: application/x-ndjson" http://localhost:8080
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: application/x-ndjson
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/x-ndjson
<
{"index":1,"timestamp":"2022-04-08T14:42:36.081269Z"}
{"index":2,"timestamp":"2022-04-08T14:42:37.094928Z"}
{"index":3,"timestamp":"2022-04-08T14:42:38.109378Z"}
{"index":4,"timestamp":"2022-04-08T14:42:39.121315Z"}
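The two outputs above differ only in framing: the text/event-stream response prefixes each record with `data:`, while the application/x-ndjson response sends one bare JSON document per line. As a rough illustration of how a Java client might read either stream line by line, here is a minimal sketch using the JDK's `java.net.http.HttpClient`. The class name `StreamClientSketch` and the helpers `payloadOf` and `consume` are made up for this example and are not part of Spring or Reactor. It also assumes single-line `data:` fields as in the output shown, and ignores other SSE fields such as `event:` or `id:`.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical helper class, for illustration only.
class StreamClientSketch {

    static final String SSE = "text/event-stream";
    static final String NDJSON = "application/x-ndjson";

    // Extracts the record payload from one line of the response body.
    // Returns empty for blank lines and, in SSE mode, for any line that
    // is not a "data:" field (comments, "event:", "id:", ...).
    static Optional<String> payloadOf(String mediaType, String line) {
        if (line.isEmpty()) {
            return Optional.empty();
        }
        if (SSE.equals(mediaType)) {
            if (!line.startsWith("data:")) {
                return Optional.empty();
            }
            String value = line.substring("data:".length());
            // A single space after the colon is not part of the value.
            return Optional.of(value.startsWith(" ") ? value.substring(1) : value);
        }
        // NDJSON: every non-empty line is one JSON document.
        return Optional.of(line);
    }

    // Sends a GET request with the given Accept header and hands each
    // payload to the callback as lines are read from the response body.
    static void consume(URI uri, String mediaType, Consumer<String> onPayload)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", mediaType)
                .GET()
                .build();
        HttpResponse<Stream<String>> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            lines.forEach(line -> payloadOf(mediaType, line).ifPresent(onPayload));
        }
    }

    // Offline demo: runs the parser over sample lines, no server needed.
    public static void main(String[] args) {
        List<String> sseLines = List.of(
                "data:{\"index\":1,\"timestamp\":\"2022-04-08T14:41:06.513352Z\"}",
                "",
                "data:{\"index\":2,\"timestamp\":\"2022-04-08T14:41:07.527817Z\"}");
        for (String line : sseLines) {
            payloadOf(SSE, line).ifPresent(System.out::println);
        }
    }
}
```

With the controller above running locally, calling `StreamClientSketch.consume(URI.create("http://localhost:8080"), StreamClientSketch.NDJSON, System.out::println)` should print each record as it arrives instead of waiting for the whole response.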
The above example produces 1000 records at 1-second intervals. You could also generate an unbounded stream using something like:
private Flux<DataEntry> stream() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new DataEntry(i, Instant.now()));
}