
Cache not working properly using Reactor and Caffeine

I need to check some endpoints at different intervals, so I set up three caches with Caffeine's cache builder:

this.localeWeatherCache = newBuilder().build();

this.currentWeatherCache = newBuilder().expireAfterWrite(Duration.ofHours(3)).build();

this.weatherForecastsCache = newBuilder().expireAfterWrite(Duration.ofHours(12)).build();

In my service I call those three endpoints, and at the end I return my object with all the details using Mono.zip().

During my tests, I noticed that climaTempoRepository.findLocaleByCityNameAndState is executed twice. After the currentWeather cache expires, it makes another call to the locale endpoint. The same happens with weatherForecast: it calls the locale endpoint again.

Why does it fail? Shouldn't it use the cache? Or is the way I did it wrong?

Any help or pointers are greatly appreciated! :)

public Mono<Weather> weatherForecastByLocation(Location location) {

    Mono<ClimaTempoLocale> locale =
            CacheMono.lookup(key ->
                    Mono.justOrEmpty(localeWeatherCache.getIfPresent(key))
                            .map(Signal::next), location)
                    .onCacheMissResume(() -> climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state()))
                    .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                            Optional.ofNullable(signal.get())
                                    .ifPresent(value -> localeWeatherCache.put(key, value))));

    Mono<CurrentWeather> currentWeather =
            CacheMono.lookup(key ->
                    Mono.justOrEmpty(currentWeatherCache.getIfPresent(key))
                            .map(Signal::next), location)
                    .onCacheMissResume(() -> locale.flatMap(climaTempoRepository::findCurrentWeatherByLocale)
                            .subscribeOn(Schedulers.elastic()))
                    .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                            Optional.ofNullable(signal.get())
                                    .ifPresent(value -> currentWeatherCache.put(key, value))));

    Mono<WeatherForecasts> weatherForecasts =
            CacheMono.lookup(key ->
                    Mono.justOrEmpty(weatherForecastsCache.getIfPresent(key))
                            .map(Signal::next), location)
                    .onCacheMissResume(() -> locale.flatMap(climaTempoRepository::findDailyForecastByLocale)
                            .subscribeOn(Schedulers.elastic()))
                    .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                            Optional.ofNullable(signal.get())
                                    .ifPresent(value -> weatherForecastsCache.put(key, value))));

    return Mono.zip(currentWeather,
            weatherForecasts,
            (current, forecasts) ->
                    Weather.buildWith(builder -> {
                        builder.location = location;
                        builder.currentWeather = current;
                        builder.weatherForecasts = forecasts;
                    }));

}
asked Sep 06 '25 03:09 by emerson


2 Answers

An AsyncLoadingCache can compute the value from the key and returns a CompletableFuture of the result. This can be translated into a Mono with its fromFuture method. Because the cache stores the futures themselves, only a single execution is in flight for a given key, and callers are not blocked while it runs.

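// The repository methods return a Mono (as in the question), so each loader
// converts it to the CompletableFuture the cache expects with toFuture().
AsyncLoadingCache<Location, ClimaTempoLocale> localeWeatherCache =
    Caffeine.newBuilder().buildAsync((location, executor) ->
        climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state()).toFuture());

// Expiry times are the ones from the question.
AsyncLoadingCache<ClimaTempoLocale, CurrentWeather> currentWeatherCache =
    Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(3)).buildAsync((locale, executor) ->
        climaTempoRepository.findCurrentWeatherByLocale(locale).toFuture());

AsyncLoadingCache<ClimaTempoLocale, WeatherForecasts> weatherForecastsCache =
    Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(12)).buildAsync((locale, executor) ->
        climaTempoRepository.findDailyForecastByLocale(locale).toFuture());

public Mono<Weather> weatherForecastByLocation(Location location) {
  var locale = Mono.fromFuture(localeWeatherCache.get(location));
  // Chain on the resolved locale, then look up each dependent cache by that locale.
  var currentWeather = locale.flatMap(l -> Mono.fromFuture(currentWeatherCache.get(l)));
  var weatherForecasts = locale.flatMap(l -> Mono.fromFuture(weatherForecastsCache.get(l)));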

  return Mono.zip(currentWeather, weatherForecasts, (current, forecasts) ->
      Weather.buildWith(builder -> {
          builder.location = location;
          builder.currentWeather = current;
          builder.weatherForecasts = forecasts;
      }));
}
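
To make the "single execution in flight" point concrete, here is a minimal sketch that uses only the JDK. It is not how Caffeine is implemented; FutureCache, FutureCacheDemo and loadsForTwoLookups are hypothetical names invented for the illustration. The idea it shows is that when the map holds the future rather than the finished value, a second lookup for the same key gets the pending future instead of starting another load.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class FutureCacheDemo {

    /** Hypothetical stand-in for an async cache: it stores futures, not values. */
    static final class FutureCache<K, V> {
        private final ConcurrentHashMap<K, CompletableFuture<V>> futures = new ConcurrentHashMap<>();
        private final Function<K, CompletableFuture<V>> loader;

        FutureCache(Function<K, CompletableFuture<V>> loader) {
            this.loader = loader;
        }

        /** Returns the stored future; the loader runs only if the key has no entry yet. */
        CompletableFuture<V> get(K key) {
            return futures.computeIfAbsent(key, loader);
        }
    }

    /** Looks up the same key twice while the load is still pending and counts loader calls. */
    static int loadsForTwoLookups() {
        AtomicInteger loads = new AtomicInteger();
        CompletableFuture<String> pending = new CompletableFuture<>();
        FutureCache<String, String> cache = new FutureCache<String, String>(city -> {
            loads.incrementAndGet();
            return pending; // simulates a remote call that has not finished yet
        });

        CompletableFuture<String> first = cache.get("Sao Paulo");
        CompletableFuture<String> second = cache.get("Sao Paulo"); // no new load is started
        pending.complete("locale-1");

        if (first != second || !"locale-1".equals(second.join())) {
            throw new IllegalStateException("both lookups should share one future");
        }
        return loads.get();
    }

    public static void main(String[] args) {
        System.out.println("loads = " + loadsForTwoLookups()); // prints "loads = 1"
    }
}
```

Note that this sketch never evicts anything, so a failed future would stay in the map forever. Caffeine's async caches remove entries whose future fails or completes with null, and they add the expiration policy, which is why the real cache is the better choice here.
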
answered Sep 07 '25 21:09 by Ben Manes


As exemplified in https://stackoverflow.com/a/52803247/11209784, the ClimaTempoLocale could be computed as follows:

Cache<Location, ClimaTempoLocale> weatherLocaleCache = Caffeine.newBuilder().build();

private Mono<ClimaTempoLocale> findLocale(Location location) {
    Mono<ClimaTempoLocale> locale;
    ClimaTempoLocale cachedLocale = weatherLocaleCache.getIfPresent(location);
    if (cachedLocale != null) {
        locale = Mono.just(cachedLocale);
    } else {
        locale = climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state())
                .doOnNext(climaTempoLocale -> weatherLocaleCache.put(location, climaTempoLocale));
    }

    return locale;
}

One side effect is that there can be consecutive writes to the same key when concurrent calls all result in a cache miss.
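
That side effect can be reproduced deterministically with a small JDK-only sketch. Everything in it is hypothetical: CheckThenPutDemo mirrors the check-then-put shape of findLocale, a city name stands in for Location, a String stands in for ClimaTempoLocale, and a manually completed CompletableFuture stands in for the repository call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CheckThenPutDemo {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final List<CompletableFuture<String>> inFlight = new ArrayList<>();
    final AtomicInteger loads = new AtomicInteger();
    final AtomicInteger writes = new AtomicInteger();

    /** Same shape as findLocale: read the cache, otherwise load and write on completion. */
    CompletableFuture<String> findLocale(String city) {
        String cached = cache.get(city);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        loads.incrementAndGet();
        CompletableFuture<String> remote = new CompletableFuture<>(); // simulated repository call
        inFlight.add(remote);
        return remote.thenApply(value -> {
            cache.put(city, value); // every caller that missed writes the same key
            writes.incrementAndGet();
            return value;
        });
    }

    /** Returns {loads, writes} after two overlapping misses followed by one hit. */
    static int[] run() {
        CheckThenPutDemo demo = new CheckThenPutDemo();
        demo.findLocale("Sao Paulo"); // miss, starts load 1
        demo.findLocale("Sao Paulo"); // miss again, load 1 has not written yet
        demo.inFlight.forEach(call -> call.complete("locale-1"));
        demo.findLocale("Sao Paulo"); // hit, no further load or write
        return new int[] {demo.loads.get(), demo.writes.get()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("loads=" + counts[0] + " writes=" + counts[1]); // prints "loads=2 writes=2"
    }
}
```

For an idempotent lookup such as the locale, the duplicate load and write are usually harmless, just wasteful.
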

By doing this, the calls that depend on the ClimaTempoLocale could continue the same way:

Cache<Location, CurrentWeather> currentWeatherCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(3)).build();

Cache<Location, WeatherForecasts> weatherForecastsCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(12)).build();

public Mono<Weather> weatherForecastByLocation(Location location) {
    Mono<ClimaTempoLocale> locale = findLocale(location);

    Mono<CurrentWeather> currentWeather =
            CacheMono.lookup(
                    key -> Mono.justOrEmpty(currentWeatherCache.getIfPresent(key))
                            .map(Signal::next),
                    location)
                    .onCacheMissResume(
                            () -> locale.flatMap(climaTempoRepository::findCurrentWeatherByLocale)
                                    .subscribeOn(Schedulers.elastic()))
                    .andWriteWith(
                            (key, signal) -> Mono.fromRunnable(
                                    () -> Optional.ofNullable(signal.get())
                                            .ifPresent(value -> currentWeatherCache.put(key, value))));

    Mono<WeatherForecasts> weatherForecasts =
            CacheMono.lookup(
                    key -> Mono.justOrEmpty(weatherForecastsCache.getIfPresent(key))
                            .map(Signal::next),
                    location)
                    .onCacheMissResume(
                            () -> locale.flatMap(climaTempoRepository::findDailyForecastByLocale)
                                    .subscribeOn(Schedulers.elastic()))
                    .andWriteWith(
                            (key, signal) -> Mono.fromRunnable(
                                    () -> Optional.ofNullable(signal.get())
                                            .ifPresent(value -> weatherForecastsCache.put(key, value))));

    return Mono.zip(currentWeather,
            weatherForecasts,
            (current, forecasts) ->
                    Weather.buildWith(builder -> {
                        builder.location = location;
                        builder.currentWeather = current;
                        builder.weatherForecasts = forecasts;
                    }));
}
answered Sep 07 '25 21:09 by Geovane Shimizu