目录
1.概述
线程池的核心:
线程池的实现原理是个标准的生产消费者模型,调用方不停向线程池中丢任务,线程池中的线程不停去执行任务。
通过上图不难发现实现线程池的核心就两个:
- 用来存任务的队列
- 用来执行任务的工作线程
关于这两个核心我们要考虑以下几点:
-
线程的个数
-
队列并发操作的安全性
-
队列的长度
-
队列满后,后面来的线程如何处理
线程的个数:
线程的个数是不是固定的?还是说能弹性扩容?在JDK的实现里选择的策略是线程个数是能根据负载来弹性扩容的,负载压力大的时候扩容,压力降下来后收缩回去。
队列并发操作的安全性:
外界并发的向其中丢任务,是对其进行并发的写,多条工作线程并发的去里面取任务,是对其进行并发读,所以读写的安全性是必须要保证的,所以要用阻塞队列来实现。
队列的长度:
用来存线程这个队列的长度太小了可能会不够用,要是没有限制又可能导致机器的物理内存耗尽,所以最好的方式就是给这个队列一个初始化的长度,然后允许这个队列动态扩容。
队列满后,后面来的任务如何处理:
队列满了之后新来的任务如何处理?也就是拒绝策略,关于这个拒绝策略,是直接拒绝丢弃掉?还是把队列中的老任务丢弃掉给它让位置?还是说不走线程池,直接新开一条线程来执行?
继承体系:
可以看到顶级父接口提供了规范标准,真正干活儿的实现类只有ThreadPoolExecutor和ScheduleThreadPoolExecutor。
本文主要以ThreadPoolExcutor为切入聊一下线程池的核心概念,由于ScheduleThreadPoolExecutor主要是用来做延迟任务和周期任务的,以它为切入来聊线程池的核心概念并不是那么合适,后面会有文章专门聊一聊JDK基于线程池打造的一整套延迟任务、周期任务、异步任务等,这些任务调度体系。
2.ThreadPoolExecutor
2.1.参数
public class ThreadPoolExecutor extends AbstractExecutorService{
private final AtomicInteger ctl;//状态变量
private final BlockingQueue<Runnable> workQueue;//任务队列
private final ReentrantLock mainLock;//用于保证线程池中各变量之间的互斥
private final HashSet<ThreadPoolExecutor.Worker> workers;//线程组
}
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//被封装的线程
Runnable firstTask;//worker收到的第一个任务
volatile long completedTasks;//worker执行完毕的任务数
}
线程池的核心参数为
-
corePoolSize:在线程池中始终维护的线程个数.
-
maxPoolSize:在corePooSize已满、队列也满的情况下,扩 充线程至此值。
-
keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
-
blockingQueue:线程池所用到的队列。
-
threadFactory:线程工厂,用来创建从队列中取任务执行的线程,可以自定义,也有一个默认的。线程池中的线程就是由它来维护。
-
RejectedExecutionHandler:corePoolSize 已满,队列已 满,maxPoolSize 已满,最后的拒绝策略。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.2.任务提交流程
先一句话总结:
线程能执行就直接执行,否则就去队列中排队。
注意下面说的新开线程不是指新开一条线程,而是交给线程池里的线程来执行。
接下来是细节:
入口在ThreadPoolExecutor.execute(Runnable command)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果当前线程组中的线程数量小于核心线程数,直接执行该任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果当前线程组中的线程数量大于等于核心线程数,将该任务放入队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//放入队列失败,再尝试新开一个线程来执行该任务
else if (!addWorker(command, false))
//这时候再失败意味着线程组数量已经大于maxPoolSize且任务队列已满,直接执行拒绝策略
reject(command);
}
2.3.任务执行过程
先一句话总结:
线程执行当前任务或者去队列中拿任务。
接下来是细节:
前面的提交过程可以看到任务要么被放到队列中去排队,要么直接调用addWorker封装成Worker类去执行任务,所以任务执行的核心就在Worker的内部。
Worker的run方法会调用runWorker方法,这个方法就是整个ThreadPoolExector最核心的方法,下面我们剔除不相关的代码,来看看整个过程,放心,很简洁好读。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//循环去队列中取任务,直到有任务为止
w.lock();
try {
try {
task.run();//执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
} finally {
processWorkerExit(w, completedAbruptly);//worker退出
}
}
2.4.拒绝策略
ThreadPoolExecutor一共提供了四种拒绝策略:
-
AbortPolicy,默认拒绝策略,直接抛出异常。
-
CallerRunsPolicy,让任务在调用者的线程中执行,线程池不对任务做处理。
-
DiscardPolicy,线程池直接把任务丢弃掉,就当什么都没有发生。
-
DiscardOldestPolicy,把队列中最老的任务删掉,将新任务放入队列。
2.5.任务如何交给线程执行
上面聊了线程池的核心,还有另一个实现上的关键点可能需要聊一聊,就是如何将任务交给线程执行?
其实很简单,实现Runnable接口,然后每个任务重新run方法。执行任务就是调用run方法。
要将这个任务交给线程,就是将这个Runanable交给线程:
2.6.状态控制
举个例子:
线程池中有一条线程中的任务执行了对整个线程池的关闭,其它线程该怎么办?这里就能发现线程池是需要状态的,需要通过状态来让全局所有线程都知道此时线程池到底能不能正常工作。
线程池总的状态有如下几个:
-
RUNNING:这是线程池的默认状态,此时线程池可以接收新任务并处理队列中的任务。
-
SHUTDOWN:线程池不再接收新任务,但会继续处理队列中的任务直到完成。
-
STOP:线程池不仅不再接收新任务,还会中断正在执行的任务
-
TIDYING_UP 和 TERMINATED:这两个状态用于线程池的关闭流程,确保所有资源被正确释放。
不同状态下的线程池有不同的任务处理策略:
-
当线程池处于RUNNING状态时,它可以正常处理任务,包括将任务分配给空闲线程或放入任务队列。
-
当线程池处于SHUTDOWN状态时,它会拒绝新任务,只处理队列中的遗留任务。
-
在STOP状态下,线程池会立即停止所有工作,甚至中断正在执行的任务。
在JDK1.7之后ThreaddPoolExector将线程池状态和线程数合并成一个int型的ctl,高3位表示线程池状态,低29位表示线程数量:
之所以这样是因为这两个变量在线程池中是会被连用的,比如先判断线程池状态再判断线程数量来新来的任务如何处理。把它们合在一起,一次锁争抢就能拿到两个关键值了,然后按位操作能很快得到高底位各自的状态,方便操作,也更高效。
2.7.代码示例
在使用线程池的时候并不需要我们手动去创建,JDK中有工具类来帮我们创建各种线程池,这个工具类只是包了一层,其底层还是创建的我们上面聊的这些线程池的实现类,以下是代码示例:
import java.util.concurrent.*;
public class ThreadPoolExamples {
public static void main(String[] args) throws InterruptedException {
// 固定大小的线程池示例
fixedThreadPoolExample();
// 单线程线程池示例
singleThreadExecutorExample();
// 缓存线程池示例
cachedThreadPoolExample();
// 定时线程池示例
scheduledThreadPoolExample();
}
/**
* 创建一个固定大小的线程池,该线程池中的线程数量固定,不会因为任务的增加而增加新的线程。
*/
private static void fixedThreadPoolExample() {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); // 创建一个包含5个线程的线程池
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedThreadPool.execute(() -> {
System.out.println("FixedThreadPool: Task ID " + taskId + " is running by " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("FixedThreadPool: Task ID " + taskId + " finished by " + Thread.currentThread().getName());
});
}
fixedThreadPool.shutdown(); // 关闭线程池
try {
fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 创建一个单线程线程池,所有的任务都将在同一个线程中依次执行。
*/
private static void singleThreadExecutorExample() {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); // 创建一个单线程的线程池
for (int i = 0; i < 10; i++) {
final int taskId = i;
singleThreadExecutor.execute(() -> {
System.out.println("SingleThreadExecutor: Task ID " + taskId + " is running by " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("SingleThreadExecutor: Task ID " + taskId + " finished by " + Thread.currentThread().getName());
});
}
singleThreadExecutor.shutdown(); // 关闭线程池
try {
singleThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 创建一个缓存线程池,该线程池会根据需要创建新线程,但会在线程闲置后回收线程。
*/
private static void cachedThreadPoolExample() {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 创建一个缓存线程池
for (int i = 0; i < 10; i++) {
final int taskId = i;
cachedThreadPool.execute(() -> {
System.out.println("CachedThreadPool: Task ID " + taskId + " is running by " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CachedThreadPool: Task ID " + taskId + " finished by " + Thread.currentThread().getName());
});
}
cachedThreadPool.shutdown(); // 关闭线程池
try {
cachedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 创建一个定时线程池,可以安排任务在指定时间执行,或定期执行任务。
*/
private static void scheduledThreadPoolExample() {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 创建一个包含5个线程的定时线程池
Runnable task = () -> System.out.println("ScheduledThreadPool: Task executed at: " + System.currentTimeMillis());
// 安排在1秒后执行一次,然后每隔2秒重复执行
scheduledThreadPool.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
try {
Thread.sleep(10000); // 主线程休眠10秒,以便观察任务执行情况
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledThreadPool.shutdown(); // 关闭线程池
}
}