线程池是程序员的 java 重要基本功之一, 也是在生产实践中最频繁使用的技术之一, 所以对线程池的深刻理解是一件十分必要的事情;
本文主要梳理与 ThreadPoolExecutor 相关的知识与使用注意点, 方便工作中参考与陷阱避免;
构造器
ThreadPoolExecutor 有很多重载构造器, 其中参数最全最完整的构造器签名如下:1
2
3
4
5
6
7public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
重要的方法
1 | submit |
重要的内部类
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable {...} |
Executors 类的陷阱
jdk 原生的 Executors
类提供了便捷的方法以构造各种各样的 ThreadPoolExecutor, 但是在方便之余, Executors 为我们设置的参数却存在着一些安全隐患, 其主要体现在以下两个方面:
(1) 线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,直至 OOM
问题典型方法代表: newCachedThreadPool
:1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
它们将 maximumPoolSize 设置为了 Integer.MAX_VALUE
, 当处理速度赶不上提交速度时, 极端情况下便会无限构造新线程, 直到 OOM;
(2) 工作队列的堆积可能会耗费非常大的内存,直至 OOM
问题典型方法代表: newFixedThreadPool
与 newSingleThreadExecutor
:1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
1 | public static ExecutorService newSingleThreadExecutor() { |
它们使用了无界的 LinkedBlockingQueue 作为 workQueue, 当处理速度赶不上提交速度时, 极端情况下任务便会在队列里无限堆积, 直到 OOM;newScheduledThreadPool
与 newSingleThreadScheduledExecutor
方法 new 出了 ScheduledThreadPoolExecutor
类, 它使用的 DelayedWorkQueue 也是无界队列, 也存在上述隐患:1
2
3public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
另外, ScheduledThreadPoolExecutor 的构造器也将 maximumPoolSize 设置为了 Integer.MAX_VALUE
, 不过因为工作队列已经是无界的, 所以无论如何线程池内都最多只有 corePoolSize 个线程, maximumPoolSize 实际被短路了;
比较合理的线程池构造方案
不使用 Executors
类, 那么最直接的方式就是自己动手, 丰衣足食:
(1) 普通 ThreadPoolExecutor 的构造1
2
3
4
5
6
7
8
9
10
11
12// org.apache.commons.lang3.concurrent.BasicThreadFactory
ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("xxx-%d").daemon(true).build();
// com.google.common.util.concurrent.ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("yyy-%d").build();
// 自己构造合理的参数
ExecutorService pool = new ThreadPoolExecutor(5,
200, // 有限的 maximumPoolSize
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), // 使用有界队列
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
另外, spring 也有一个自己包装的线程池实现: ThreadPoolTaskExecutor
, 不过其给出的默认参数依然是不安全的: maximumPoolSize = Integer.MAX_VALUE, queueCapacity = Integer.MAX_VALUE; 所以要想安全使用 spring 线程池, 参数设置还是不能省:1
2
3
4
5
6
7
8
9
10
11
12
13
14<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维护线程的最少数量 -->
<property name="corePoolSize" value="5" />
<!-- 线程池维护线程的最大数量 -->
<property name="maxPoolSize" value="200" />
<!-- 允许的空闲时间 -->
<property name="keepAliveSeconds" value="200" />
<!-- 缓存队列长度 -->
<property name="queueCapacity" value="20" />
<!-- 对拒绝task的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy" />
</property>
</bean>
(2) 调度器 ScheduledThreadPoolExecutor 的构造
想要安全构造 ScheduledThreadPoolExecutor 比较麻烦, 因为没有直接的公开接口给我们设置线程池和队列的属性; ScheduledThreadPoolExecutor 的四个构造器只允许用户设置 corePoolSize、threadFactory 和 rejectHandler, 而其使用了固定的无界 workQueue: DelayedWorkQueue, 不允许用户自定义;
一种可行的办法是包装一个提交方法, 在每次提交调度任务之前, 先检查一下 workQueue 的长度, 限制在其大于某一阈值时, 拒绝提交任务:1
2
3
4
5
6public ScheduledFuture<?> schedule(Runnable r, long delay, TimeUnit unit) {
if (scheduledThreadPool.getQueue().length() > = THRESHOLD_SIZE) {
throw new RejectedExecutionException("task " + r.toString() + " rejected due to queue fulled");
}
threadPoolExecutor.schedule(r, delay, unit);
}