创建多少线程合适,要看多线程具体的应用场景。我们的程序一般都是 CPU 计算和 I/O 操作交叉执行的,由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长,这种场景我们一般都称为 I/O 密集型计算;和 I/O 密集型计算相对的就是 CPU 密集型计算了,CPU 密集型计算大部分场景下都是纯 CPU 计算。I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。下面我们对这两个场景分别说明。
对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
对于 I/O 密集型的计算场景,比如前面我们的例子中,如果 CPU 计算和 I/O 操作的耗时是 1:1,那么 2 个线程是最合适的。如果 CPU 计算和 I/O 操作的耗时是 1:2,那多少个线程合适呢?是 3 个线程,这样 CPU 和 I/O 设备的利用率都达到了 100%。
此方式使用的是下方第一个计算公式进行计算的。
计算公式如下:最佳线程数 =CPU 核数 * 期待CPU利用率* [ 1 +(I/O 耗时 / CPU 耗时)]
但是这个公式只能起到一个知道作用,实际的业务场景一定要仔细考虑参数的配置,有没有一种计算公式,能够让开发同学很简易地计算出某种场景中的线程池应该是什么参数呢?
带着这样的疑问,我们调研了业界的一些线程池参数配置方案:
调研了以上业界方案后,我们并没有得出通用的线程池计算方式。并发任务的执行情况和任务类型相关,IO密集型和CPU密集型的任务运行起来的情况差异非常大,但这种占比是较难合理预估的,这导致很难有一个简单有效的通用公式帮我们直接计算出结果。
如果我们设置了最佳线程数,但是并发的请求实在超出线程可用范围系统会怎么处理呢,线程会触发相应的拒绝策略,请继续阅读。
ThreadPoolExecutor类实现了ExecutorService接口和Executor接口,可以设置线程池corePoolSize,最大线程池大小,AliveTime,拒绝策略等。常用构造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
当一个任务通过execute(Runnable)方法添加到线程池时:
l 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
2 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
3 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
4 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
2.2 拒绝任务handler有四个选择:
AbortPolicy策略将会 抛出java.util.concurrent.RejectedExecutionException异常
public class ThreadPool {
private static class Worker implements Runnable {
public void run() {
System.out.println(Thread.currentThread().getName() + " is running");
}
}
@Test
public void abortPolicy() {
int corePoolSize = 5;
int maxPoolSize = 8;
long keepAliveTime = 5;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(5);
//拒绝策略1:将抛出 RejectedExecutionException.
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS,
queue, handler);
for (int i = 0; i < 10000; i++) {
executor.execute(new Worker());
System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
截取运行结果
线程池中线程数目:8,队列中等待执行的任务数目:1,已执行完的任务数目:3708
线程池中线程数目:8,队列中等待执行的任务数目:1,已执行完的任务数目:3710
线程池中线程数目:8,队列中等待执行的任务数目:2,已执行完的任务数目:3710
线程池中线程数目:8,队列中等待执行的任务数目:3,已执行完的任务数目:3710
线程池中线程数目:8,队列中等待执行的任务数目:4,已执行完的任务数目:3710
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:3710
pool-2-thread-3 is running
pool-2-thread-3 is running
pool-2-thread-3 is running
pool-2-thread-3 is running
pool-2-thread-7 is running
java.util.concurrent.RejectedExecutionException:
AbortPolicy策略源码
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。
@Test
public void callerRunsPolicy() {
int corePoolSize = 5;
int maxPoolSize = 8;
long keepAliveTime = 5;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(5);
//拒绝策略1:将抛出 RejectedExecutionException.
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS,
queue, handler);
for (int i = 0; i < 10000; i++) {
executor.execute(new Worker());
System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
}
executor.shutdown();
}
截取运行结果
线程池中线程数目:8,队列中等待执行的任务数目:0,已执行完的任务数目:602
线程池中线程数目:8,队列中等待执行的任务数目:1,已执行完的任务数目:604
线程池中线程数目:8,队列中等待执行的任务数目:2,已执行完的任务数目:604
线程池中线程数目:8,队列中等待执行的任务数目:3,已执行完的任务数目:604
线程池中线程数目:8,队列中等待执行的任务数目:4,已执行完的任务数目:604
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:604
pool-4-thread-4 is running
pool-4-thread-4 is running
pool-4-thread-4 is running
pool-4-thread-1 is running
main is running
CallerRunsPolicy策略源码
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
DiscardOldestPolicy,将会丢弃等待队列中的队头任务,e.getQueue().poll()
@Test
public void discardOldestPolicy() {
int corePoolSize = 5;
int maxPoolSize = 8;
long keepAliveTime = 5;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(5);
//拒绝策略1:将抛出 RejectedExecutionException.
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS,
queue, handler);
for (int i = 0; i < 10000; i++) {
executor.execute(new Worker());
System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
}
executor.shutdown();
}
截取运行结果
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:7132
pool-1-thread-8 is running
pool-1-thread-3 is running
pool-1-thread-8 is running
pool-1-thread-3 is running
pool-1-thread-3 is running
pool-1-thread-2 is running
pool-1-thread-8 is running
pool-1-thread-4 is running
pool-1-thread-6 is running
pool-1-thread-5 is running
DiscardOldestPolicy策略源码
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
DiscardPolicy将会丢弃当前任务,通过源码可以看出,该策略不会执行任务操作。
@Test
public void discardPolicy() {
int corePoolSize = 5;
int maxPoolSize = 8;
long keepAliveTime = 5;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(5);
//拒绝策略1:将抛出 RejectedExecutionException.
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS,
queue, handler);
for (int i = 0; i < 10000; i++) {
executor.execute(new Worker());
System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
}
executor.shutdown();
}
截取运行结果
线程池中线程数目:8,队列中等待执行的任务数目:1,已执行完的任务数目:6228
线程池中线程数目:8,队列中等待执行的任务数目:1,已执行完的任务数目:6229
pool-3-thread-5 is running
pool-3-thread-8 is running
pool-3-thread-1 is running
线程池中线程数目:8,队列中等待执行的任务数目:0,已执行完的任务数目:6231
线程池中线程数目:8,队列中等待执行的任务数目:1,已执行完的任务数目:6232
pool-3-thread-2 is running
pool-3-thread-7 is running
pool-3-thread-7 is running
线程池中线程数目:8,队列中等待执行的任务数目:0,已执行完的任务数目:6234
DiscardPolicy策略源码
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
四种拒绝策略是相互独立的。拒绝时是否抛出异常,如果抛出则选择(AbortPolicy),会抛出(RejectedExecutionException)异常。如果不希望抛出异常,则考虑是执行还是放弃一个任务,如果执行(CallerRunsPolicy)则会由调用的线程进行任务的执行。如果放弃则分为(DiscardOldestPolicy)放弃最旧的一个任务,和(DiscardPolicy)放弃当前任务两种。
AbortPolicy 为默认拒绝策略
参考博客:
《Java 并发编程实战》