I've been messing around with different strategies for thread pooling using ThreadPoolExecutor with JDK6. I have a Priority queue working but wasn't sure if I liked how the pool didn't size after keepAliveTime (what you get with an unbounded queue). So, I'm looking at a ThreadPoolExecutor using a LinkedBlockingQueue and the CallerRuns policy.
The issue I'm having with it now is that the pool ramps up, as the docs explain that it should, but after the tasks complete and the keepAliveTime comes into play getPoolSize shows the pool getting reduced to zero. The example code below should let you see the basis for my question:
public class ThreadPoolingDemo {
    private final static Logger LOGGER =
         Logger.getLogger(ThreadPoolingDemo.class.getName());
    public static void main(String[] args) throws Exception {
        LOGGER.info("MAIN THREAD:starting");
        runCallerTestPlain();   
    }
    private static void runCallerTestPlain() throws InterruptedException {
        //10 core threads, 
        //50 max pool size, 
        //100 tasks in queue, 
        //at max pool and full queue - caller runs task
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50,
            5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.CallerRunsPolicy());
        //dump 5000 tasks on the queue
        for (int i = 0; i < 5000; i++) {
            tpe.submit(new Runnable() {
                @Override
                public void run() {
                    //just to eat some time and give a little feedback
                    for (int j = 0; j < 20; j++) {
                        LOGGER.info("First-batch Task, looping:" + j + "["
                               + Thread.currentThread().getId() + "]");
                    }
                }
            }, null);
        }
        LOGGER.info("MAIN THREAD:!!Done queueing!!");
        //check tpe statistics forever
        while (true) {
            LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: "
                 + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize());
            Thread.sleep(1000);
        }
    }
}
I found an old bug that seems to be this issue but it was closed: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662. Could this still be present in 1.6 or am I missing something?
It looks like I Rubber Ducked this one (http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html). The bug I linked above is related to this one: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792, where the issue seems to be resolved in 1.7 (I loaded up 1.7 and verified - fixed...). I guess my main problem was that a bug this fundamental remained for almost a decade. I spent too much time writing this up to not post it now, hope it helps someone.
... after the tasks complete and the keepAliveTime comes into play getPoolSize shows the pool getting reduced to zero.
So this looks to be a race condition in the ThreadPoolExecutor.  I guess it is working according to design albeit not expected.  In the getTask() method which the worker threads loop through to get tasks from the blocking queue, you see this code:
if (state == SHUTDOWN)  // Help drain queue
    r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
    r = workQueue.take();
if (r != null)
    return r;
if (workerCanExit()) {
    if (runState >= SHUTDOWN) // Wake up others
        interruptIdleWorkers();
    return null;
}
If the poolSize grows above the corePoolSize then if the poll times out after keepAliveTime, the code falls down to workerCanExit() since r is null.  All of the threads can return true from that method since it is just testing the state of poolSize:
    mainLock.lock();
    boolean canExit;
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize)); << test poolSize here
    } finally {
        mainLock.unlock();                         << race to workerDone() begins
    }
Once that returns true then the worker thread exits and then the poolSize is decremented.  If all of the worker threads do that test at the same time then they will all exit because of the race between the testing of poolSize and the stopping of the worker when --poolSize occurs.
What surprises me is how consistent that race condition is.  If you add some randomization to the sleep() inside of the run() below then you can get some core threads to not quit but I would have thought the race condition would have been harder to hit.
You can see this behavior in the following test:
@Test
public void test() throws Exception {
    int before = Thread.activeCount();
    int core = 10;
    int max = 50;
    int queueSize = 100;
    ThreadPoolExecutor tpe =
            new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(queueSize),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    tpe.allowCoreThreadTimeOut(false);
    assertEquals(0, tpe.getActiveCount());
    // if we start 1 more than can go into core or queue, poolSize goes to 0
    int startN = core + queueSize + 1;
    // if we only start jobs the core can take care of, then it won't go to 0
    // int startN = core + queueSize;
    for (int i = 0; i < startN; i++) {
        tpe.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    while (true) {
        System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize()
                + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before));
        Thread.sleep(1000);
    }
}
If you change the sleep line inside of the run() method to something like this:
private final Random random = new Random();
...
    Thread.sleep(100 + random.nextInt(100));
This will make the race condition harder to hit so some core threads will still be around.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With