
WebFlux chaining to call multiple services and response aggregation

I have recently started using WebFlux and need suggestions on how to chain multiple services and aggregate their responses along the way. The 4 services and their response POJOs are similar to the following example:

class Response1{
   String a1;
   String a2;
}

class Response2{
   String b1;
}

class Response3{
   String c1;
}

class Response4{
   String d1;
}

and the signatures of the 4 services:

Flux<Response1> service1(); 
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)

So service2 needs to be invoked for each Response1 in the Flux, and service3 for each Response2. The relationships between the models are:

Response1 <1-----*>Response2 (1 to many), 
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)

Aggregated final response should look like (JSON):

[
  {
    "a1": "",
    "a2": "",
    "d1": "",
    "response2s": [
      {
        "b1": "",
        "response3s": [
          {
            "c1": ""
          }
        ]
      }
    ]
  }
]

So first I need to call service1, then call service2 for each Response1, then call service3 for each Response2 (returned by service2). I also need to call service4 for each Response1 returned by service1 (this could run in parallel with the service2 and service3 calls). To build the aggregated final response, I have added two additional POJOs that store the child responses, e.g. (relevant bits):

public class AggResponse extends Response1{
    List<AggResponse2> response2s;// populated from service2 response
    String d1; // populated from service4 response

    public void add(AggResponse2 res2){
        if(response2s == null)
            response2s = new ArrayList<>();
        response2s.add(res2);
    }
}

and

public class AggResponse2 extends Response2{
    List<Response3> response3s;// populated from service3 response

    public void add(Response3 res3) {
        if (response3s == null)
            response3s = new ArrayList<>();
        response3s.add(res3);
    }
}

How can I best chain the calls so that I keep the data from earlier responses and, while combining operators, retain everything in the AggResponse object? I tried the following:

public Flux<AggResponse> aggregate() {
    return services.service1()
            .map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
            .flatMap(aggRes -> services.service2(aggRes.getA1())
                    .map(res2 -> {
                        AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
                        aggRes.add(aggRes2);
                        return aggRes2;
                    })
                    .flatMap(aggRes2 -> services.service3(aggRes2.getB1())
                            .map(res3 -> {
                                aggRes2.add(res3);
                                return res3;
                            })
                            .reduce(aggRes2, (a, aggRes3) -> aggRes2)
                    )
                    .reduce(aggRes, (a, aggRes2) -> aggRes)
            )
            .flatMap(aggRes -> services.service4(aggRes.getA1())
                    .map(res4 -> {
                        aggRes.setD1(res4.getD1());
                        return aggRes;
                    })
            );
}

However, I get the following incomplete response:

[ {
  "a1" : "a1v1",
  "a2" : "a2v1"
} ]

I can see from the logs that all services are being called. Two questions: 1. Why don't I see the aggregated response; could reduce be losing it? 2. Is there a better approach to achieving this?

hmble asked Oct 20 '25 11:10



1 Answer

You could use the merge method if you don't want service4 to wait for service2's next signal. Something like this:

return service1().flatMap(response1 ->
        Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
                .reduce((aggResponse, aggResponse2) -> new AggResponse(
                        response1.a1,
                        response1.a2,
                        Optional.ofNullable(aggResponse.d1)
                                .orElse(aggResponse2.d1),
                        Optional.ofNullable(aggResponse.response2s)
                                .orElse(aggResponse2.response2s))));

Utility classes and methods:

class AggContainer {
    final String b1;
    final List<Response3> response3s;

    AggContainer(String b1, List<Response3> response3s) {
        this.b1 = b1;
        this.response3s = response3s;
    }
}

class AggResponse {
    final String a1;
    final String a2;
    final String d1;
    final List<AggContainer> response2s;

    AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
        this.a1 = a1;
        this.a2 = a2;
        this.d1 = d1;
        this.response2s = response2s;
    }

    AggResponse(String d1) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = d1;
        this.response2s = null;
    }

    AggResponse(List<AggContainer> response2s) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = null;
        this.response2s = response2s;
    }
}

private Mono<AggResponse> service23Agg(String a1) {
    return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
            .map(response3s -> new AggContainer(response2.b1, response3s)))
            .collectList()
            .map(AggResponse::new);
}

private Mono<AggResponse> service4Agg(String a2) {
    return service4(a2).map(response4 -> new AggResponse(response4.d1));
}
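
One caveat worth checking in the merge-and-reduce approach above: Reactor's reduce without a seed does not invoke the accumulator when the sequence has only one element. So if service4 ever completes empty, the lone partial result from service23Agg would presumably pass through with a1 and a2 still null. The sketch below is a minimal illustration of that behaviour and of a seeded reduce as one possible guard. It is not part of the original answer: Partial, combine and combineSeeded are hypothetical names, and it assumes reactor-core on the classpath and Java 16+ for records.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReduceSingleElementSketch {
    // Hypothetical stand-in for the answer's AggResponse, cut down to two fields.
    record Partial(String a1, String d1) {}

    // Same shape as the answer: merge two partial results, reduce without a seed.
    static Mono<Partial> combine(String a1, Mono<Partial> left, Mono<Partial> right) {
        return Flux.merge(left, right)
                .reduce((x, y) -> new Partial(a1, x.d1() != null ? x.d1() : y.d1()));
    }

    // Seeded variant: the accumulator runs for every element, so a1 is always set.
    static Mono<Partial> combineSeeded(String a1, Mono<Partial> left, Mono<Partial> right) {
        return Flux.merge(left, right)
                .reduce(new Partial(a1, null),
                        (acc, p) -> new Partial(a1, acc.d1() != null ? acc.d1() : p.d1()));
    }

    public static void main(String[] args) {
        Mono<Partial> children = Mono.just(new Partial(null, null)); // like service23Agg
        Mono<Partial> noD1 = Mono.empty();                           // service4 found nothing

        // Only one element reaches reduce, so it is emitted as-is: a1 stays null.
        System.out.println(combine("a1v1", children, noD1).block());

        // With a seed, a1 is filled in even when one source is empty.
        System.out.println(combineSeeded("a1v1", children, noD1).block());
    }
}
```

Whether this matters depends on whether service4 can really return an empty Mono; if it always emits a value, the unseeded reduce in the answer sees two elements and behaves as intended.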

Also, be very careful with mutable collections in an asynchronous environment. Avoid mutating them inside a reactive pipeline.
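
One way to keep mutation out of the pipeline entirely, while still letting service4 run alongside the service2/service3 chain, is to build each level with collectList and combine the two branches with Mono.zip. The following is a rough sketch under stated assumptions rather than a drop-in replacement: the record types and the stub services returning canned values are hypothetical, Response2 and Response3 are simplified to plain strings, and defaultIfEmpty("") is only a guess at how a missing service4 result should be represented. It assumes reactor-core on the classpath and Java 16+.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipAggregationSketch {
    // Hypothetical immutable types mirroring the target JSON shape.
    record R1(String a1, String a2) {}
    record Agg2(String b1, List<String> c1s) {}
    record Agg(String a1, String a2, String d1, List<Agg2> response2s) {}

    // Stub services with canned data, standing in for the real remote calls.
    static Flux<R1> service1() { return Flux.just(new R1("a1v1", "a2v1")); }
    static Flux<String> service2(String a1) { return Flux.just(a1 + "-b1", a1 + "-b2"); }
    static Flux<String> service3(String b1) { return Flux.just(b1 + "-c1"); }
    static Mono<String> service4(String a2) { return Mono.just(a2 + "-d1"); }

    static Flux<Agg> aggregate() {
        return service1().flatMap(r1 -> {
            // Each Response2 is paired with its collected Response3 list;
            // concatMap keeps the children in service2's emission order.
            Mono<List<Agg2>> children = service2(r1.a1())
                    .concatMap(b1 -> service3(b1).collectList()
                            .map(c1s -> new Agg2(b1, c1s)))
                    .collectList();

            // Mono.zip completes empty if any source is empty, hence the default.
            Mono<String> d1 = service4(r1.a2()).defaultIfEmpty("");

            // zip subscribes to both branches and emits once each has a value.
            return Mono.zip(children, d1)
                    .map(t -> new Agg(r1.a1(), r1.a2(), t.getT2(), t.getT1()));
        });
    }

    public static void main(String[] args) {
        // block() is used here only to make the sketch runnable from main.
        aggregate().collectList().block().forEach(System.out::println);
    }
}
```

Because every object is created once with all of its data, no list is shared between callbacks, so there is nothing for concurrent inner subscriptions to race on.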

Alexander Pankin answered Oct 23 '25 01:10
