主要讲解Java线程池的基础知识。
目前书籍《Java并发编程实战》看到“第7章:取消与关闭”,里面涉及到部分线程池的内容,然后第8章就是线程池,所以打算把之前看的线程池的资料再整理一下,便于后面能更好理解书中的内容。
之前看过一篇博客,关于线程池的内容讲解的非常好,我只截取基础知识部分,把Java基础内容全部掌握后,再对里面的原理部分进行深入理解,后面会附上该篇博客的链接。
我们知道,线程的创建和销毁都需要映射到操作系统,因此其代价是比较高昂的。出于避免频繁创建、销毁线程以及方便线程管理的需要,线程池应运而生。
有句话叫做艺术来源于生活,编程语言也是如此,很多设计思想能映射到日常生活中,比如面向对象思想、封装、继承,等等。今天我们要说的线程池,它同样可以在现实世界找到对应的实体——工厂。先假想一个工厂的生产流程:
工厂中有固定的一批工人,称为正式工人,工厂接收的订单由这些工人去完成。当订单增加,正式工人已经忙不过来了,工厂会将生产原料暂时堆积在仓库中,等有空闲的工人时再处理(因为工人空闲了也不会主动处理仓库中的生产任务,所以需要调度员实时调度)。仓库堆积满了后,订单还在增加怎么办?工厂只能临时扩招一批工人来应对生产高峰,而这批工人高峰结束后是要清退的,所以称为临时工。当时临时工也招满后(受限于工位限制,临时工数量有上限),后面的订单只能忍痛拒绝了。我们做如下一番映射:
getTask()是一个方法,将任务队列中的任务调度给空闲线程。
映射后,形成线程池流程图如下,两者是不是有异曲同工之妙?
点评一下:感觉作者的这个类比,太TM经典了!!!直接抓住了线程调用的精髓。这就是为什么看书时不能只盯着书看,可能你看半天都不懂书中讲的啥,找一篇经典的博客,瞬间给你拨开云雾,而且还印象深刻。
这样,线程池的工作原理或者说流程就很好理解了,提炼成一个简图:
那么接下来,问题来了,线程池是具体如何实现这套工作机制的呢?从Java线程池Executor框架体系可以看出:线程池的真正实现类是ThreadPoolExecutor,因此我们接下来重点研究这个类。
研究一个类,先从它的构造方法开始。ThreadPoolExecutor提供了4个有参构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
感觉构造方法有点多,其实前面的基础参数都是一样的,就后面两个可选参数“线程工厂”和“拒绝策略”不一样,组合一下就是2*2=4种情况。
解释一下构造方法中涉及到的参数:
了解完“线程的设计思路”,再看构造方法的这些参数,感觉就非常容易懂了。
使用ThreadPoolExecutor需要指定一个实现了BlockingQueue接口的任务等待队列。在ThreadPoolExecutor线程池的API文档中,一共推荐了三种等待队列:
另外,Java还提供了另外4种队列:
感觉这提供的队列有些多啊,总共7个!说实话,我现在还不知道实际场景用哪个比较好,后面我们可以看看Java封装好的线程池,里面用的队列都是哪种。
线程池有一个重要的机制:拒绝策略。当线程池workQueue已满且无法再创建新线程池时,就要拒绝后续任务了。拒绝策略需要实现RejectedExecutionHandler接口,不过Executors框架已经为我们实现了4种拒绝策略:
线程工厂指定创建线程的方式,这个参数不是必选项,Executors类已经为我们非常贴心地提供了一个默认的线程工厂:
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
线程池有5种状态:
volatile int runState;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
runState表示当前线程池的状态,它是一个 volatile 变量用来保证线程之间的可见性。
下面的几个static final变量表示runState可能的几个取值,有以下几个状态:
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
++n;
return n;
}
ThreadPoolExecutor提供了两个方法,用于线程池的关闭:
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:
通过构造方法使用ThreadPoolExecutor是线程池最直接的使用方式,下面看一个实例:
public class ThreadPoolExecutorTest {
public static void main(String args[]) {
// 创建线程池(核心线程数是3,最大线程数是5,超时时间是5秒)
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5,5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5));
// 向线程池提交任务
for (int i = 0; i < threadPool.getCorePoolSize(); i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 2; j++) {
System.out.println(Thread.currentThread().getName() + ":" + j);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
// 关闭线程池
threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
// threadPool.shutdownNow(); // 设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,该方法要慎用,容易造成不可控的后果
}
}
// 输出:
// pool-1-thread-1:0
// pool-1-thread-3:0
// pool-1-thread-2:0
// pool-1-thread-2:1
// pool-1-thread-1:1
// pool-1-thread-3:1
另外,Executors封装好了4种常见的功能线程池(还是那么地贴心):
固定容量线程池。其特点是最大线程数就是核心线程数,意味着线程池只能创建核心线程,keepAliveTime为0,即线程执行完任务立即回收。任务队列未指定容量,代表使用默认值Integer.MAX_VALUE。适用于需要控制并发线程的场景。
// 使用默认线程工厂
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 需要自定义线程工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
使用示例:
public class FixedThreadPoolTest {
public static void main(String args[]) {
// 创建线程池对象,设置核心线程和最大线程数为5
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is running.");
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("Throw Exception.");
}
System.out.println(Thread.currentThread().getName() + " after sleep, is still running.");
}
});
//fixedThreadPool.shutdown();
fixedThreadPool.shutdownNow(); // 不建议这样使用,很危险,这里仅用于测试
}
}
// 输出:
// pool-1-thread-1 is running.
// Throw Exception.
// pool-1-thread-1 after sleep, is still running.
// java.lang.InterruptedException: sleep interrupted
// at java.lang.Thread.sleep(Native Method)
// at com.java.parallel.pool.FixedThreadPoolTest$1.run(FixedThreadPoolTest.java:15)
// at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
// at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
// at java.lang.Thread.run(Thread.java:748)
作者的这个示例,我加了一点料,我就想看看直接中断会是什么效果,结果发现直接中断后,线程直接抛出异常,我捕获异常后,输出了一些结果。正常情况下,捕获异常是需要做一些处理,我这里仅作测试。
单线程线程池。特点是线程池中只有一个线程(核心线程),线程执行完任务立即回收,使用有界阻塞队列(容量未指定,使用默认值Integer.MAX_VALUE)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 为节省篇幅,省略了自定义线程工厂方式的源码
使用示例:
public class SingleThreadExecutorTest {
public static void main(String args[]) {
// 创建线程池对象,设置核心线程和最大线程数为1
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
singleThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is running.");
}
});
singleThreadPool.shutdown();
}
}
// 输出:
// pool-1-thread-1 is running.
定时线程池。指定核心线程数量,普通线程数量无限,线程执行完任务立即回收,任务队列为延时阻塞队列。这是一个比较特别的线程池,适用于执行定时或周期性的任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 继承了 ThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
// 构造函数,省略了自定义线程工厂的构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
// 延时执行任务
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
...
}
// 定时执行任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {...}
}
使用示例:
public class ScheduledThreadPoolTest {
public static void main(String args[]) {
// 创建定时线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 向线程池提交任务
scheduledThreadPool.schedule(new Runnable(){
public void run() {
System.out.println(Thread.currentThread().getName() + "--->运行");
}
}, 5, TimeUnit.SECONDS); // 延迟5s后执行任务
scheduledThreadPool.shutdown();
}
}
// 输出:
// pool-1-thread-1--->运行
缓存线程池。没有核心线程,普通线程数量为Integer.MAX_VALUE(可以理解为无限),线程闲置60s后回收,任务队列使用SynchronousQueue这种无容量的同步队列。适用于任务量大但耗时低的场景。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
使用示例:
public class CachedThreadPoolTest {
public static void main(String args[]) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
cachedThreadPool.execute(new Runnable(){
public void run() {
System.out.println(Thread.currentThread().getName() + "--->运行");
}
});
cachedThreadPool.shutdown();
}
}
// 输出:
// pool-1-thread-1--->运行
最后总结一下Executors封装线程池,每种方式用的是哪种队列:
稍微解读一下:
这篇文章其实是我刚开始接触线程池时找到了,感觉非常经典,刚好现在需要系统学习Java并发编程,就把这篇文章重新整理一下,然后有些地方加入自己的解读,希望对大家有所帮助。
欢迎大家多多点赞,更多文章,请关注微信公众号“楼仔进阶之路”,点关注,不迷路~~