I need to check some endpoints in different intervals, so I set up the Caffeine's cache builder .
this.localeWeatherCache = newBuilder().build();
this.currentWeatherCache=newBuilder().expireAfterWrite(Duration.ofHours(3)).build();
this.weatherForecastsCache = newBuilder().expireAfterWrite(Duration.ofHours(12)).build();
In my Service I call those 3 endpoints, in the end I return my object with all details usin Mono.zip().
During my tests, I noticed that climaTempoRepository.findLocaleByCityNameAndState is executed twice and after the currentWeather cache expires it makes another call to locale endpoint, the same happens with weatherForecast, it calls again the locale.
Why it fails? Shouldn't it use cache? Or the way I did is wrong?
Any help or pointers are greatly appreciated! :)
public Mono<Weather> weatherForecastByLocation(Location location) {
Mono<ClimaTempoLocale> locale =
CacheMono.lookup(key ->
Mono.justOrEmpty(localeWeatherCache.getIfPresent(key))
.map(Signal::next), location)
.onCacheMissResume(() -> climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state()))
.andWriteWith((key, signal) -> Mono.fromRunnable(() ->
Optional.ofNullable(signal.get())
.ifPresent(value -> localeWeatherCache.put(key, value))));
Mono<CurrentWeather> currentWeather =
CacheMono.lookup(key ->
Mono.justOrEmpty(currentWeatherCache.getIfPresent(key))
.map(Signal::next), location)
.onCacheMissResume(() -> locale.flatMap(climaTempoRepository::findCurrentWeatherByLocale)
.subscribeOn(Schedulers.elastic()))
.andWriteWith((key, signal) -> Mono.fromRunnable(() ->
Optional.ofNullable(signal.get())
.ifPresent(value -> currentWeatherCache.put(key, value))));
Mono<WeatherForecasts> weatherForecasts =
CacheMono.lookup(key ->
Mono.justOrEmpty(weatherForecastsCache.getIfPresent(key))
.map(Signal::next), location)
.onCacheMissResume(() -> locale.flatMap(climaTempoRepository::findDailyForecastByLocale)
.subscribeOn(Schedulers.elastic()))
.andWriteWith((key, signal) -> Mono.fromRunnable(() ->
Optional.ofNullable(signal.get())
.ifPresent(value -> weatherForecastsCache.put(key, value))));
return Mono.zip(currentWeather,
weatherForecasts,
(current, forecasts) ->
Weather.buildWith(builder -> {
builder.location = location;
builder.currentWeather = current;
builder.weatherForecasts = forecasts;
}));
}
A AsyncLoadingCache
can compute the value from the key and returns a CompletableFuture
of the result. This can be translated into a Mono
it's fromFuture
method. This will ensure only a single execution is in-flight for a given key, while not blocking due to storing the futures within the cache.
AsyncLoadingCache<Location, ClimaTempoLocale> localeWeatherCache =
Caffeine.newBuilder().buildAsync(location ->
climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state()));
AsyncLoadingCache<ClimaTempoLocale, CurrentWeather> currentWeatherCache =
Caffeine.newBuilder().buildAsync(climaTempoRepository::findCurrentWeatherByLocale);
AsyncLoadingCache<ClimaTempoLocale, WeatherForecasts> weatherForecastsCache =
Caffeine.newBuilder().buildAsync(climaTempoRepository::findDailyForecastByLocale);
public Mono<Weather> weatherForecastByLocation(Location location) {
var locale = Mono.fromFuture(localeWeatherCache.get(location));
var currentWeather = Mono.fromFuture(locale.map(localeWeatherCache::get));
var weatherForecasts = Mono.fromFuture(locale.map(weatherForecastsCache::get));
return Mono.zip(currentWeather, weatherForecasts, (current, forecasts) ->
Weather.buildWith(builder -> {
builder.location = location;
builder.currentWeather = current;
builder.weatherForecasts = forecasts;
}));
}
As exemplified here https://stackoverflow.com/a/52803247/11209784 the ClimaTempoLocale
could be computed as follows:
Cache<Location, ClimaTempoLocale> weatherLocaleCache = Caffeine.newBuilder().build();
private Mono<ClimaTempoLocale> findLocale(Location location) {
Mono<ClimaTempoLocale> locale;
ClimaTempoLocale cachedLocale = weatherLocaleCache.getIfPresent(location);
if (cachedLocale != null) {
locale = Mono.just(cachedLocale);
} else {
locale = climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state())
.doOnNext(climaTempoLocale -> weatherLocaleCache.put(location, climaTempoLocale));
}
return locale;
}
One side effect is that there can be consecutive writes to the same key when concurrent calls result in cache miss.
By doing this, the calls that depend on the ClimaTempoLocale
could continue the same way:
Cache<Location, CurrentWeather> currentWeatherCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(3)).build();
Cache<Location, WeatherForecasts> weatherForecastsCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(12)).build();
public Mono<Weather> weatherForecastByLocation(Location location) {
Mono<ClimaTempoLocale> locale = findLocale(location);
Mono<CurrentWeather> currentWeather =
CacheMono.lookup(
key -> Mono.justOrEmpty(currentWeatherCache.getIfPresent(key))
.map(Signal::next),
location)
.onCacheMissResume(
() -> locale.flatMap(climaTempoRepository::findCurrentWeatherByLocale)
.subscribeOn(Schedulers.elastic()))
.andWriteWith(
(key, signal) -> Mono.fromRunnable(
() -> Optional.ofNullable(signal.get())
.ifPresent(value -> currentWeatherCache.put(key, value))));
Mono<WeatherForecasts> weatherForecasts =
CacheMono.lookup(
key -> Mono.justOrEmpty(weatherForecastsCache.getIfPresent(key))
.map(Signal::next),
location)
.onCacheMissResume(
() -> locale.flatMap(climaTempoRepository::findDailyForecastByLocale)
.subscribeOn(Schedulers.elastic()))
.andWriteWith(
(key, signal) -> Mono.fromRunnable(
() -> Optional.ofNullable(signal.get())
.ifPresent(value -> weatherForecastsCache.put(key, value))));
return Mono.zip(currentWeather,
weatherForecasts,
(current, forecasts) ->
Weather.buildWith(builder -> {
builder.location = location;
builder.currentWeather = current;
builder.weatherForecasts = forecasts;
}));
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With