I am updating a Kotlin (v1.9.25) Spring Boot (v3.3.1) project from Java 17 to Java 21 in order to enable Virtual Threads.
In our service, almost all requests acquire one database connection and hold on to it for the whole request, while some very specific ones require more than one. To avoid database connection starvation, we set the maximum number of database connections just a little above the maximum number of concurrent requests.
spring.threads.virtual.enabled: true
spring.datasource.hikari.maximum-pool-size: 50
server.tomcat.threads.max: 4 # used to be 45 before virtual threads
Up to now, we have controlled the maximum number of concurrent requests by means of server.tomcat.threads.max, but with virtual threads that all changes: the idea, as far as I understand, is to have an executor that accepts an unlimited number of tasks, so there is no limit there.
That brings me to my question: how can I limit the maximum number of concurrent connections in my service while using virtual threads?
I thought of implementing a semaphore, but something seems off with this approach; I would have expected this to be configurable.
Thank you very much!
While the recommended way to restrict the number of threads accessing a limited resource is to Use Semaphores to Limit Concurrency of Virtual Threads, the Stack Overflow thread In java How to migrate from Executors.newFixedThreadPool(MAX_THREAD_COUNT()) to Virtual Thread discusses an alternative way to do the restriction - a fixed thread pool - and argues that it is equivalent to a Semaphore in terms of functionality and efficiency.
In addition, the latter way allows more Spring-"configurable" solutions.
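To illustrate that equivalence, here is a minimal sketch (not taken from the linked thread; the limit of 50 merely mirrors the Hikari pool size above) showing both ways of capping the number of concurrently running tasks:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class ConcurrencyLimitSketch {

    static final int MAX_THREAD_COUNT = 50; // hypothetical limit, mirroring the Hikari pool size

    // Option 1: a fixed pool of virtual threads - at most MAX_THREAD_COUNT tasks
    // run at once, the rest wait in the executor's work queue.
    static final ExecutorService pool =
            Executors.newFixedThreadPool(MAX_THREAD_COUNT, Thread.ofVirtual().factory());

    // Option 2: one virtual thread per task, throttled by a semaphore - at most
    // MAX_THREAD_COUNT threads hold a permit, the rest block in acquire().
    static final ExecutorService perTaskExecutor = Executors.newVirtualThreadPerTaskExecutor();
    static final Semaphore permits = new Semaphore(MAX_THREAD_COUNT);

    static void submitThrottled(Runnable task) {
        perTaskExecutor.submit(() -> {
            try {
                permits.acquire();
                try {
                    task.run();
                } finally {
                    permits.release();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}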
To restrict the number of Tomcat worker threads, as the OP requested, we could set our own Tomcat Protocol Handler Executor:
import java.util.concurrent.Executors;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.embedded.tomcat.ConfigurableTomcatWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;

@Configuration
public class TomcatFixedThreadsCustomizer
        implements WebServerFactoryCustomizer<ConfigurableTomcatWebServerFactory>, Ordered {

    @Value("${server.tomcat.threads.max}")
    private int maxThreadCount;

    @Override
    public void customize(ConfigurableTomcatWebServerFactory factory) {
        // replace Tomcat's default executor with a bounded pool of virtual threads
        factory.addProtocolHandlerCustomizers((protocolHandler) ->
                protocolHandler.setExecutor(
                        Executors.newFixedThreadPool(maxThreadCount, Thread.ofVirtual().factory())));
    }

    @Override
    public int getOrder() {
        return 2; // needs to be executed after TomcatWebServerFactoryCustomizer
    }
}
Note that the line Executors.newFixedThreadPool(maxThreadCount, Thread.ofVirtual().factory()) effectively overrides the spring.threads.virtual.enabled setting for this custom executor - the Tomcat worker threads will be virtual anyway. On the other hand, if you don't want the Tomcat worker threads to be virtual, you don't have to make them so - use a default fixed pool instead: Executors.newFixedThreadPool(maxThreadCount). Remember, however, that with this approach all Tomcat worker threads fall under the maxThreadCount restriction, even those that don't use critical resources like DB connections in the OP's case.
The other solution is based upon a Semaphore, as recommended by the Oracle documentation, combined with Spring AOP. An AOP aspect restricts access to the critical resource:
import java.util.concurrent.Semaphore;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

@Aspect
public class RestrictedResourceAspect {

    private final Semaphore semaphore;

    public RestrictedResourceAspect(int permits) {
        semaphore = new Semaphore(permits);
    }

    @Around("within(com.github.webapp.controller..*)")
    public Object accessRestrictedResource(ProceedingJoinPoint jp) throws Throwable {
        // block until a permit is available, then proceed with the intercepted method
        semaphore.acquire();
        try {
            return jp.proceed();
        } finally {
            semaphore.release();
        }
    }
}
The pointcut expression above, "within(com.github.webapp.controller..*)", applies the aspect to each public method in the com.github.webapp.controller package, as an example. It is also possible to apply it to methods annotated with a certain annotation, RequestMapping for example; the only requirement is that the method belongs to a Spring bean and is public. For the pointcut syntax, please see the details in the Spring AOP documentation.
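For instance, a method-level annotation-based variant of the same advice might look like the sketch below (the choice of @RequestMapping as the matched annotation is just an illustration):

// Matches bean methods annotated with @RequestMapping at the method level;
// shown only as an example of an annotation-based pointcut.
@Around("@annotation(org.springframework.web.bind.annotation.RequestMapping)")
public Object accessAnnotatedResource(ProceedingJoinPoint jp) throws Throwable {
    semaphore.acquire();
    try {
        return jp.proceed();
    } finally {
        semaphore.release();
    }
}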
Finally, we need to configure the Aspect:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
@EnableAspectJAutoProxy
public class AppConfig {

    @Value("${max.thread.count}")
    private int maxThreadCount;

    @Bean
    public RestrictedResourceAspect restrictedResourceAspect() {
        return new RestrictedResourceAspect(maxThreadCount);
    }
}
This second approach is more flexible than the Tomcat Protocol Handler Executor-based one, as it allows you to distinguish between methods that access critical resources and methods that do not. For that exact purpose, the maxThreadCount value is injected from the custom max.thread.count property, to differentiate it from the total number of Tomcat worker threads, server.tomcat.threads.max. Evidently, the former value should be smaller than the latter if the protected resources are accessed only via Tomcat-handled HTTP requests; that constraint does not apply if threads of other origins (scheduler, batch, custom) also access the protected resources.
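A hypothetical property set illustrating that relation (the 45/50 values and the max.thread.count property name are examples, not taken from the question):

max.thread.count: 45          # semaphore permits for the AOP aspect
server.tomcat.threads.max: 50 # total number of Tomcat worker threads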
EDIT
In terms of backpressure/memory usage, addressed by @Bernhard's comments, both approaches are roughly equivalent. With the first approach, if the thread pool does not have a spare thread, the submitted Runnable will be stored in its workQueue. With the second approach, if no permits are available, the thread will be placed in the so-called "wait queue", a.k.a. wait set, implemented by AbstractQueuedSynchronizer, which Semaphore uses internally.
The only difference is that, while the AQS wait set is virtually unbounded, the ThreadPoolExecutor's workQueue can limit the number of Runnables it stores. In that case, the customization will look like this:
import java.util.concurrent.TimeUnit;

import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.web.embedded.tomcat.ConfigurableTomcatWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;

@Configuration
public class TomcatFixedThreadsCustomizer
        implements WebServerFactoryCustomizer<ConfigurableTomcatWebServerFactory>, Ordered {

    private final int minSpareThreads;
    private final int maxThreads;
    private final int queueCapacity;

    @Autowired
    public TomcatFixedThreadsCustomizer(ServerProperties serverProperties) {
        var threadsConfig = serverProperties.getTomcat().getThreads();
        this.minSpareThreads = threadsConfig.getMinSpare();
        this.maxThreads = threadsConfig.getMax();
        if (this.minSpareThreads > this.maxThreads) {
            throw new IllegalArgumentException("minSpareThreads must be <= maxThreads");
        }
        var queueCapacity = threadsConfig.getMaxQueueCapacity();
        if (queueCapacity <= 0 || queueCapacity == Integer.MAX_VALUE) {
            // no back pressure configured, set a good default for virtual threads
            this.queueCapacity = this.maxThreads * 2;
        } else {
            this.queueCapacity = queueCapacity;
        }
    }

    @Override
    public void customize(ConfigurableTomcatWebServerFactory factory) {
        factory.addProtocolHandlerCustomizers(
                (protocolHandler) -> {
                    // the bounded work queue provides the back pressure
                    final TaskQueue workQueue = new TaskQueue(this.queueCapacity);
                    final ThreadPoolExecutor executor =
                            new ThreadPoolExecutor(
                                    this.minSpareThreads,
                                    this.maxThreads,
                                    0,
                                    TimeUnit.MILLISECONDS,
                                    workQueue,
                                    Thread.ofVirtual().factory());
                    workQueue.setParent(executor);
                    protocolHandler.setExecutor(executor);
                });
    }

    @Override
    public int getOrder() {
        return 2; // needs to be executed after TomcatWebServerFactoryCustomizer
    }
}
When the workQueue is full, the ThreadPoolExecutor by default throws a RejectedExecutionException, which org.apache.tomcat.util.net.AbstractEndpoint just logs and ignores. The ThreadPoolExecutor's rejection behavior can also be customized by setting its rejectedExecutionHandler property.
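For illustration, a custom handler on the JDK's java.util.concurrent.ThreadPoolExecutor looks like the sketch below (the caller-runs fallback is an arbitrary choice here, and the Tomcat fork's exact setter may differ):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionHandlerSketch {

    // A bounded pool of virtual threads whose rejection policy is customized:
    // instead of throwing RejectedExecutionException when the queue is full,
    // the task is run on the caller's thread, which slows the producer down.
    static ThreadPoolExecutor boundedExecutor(int maxThreads, int queueCapacity) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                maxThreads, maxThreads, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                Thread.ofVirtual().factory());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}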
The configuration of Tomcat's max-connections, accept-count, and other similar parameters is intentionally left out of scope.