线程池是程序员的 java 重要基本功之一, 也是在生产实践中最频繁使用的技术之一, 所以对线程池的深刻理解是一件十分必要的事情;
本文主要梳理与 ThreadPoolExecutor 相关的知识与使用注意点, 方便工作中参考与陷阱避免;
构造器
ThreadPoolExecutor 有很多重载构造器, 其中参数最全最完整的构造器签名如下:
1 | public ThreadPoolExecutor(int corePoolSize, |
重要的方法
1 | submit |
重要的内部类
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable {...} |
Executors 类的陷阱
jdk 原生的 Executors
类提供了便捷的方法以构造各种各样的 ThreadPoolExecutor, 但是在方便之余, Executors 为我们设置的参数却存在着一些安全隐患, 其主要体现在以下两个方面:
(1) 线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,直至 OOM
问题典型方法代表: newCachedThreadPool
:
1 | public static ExecutorService newCachedThreadPool() { |
它们将 maximumPoolSize 设置为了 Integer.MAX_VALUE
, 当处理速度赶不上提交速度时, 极端情况下便会无限构造新线程, 直到 OOM;
(2) 工作队列的堆积可能会耗费非常大的内存,直至 OOM
问题典型方法代表: newFixedThreadPool
与 newSingleThreadExecutor
:
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
1 | public static ExecutorService newSingleThreadExecutor() { |
它们使用了无界的 LinkedBlockingQueue 作为 workQueue, 当处理速度赶不上提交速度时, 极端情况下任务便会在队列里无限堆积, 直到 OOM;newScheduledThreadPool
与 newSingleThreadScheduledExecutor
方法 new 出了 ScheduledThreadPoolExecutor
类, 它使用的 DelayedWorkQueue 也是无界队列, 也存在上述隐患:
1 | public ScheduledThreadPoolExecutor(int corePoolSize) { |
另外, ScheduledThreadPoolExecutor 的构造器也将 maximumPoolSize 设置为了 Integer.MAX_VALUE
, 不过因为工作队列已经是无界的, 所以无论如何线程池内都最多只有 corePoolSize 个线程, maximumPoolSize 实际被短路了;
比较合理的线程池构造方案
不使用 Executors
类, 那么最直接的方式就是自己动手, 丰衣足食:
(1) 普通 ThreadPoolExecutor 的构造
1 | // org.apache.commons.lang3.concurrent.BasicThreadFactory |
另外, spring 也有一个自己包装的线程池实现: ThreadPoolTaskExecutor
, 不过其给出的默认参数依然是不安全的: maximumPoolSize = Integer.MAX_VALUE, queueCapacity = Integer.MAX_VALUE; 所以要想安全使用 spring 线程池, 参数设置还是不能省:
1 | <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> |
(2) 调度器 ScheduledThreadPoolExecutor 的构造
想要安全构造 ScheduledThreadPoolExecutor 比较麻烦, 因为没有直接的公开接口给我们设置线程池和队列的属性; ScheduledThreadPoolExecutor 的四个构造器只允许用户设置 corePoolSize、threadFactory 和 rejectHandler, 而其使用了固定的无界 workQueue: DelayedWorkQueue, 不允许用户自定义;
一种可行的办法是包装一个提交方法, 在每次提交调度任务之前, 先检查一下 workQueue 的长度, 限制在其大于某一阈值时, 拒绝提交任务:
1 | public ScheduledFuture<?> schedule(Runnable r, long delay, TimeUnit unit) { |