
How to stream data from the server using WebFlux

My task is to write a controller that sends each result as soon as it is ready (a simplified example is below).

Imagine the following:

I want to produce a fixed number of Strings (for example, 1000 Strings, each taking 1 second to produce). In reality I need the results of a function, but plain Strings keep the example simple.

When a request reaches my controller, I want it to send each answer as soon as it is ready, without buffering the results.

What I want is:

1 second

"some string" -> (send response to my frontend)

1 second

"another one" -> (send response to my frontend)

1 second

"third one" -> (send response to my frontend) ....

But what I get is:

1000 seconds

"some string"

.....

"thousand strings"

Here is my code:

@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> get3() {
    System.out.println("get3 start");
    Flux<String> result = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "flux data--" + i;
    }));
    System.out.println("get3 end");
    return result;
}

In my console, "get3 start" and "get3 end" are printed immediately, but the response is only sent after all the strings are ready.

My actual service for this task is similar, except that it merges two Flux instances. Each Flux is driven by an interval, and I want the results delivered as soon as they appear:

public Flux<AnswerCalculationDto> calculate(CalculationDto calculationDto) {
    String checkMsg = checkCalculationDto(calculationDto);
    if (checkMsg.equals("Success")) { // valid
        Long quantity = Long.parseLong(calculationDto.getQuantity());

        Flux<AnswerCalculationDto> firstFunc = Flux.interval(interval) // func 1
                .onBackpressureDrop()
                .takeWhile(i -> i < quantity)
                .map(i -> new AnswerCalculationDto(i, 1, translateToJava(calculationDto.getFunc1(), i)));
        Flux<AnswerCalculationDto> secondFunc = Flux.interval(interval) // func 2
                .onBackpressureDrop()
                .takeUntil(i -> i > quantity - 2)
                .map(i -> new AnswerCalculationDto(i, 2, translateToJava(calculationDto.getFunc2(), i)));
        return Flux.merge(firstFunc, secondFunc);
    } else { // invalid data from client
        return Flux.just(new AnswerCalculationDto("", checkMsg));
    }
}
asked Oct 26 '25 16:10 by lol


1 Answer

There are several options to stream data from the server using WebFlux:

  • Server-sent events pushing individual events (media type: text/event-stream)
  • Streaming events separated by newlines (media type: application/x-ndjson)

Here is a complete example that exposes both text/event-stream and application/x-ndjson endpoints and returns data in JSON format. If you need plain-text content, use text/event-stream.

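import static org.springframework.http.MediaType.APPLICATION_NDJSON_VALUE;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;

import java.time.Duration;
import java.time.Instant;

import lombok.Builder;
import lombok.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class StreamingController {

    // Selected when the client sends "Accept: text/event-stream"
    @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
    Flux<DataEntry> sse() {
        return stream();
    }

    // Selected when the client sends "Accept: application/x-ndjson"
    @GetMapping(produces = APPLICATION_NDJSON_VALUE)
    Flux<DataEntry> ndjson() {
        return stream();
    }

    // Emits 1000 entries, one per second, without blocking a thread
    private Flux<DataEntry> stream() {
        return Flux.range(1, 1000)
                .delayElements(Duration.ofSeconds(1))
                .map(i -> new DataEntry(i, Instant.now()));
    }

    // Lombok generates the constructor and getters used for JSON serialization
    @Value
    @Builder
    private static class DataEntry {
        long index;
        Instant timestamp;
    }
}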

To test text/event-stream use:

curl -v -H "Accept: text/event-stream" http://localhost:8080

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: text/event-stream
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
< 
data:{"index":1,"timestamp":"2022-04-08T14:41:06.513352Z"}

data:{"index":2,"timestamp":"2022-04-08T14:41:07.527817Z"}

data:{"index":3,"timestamp":"2022-04-08T14:41:08.541706Z"}

data:{"index":4,"timestamp":"2022-04-08T14:41:09.553329Z"}
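
The output above follows the server-sent events framing: each event's payload is carried on `data:` lines, and a blank line ends the event. As a rough illustration of how a client might recover the JSON payloads from those lines, here is a minimal JDK-only sketch. The names `SseLines` and `dataPayloads` are hypothetical (they are not part of the answer's code), and other SSE fields such as `event:`, `id:` and `retry:` are simply ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original answer.
class SseLines {

    /** Collects the data payload of every complete event found in raw SSE lines. */
    static List<String> dataPayloads(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean hasData = false;
        for (String line : lines) {
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                if (hasData) {
                    events.add(current.toString());
                }
                current.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data lines in one event are joined with newlines.
                if (hasData) {
                    current.append('\n');
                }
                current.append(value);
                hasData = true;
            }
            // Other fields and comment lines are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> raw = List.of(
                "data:{\"index\":1}", "",
                "data:{\"index\":2}", "");
        dataPayloads(raw).forEach(System.out::println);
    }
}
```

An event that is not yet followed by a blank line is treated as incomplete and is not returned, which matches how the stream looks while the server is still sending.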

To test application/x-ndjson use:

curl -v -H "Accept: application/x-ndjson" http://localhost:8080

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: application/x-ndjson
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/x-ndjson
< 
{"index":1,"timestamp":"2022-04-08T14:42:36.081269Z"}
{"index":2,"timestamp":"2022-04-08T14:42:37.094928Z"}
{"index":3,"timestamp":"2022-04-08T14:42:38.109378Z"}
{"index":4,"timestamp":"2022-04-08T14:42:39.121315Z"}
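
Since each NDJSON line is a standalone JSON document, a client can handle records one at a time as they arrive. The following is a minimal sketch of a Java 11+ client using the JDK's `java.net.http.HttpClient`, assuming the endpoint from the example is reachable at the URL passed on the command line. The names `NdjsonClient` and `consume` are hypothetical, and the lines are only printed here, not parsed as JSON.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical client, not part of the original answer.
class NdjsonClient {

    /** Hands every non-blank line (one JSON document each) to the handler. */
    static void consume(Stream<String> lines, Consumer<String> handler) {
        lines.filter(line -> !line.isBlank()).forEach(handler);
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("usage: NdjsonClient <url>");
            return;
        }
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(args[0]))
                .header("Accept", "application/x-ndjson")
                .GET()
                .build();
        // ofLines() exposes the body as a Stream<String> that is filled
        // lazily while the response is still being received.
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            consume(lines, System.out::println);
        }
    }
}
```

Run against the example above with `http://localhost:8080` as the argument, this should print one record per line at roughly the pace the server emits them.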

The above example produces 1000 records at one-second intervals. You can also generate an unbounded stream with something like:

private Flux<DataEntry> stream() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new DataEntry(i, Instant.now()));
}
answered Oct 28 '25 08:10 by Alex


