Wednesday, March 09, 2011

ThreadPoolExecutor

I've been working with ThreadPoolExecutor and it's a little confusing so I thought I'd share my experience.

Some background - Executor is a simple interface:
An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.
A ThreadPoolExecutor uses a pool of threads to execute the tasks. The trick is in configuring it. I wanted:
  • no threads when it's idle
  • a limited number of threads when it's busy
  • a bounded queue
You have to read the documentation for ThreadPoolExecutor very carefully. Forget about your preconceptions of how it "probably" works. 

You can specify "core pool size" and "max pool size". Normally "core" threads do not time out. You also specify the type of queue (bounded, unbounded, or synchronous), and the rejection policy.

There are a couple of gotcha's. 
  • Even if activity is very low and one thread could handle it, the full number of core threads will be created. 
  • Even if activity is high and all the core threads are busy, additional threads will not be created until the queue is full. 
Both of these are because it's very hard to know if a thread is idle. 

They combine to lead to a common mistake - setting core size to 0 because you want all the threads to time out, and using an unbounded queue, which will never be full. Using a large queue is almost as bad because additional threads won't be created until that many requests are queued up - ouch!

Here's what I ended up with:
private static final int CORE_THREADS = 0;
private static final int MAX_THREADS = 8;
private static final int KEEP_ALIVE = 1;
private static final ThreadPoolExecutor executor =
     new ThreadPoolExecutor(CORE_THREADS, MAX_THREADS,
          KEEP_ALIVE, TimeUnit.MINUTES,
          new SynchronousQueue<Runnable>(),
          threadFactory, 
          new ThreadPoolExecutor.CallerRunsPolicy());
A SynchronousQueue is a zero size queue, it hands off items directly. Because its remainingCapacity is always zero, it is always "full", so up to MAX_THREADS will be created as needed.

Without a queue and with a maximum number of threads, it's possible that tasks will need to be rejected. By specifying the CallerRunsPolicy rejected tasks will be executed by the calling thread. This is a simple feedback mechanism that will throttle requests since the caller won't be able to accept additional requests while it's busy executing a task.

I chose MAX_THREADS of 8 to be a bit bigger than the expected number of cpu cores (to allow for some threads being blocked). In our actual usage one or two threads seem to be sufficient.

There is a standard Executors.newCachedThreadPool but the documentation isn't specific about what its configuration is. From other sources it appears quite similar to my configuration - zero core threads and a SynchronousQueue. However, it has unlimited MAX_THREADS instead of CallerRunsPolicy. In my case, I wanted to limit the number of thread because each holds some limited resources.

I'm not an expert on this stuff, so suggestions or criticisms are welcome.

2 comments:

Unknown said...

Are you sure the SynchronousQueue might reject requests, therefore needing the CallerRunsPolicy? Doesn't it just wait for someone to take it out of the queue immediately, so it will block anyway until a thread completes?

Andrew McKinlay said...

No, I'm not sure, I didn't actually test, and my understanding could be wrong.