I'm using an ExecutorService to perform some computations on a thread pool :
@Override
public double predict(IModelInputData<EXTRACTER> modelInputData) throws PredictionFailureException {
try {
return threadPool.submit(() -> regressor.predict(modelInputData)).get();
} catch (InterruptedException|ExecutionException e) {
throw new PredictionFailureException("Error during prediction", e);
}
}
The executor service threadPool has been created using a bounded blocking queue and a custom policy :
private static class DiscardOldestWithWarningPolicy extends ThreadPoolExecutor.DiscardOldestPolicy {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
super.rejectedExecution(r, e);
LOG.warn("Discarded the oldest prediction task (too many tasks in the queue)");
}
}
I made a test to ensure this warning was actually logged, and it is, but I am quite unsure about what happens to the threads that have blocked on threadPool.submit(...).get() when the corresponding task is discarded. It looks to me like they stay blocked, but this would not make a lot of sense. The behaviour I would like to see hapenning is an exception being thrown to interrupt the thread but I have not seen any.
Am I missing something ?
Yep, it looks like the builtin DiscardOldestPolicy just drops the oldest one on the floor. The problem is that the ThreadPoolExecutor is holding a Runnable, and has no way of knowing what to do with it. You could implement your own handler which does something useful with the task (making assumptions about the type of the Runnable).
Something like:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
Runnable old = e.getQueue().poll();
if(old instanceof Future<?>) {
((Future<?>)old).cancel(true);
}
e.execute(r);
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With