I noticed that Schedulers.enableMetrics()
got deprecated but I don't know what I should I do to get all my schedulers metered in a typical use case (using Spring Boot application).
Javadoc suggests using timedScheduler but how should it be achieved for Spring Boot?
First off, here are my thoughts on why the Schedulers.enableMetrics()
approach was deprecated:
The previous approach was flawed in several ways:
MeterRegistry#globalRegistry()
without any way of using a different registry.ExecutorService
instances assumed to back the schedulers.ExecutorService
couldn't be instrumented.ExecutorService
(eg. a pool of workers) would produce multiple levels of metrics difficult to aggregate.A deliberate constraint of the new approach is that each Scheduler
must be explicitly wrapped, which ensures that the correct MeterRegistry
is used and that metrics are recognizable and aggregated for that particular Scheduler
(thanks to the mandatory metricsPrefix
).
I'm not a Spring Boot expert, but if you really want to instrument all the schedulers including the global ones here is a naive approach that will aggregate data from all the schedulers of same category, demonstrated in a Spring Boot app:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Configuration
static class SchedulersConfiguration {
@Bean
@Order(1)
public Scheduler originalScheduler() {
// For comparison, we can capture a new original Scheduler (which won't be disposed by setFactory, unlike the global ones)
return Schedulers.newBoundedElastic(4, 100, "compare");
}
@Bean
public SimpleMeterRegistry registry() {
return new SimpleMeterRegistry();
}
@Bean
public Schedulers.Factory instrumentedSchedulers(SimpleMeterRegistry registry) {
// Let's create a Factory that does the same as the default Schedulers factory in Reactor-Core, but with instrumentation
return new Schedulers.Factory() {
@Override
public Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, ThreadFactory threadFactory, int ttlSeconds) {
// The default implementation maps to the vanilla Schedulers so we can delegate to that
Scheduler original = Schedulers.Factory.super.newBoundedElastic(threadCap, queuedTaskCap, threadFactory, ttlSeconds);
// IMPORTANT NOTE: in this example _all_ the schedulers of the same type will share the same prefix/name
// this would especially be problematic if gauges were involved as they replace old gauges of the same name.
// Fortunately, for now, TimedScheduler only uses counters, timers and longTaskTimers.
String prefix = "my.instrumented.boundedElastic"; // TimedScheduler will add `.scheduler.xxx` to that prefix
return Micrometer.timedScheduler(original, registry, prefix);
}
@Override
public Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
Scheduler original = Schedulers.Factory.super.newParallel(parallelism, threadFactory);
String prefix = "my.instrumented.parallel"; // TimedScheduler will add `.scheduler.xxx` to that prefix
return Micrometer.timedScheduler(original, registry, prefix);
}
@Override
public Scheduler newSingle(ThreadFactory threadFactory) {
Scheduler original = Schedulers.Factory.super.newSingle(threadFactory);
String prefix = "my.instrumented.single"; // TimedScheduler will add `.scheduler.xxx` to that prefix
return Micrometer.timedScheduler(original, registry, prefix);
}
};
}
@PreDestroy
void resetFactories() {
System.err.println("Resetting Schedulers Factory to default");
// Later on if we want to disable instrumentation we can reset the Factory to defaults (closing all instrumented schedulers)
Schedulers.resetFactory();
}
}
@Service
public static class Demo implements ApplicationRunner {
final Scheduler forComparison;
final SimpleMeterRegistry registry;
final Schedulers.Factory factory;
Demo(Scheduler forComparison, SimpleMeterRegistry registry, Schedulers.Factory factory) {
this.forComparison = forComparison;
this.registry = registry;
this.factory = factory;
Schedulers.setFactory(factory);
}
public void generateMetrics() {
Schedulers.boundedElastic().schedule(() -> {});
Schedulers.newBoundedElastic(4, 100, "bounded1").schedule(() -> {});
Schedulers.newBoundedElastic(4, 100, "bounded2").schedule(() -> {});
Micrometer.timedScheduler(
forComparison,
registry,
"my.custom.instrumented.bounded"
).schedule(() -> {});
Schedulers.newBoundedElastic(4, 100, "bounded3").schedule(() -> {});
}
public String getCompletedSummary() {
return Search.in(registry)
.name(n -> n.endsWith(".scheduler.tasks.completed"))
.timers()
.stream()
.map(c -> c.getId().getName() + "=" + c.count())
.collect(Collectors.joining("\n"));
}
@Override
public void run(ApplicationArguments args) throws Exception {
generateMetrics();
System.err.println(getCompletedSummary());
}
}
}
Which prints:
my.instrumented.boundedElastic.scheduler.tasks.completed=4
my.custom.instrumented.bounded.scheduler.tasks.completed=1
Notice how the metrics for the four instrumentedFactory
-produced Scheduler
are aggregated together.
There's a bit of a hacky workaround for this: by default Schedulers uses ReactorThreadFactory
, an internal private class which happens to be a Supplier<String>
, supplying the "simplified name" (ie toString
but without the configuration options) of the Scheduler
.
One could use the following method to tentatively extract that name:
static String inferSimpleSchedulerName(ThreadFactory threadFactory, String defaultName) {
if (!(threadFactory instanceof Supplier)) {
return defaultName;
}
Object supplied = ((Supplier<?>) threadFactory).get();
if (!(supplied instanceof String)) {
return defaultName;
}
return (String) supplied;
}
Which can be applied to eg. the newParallel
method in the factory:
String simplifiedName = inferSimpleSchedulerName(threadFactory, "para???");
String prefix = "my.instrumented." + simplifiedName; // TimedScheduler will add `.scheduler.xxx` to that prefix
This can then be demonstrated by submitting a few tasks to different parallel schedulers in the Demo#generateMetrics()
part:
Schedulers.parallel().schedule(() -> {});
Schedulers.newParallel("paraOne").schedule(() -> {});
Schedulers.newParallel("paraTwo").schedule(() -> {});
And now it prints (blank lines for emphasis):
my.instrumented.paraOne.scheduler.tasks.completed=1
my.instrumented.paraTwo.scheduler.tasks.completed=1
my.instrumented.parallel.scheduler.tasks.completed=1
my.custom.instrumented.bounded.scheduler.tasks.completed=1
my.instrumented.boundedElastic.scheduler.tasks.completed=4
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With