目录
1.ThreadPoolExecutor线程池原理及其execute方法
2.ThreadPoolExecutor线程池之submit方法
3.1 ExecutorService 中 shutdown()、shutdownNow()、awaitTermination() 含义和区别
1、shutdown() 和 shutdownNow() 的区别
2、shutdown() 和 awaitTermination() 的区别
1.ThreadPoolExecutor线程池原理及其execute方法
ThreadPoolExecutor是jdk自带的线程池。
对于线程池大部分人可能会用,也知道为什么用。无非就是任务需要异步执行,再者就是线程需要统一管理起来。对于从线程池中获取线程,大部分人可能只知道,我现在需要一个线程来执行一个任务,那我就把任务丢到线程池里,线程池里有空闲的线程就执行,没有空闲的线程就等待。实际上对于线程池的执行原理远远不止这么简单。
在Java并发包中提供了线程池类——ThreadPoolExecutor,实际上更多的我们可能用到的是Executors工厂类为我们提供的线程池:newFixedThreadPool、newSingleThreadPool、newCachedThreadPool,这三个线程池并不是ThreadPoolExecutor的子类,关于这几者之间的关系,我们先来查看ThreadPoolExecutor,查看源码发现其一共有4个构造方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
首先就从这几个参数开始来了解线程池ThreadPoolExecutor的执行原理。
corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。
unit:线程活动保持时间的单位。
workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
threadFactory:线程工厂,用于创建线程,一般用默认即可;
handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;
用图来解释更清晰:
corePoolSize和maximumPoolSize都在指定线程池中的线程数量,好像平时用到线程池的时候最多就只需要传递一个线程池大小的参数就能创建一个线程池啊,Java为我们提供了一些常用的线程池类就是上面提到的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,当然如果我们想要自己发挥创建自定义的线程池就得自己来“配置”有关线程池的一些参数。
当把一个任务交给线程池来处理的时候,线程池的执行原理如下图所示参考自《Java并发编程的艺术》
①首先会判断核心线程池里是否有线程可执行,有空闲线程则创建一个线程来执行任务。
②当核心线程池里已经没有线程可执行的时候,此时将任务丢到任务队列中去。
③如果任务队列(有界)也已经满了的话,但运行的线程数小于最大线程池的数量的时候,此时将会新建一个线程用于执行任务,但如果运行的线程数已经达到最大线程池的数量的时候,此时将无法创建线程执行任务。
所以实际上对于线程池不仅是单纯地将任务丢到线程池,线程池中有线程就执行任务,没线程就等待。
为巩固一下线程池的原理,现在再来了解上面提到的常用的3个线程池:
Executors.newFixedThreadPool:创建一个固定数量线程的线程池。
// Executors#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
可以看到newFixedThreadPool中调用的是ThreadPoolExecutor类,传递的参数corePoolSize= maximumPoolSize=nThread。回顾线程池的执行原理,当一个任务提交到线程池中,首先判断核心线程池里有没有空闲线程,有则创建线程,没有则将任务放到任务队列(这里是有界阻塞队列LinkedBlockingQueue)中,如果任务队列已经满了的话,对于newFixedThreadPool来说,它的最大线程池数量=核心线程池数量,此时任务队列也满了,将不能扩展创建新的线程来执行任务。
Executors.newSingleThreadExecutor:创建只包含一个线程的线程池。
//Executors# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
只有一个线程的线程池好像有点奇怪,并且并没有直接将返回ThreadPoolExecutor,甚至也没有直接将线程池数量1传递给newFixedThreadPool返回。那就说明这个只含有一个线程的线程池,或许并没有只包含一个线程那么简单。在其源码注释中这么写到:创建只有一个工作线程的线程池用于操作一个无界队列(如果由于前驱节点的执行被终止结束了,一个新的线程将会继续执行后继节点线程)任务得以继续执行,不同于newFixedThreadPool(1)不会有额外的线程来重新继续执行后继节点。也就是说newSingleThreadExecutor自始至终都只有一个线程在执行,这和newFixedThreadPool一样,但如果线程终止结束过后newSingleThreadExecutor则会重新创建一个新的线程来继续执行任务队列中的线程,而newFixedThreaPool则不会。
Executors.newCachedThreadPool:根据需要创建新线程的线程池。
//Executors#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
可以看到newCachedThread返回的是ThreadPoolExecutor,其参数核心线程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,这也就是说当任务被提交到newCachedThread线程池时,将会直接把任务放到SynchronousQueue任务队列中,maximumPool从任务队列中获取任务。注意SynchronousQueue是一个没有容量的队列,也就是说每个入队操作必须等待另一个线程的对应出队操作,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,newCachedThreadPool会不断创建线程,线程多并不是一件好事,严重会耗尽CPU和内存资源。
题外话:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,这三者都直接或间接调用了ThreadPoolExecutor,为什么它们三者没有直接是其子类,而是通过Executors来实例化呢?这是所采用的静态工厂方法,在java.util.Connections接口中同样也是采用的静态工厂方法来创建相关的类。这样有很多好处,静态工厂方法是用来产生对象的,产生什么对象没关系,只要返回原返回类型或原返回类型的子类型都可以,降低API数目和使用难度,在《Effective Java》中的第1条就是静态工厂方法。
回到ThreadPoolExecutor,首先来看它的继承关系:
ThreadPoolExecutor它的顶级父类是Executor接口,只包含了一个方法——execute,这个方法也就是线程池的“执行”。
//Executor#execute
public interface Executor {
void execute(Runnable command);
}
Executor#execute的实现则是在ThreadPoolExecutor中实现的:
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
…
}
参考:12.ThreadPoolExecutor线程池原理及其execute方法 - OKevin - 博客园
java线程池ThreadPoolExecutor类使用详解 - bigfan - 博客园
2.ThreadPoolExecutor线程池之submit方法
对于一个任务的执行有时我们不需要它返回结果,但是有我们需要它的返回执行结果。对于线程来讲,如果不需要它返回结果则实现Runnable,而如果需要执行结果的话则可以实现Callable。在线程池同样execute提供一个不需要返回结果的任务执行,而对于需要结果返回的则可调用其submit方法。
回顾ThreadPoolExecutor的继承关系。
在Executor接口中只定义了execute方法,而submit方法则是在ExecutorService接口中定义的。
//ExecutorService
public interface ExecutorService extends Executor {
...
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Runnable task);
...
}
而在其子类AbstractExecutorService实现了submit方法。
//AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService {
...
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerExeption();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
...
}
在AbstractExecutorService实现的submit方法实际上是一个模板方法,定义了submit方法的算法骨架,其execute交给了子类。(可以看到在很多源码中,模板方法模式被大量运用,有关模板方法模式可参考《模板方法模式》)
尽管submit方法能提供线程执行的返回值,但只有实现了Callable才会有返回值,而实现Runnable的线程则是没有返回值的,也就是说在上面的3个方法中,submit(Callable<T> task)能获取到它的返回值,submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值或者准确来说交给线程处理一下,而最后一个方法submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null。
参考:13.ThreadPoolExecutor线程池之submit方法 - OKevin - 博客园
3.ExecutorService
ExecutorService是ThreadPoolExecutor的父类的父类,基本用法都差不多。
3.1 ExecutorService 中 shutdown()、shutdownNow()、awaitTermination() 含义和区别
ExecutorService
是 Java 提供的线程池,也就是说,每次我们需要使用线程的时候,可以通过 ExecutorService
创建线程。
使用 ExecutorService
类时,经常用到 shutdown()
、shutdownNow()
、awaitTermination()
3个方法,下面我们来说说它们的含义和三者的区别 。
一、方法说明
1、shutdown():停止接收新任务,原来的任务继续执行
英文原意:关闭,倒闭;停工。 这里的意思是 关闭线程池。与使用数据库连接池一样,每次使用完毕后,都要关闭线程池。
1、停止接收新的submit的任务;
2、已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;
3、等到第2步完成后,才真正停止;
2、shutdownNow():停止接收新任务,原来的任务停止执行
1、跟 shutdown() 一样,先停止接收新submit的任务;
2、忽略队列里等待的任务;
3、尝试将正在执行的任务interrupt中断;
4、返回未执行的任务列表;
3、awaitTermination(long timeOut, TimeUnit unit):当前线程阻塞
说明:它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。所以,shutdownNow() 并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。
timeout 和 TimeUnit 两个参数,用于设定超时的时间及单位
当前线程阻塞,直到:
- 等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
- 或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
- 或者 线程被中断,抛出InterruptedException
然后会监测 ExecutorService 是否已经关闭,返回true(shutdown请求后所有任务执行完毕)或false(已超时)
二、区别
1、shutdown() 和 shutdownNow() 的区别
shutdown()
只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停。shutdownNow()
能立即停止线程池,正在跑的和正在等待的任务都停下了。
2、shutdown() 和 awaitTermination() 的区别
shutdown()
后,不能再提交新的任务进去;但是 awaitTermination()
后,可以继续提交。
awaitTermination()
是阻塞的,返回结果是线程池是否已停止(true/false);shutdown()
不阻塞。
三、总结
1、优雅的关闭,用 shutdown()
2、想立马关闭,并得到未执行任务列表,用shutdownNow()
3、优雅的关闭,并允许关闭声明后新任务能提交,用 awaitTermination()
4、关闭功能 【从强到弱】 依次是:shuntdownNow() > shutdown() > awaitTermination()
四、关闭线程池使用方式
shutdown和awaitTermination为接口ExecutorService定义的两个方法,一般情况配合使用来关闭线程池。
如下:
案例1:
案例2:
4.ThreadPoolTaskExecutor
基本使用方式都和ThreadPoolExecutor一样。
4.1使用
1.配置一个线程池
2.注入ThreadPoolTaskExecutor类进行使用,线程池就用我们上面新建的线程池
3.使用
如下,使用方式都和之前的一样
4.2 和ThreadPoolExecutor的区别
后续补充...
5.CountDownLatch
他经常用于监听某些初始化操作,等初始化操作执行完毕后,通知主线程继续工作。
假设一个场景,例如你要使用Controller的某个方法,但是必须先实例化两个service,等两个service实例化好才能进行调用Controller的方法
5.1 概念
CountDownLatch允许一个或者多个线程去等待其他线程完成操作。
CountDownLatch接收一个int型参数,表示要等待的工作线程的个数。
当然也不一定是多线程,在单线程中可以用这个int型参数表示多个操作步骤。
CountDownLatch countDownLatch = new CountDownLatch(2); //构造函数的参数表示调用await的线程要等待多少个线程的唤醒
如下, CountDownLatch(2),需要有两个线程来唤醒才可以,这里是直接new了两个线程,当然也可用线程池。
5.2 方法
CountDownLatch 提供了一些方法:
5.3使用方式
一般这个会配合多线程使用,用来阻塞线程,等待多线程执行完毕,然后继续执行,具体请参考第6点:线程池具体使用方式中的案例2.
6.线程池具体使用
6.1 ExecutorService使用案例1
这个方法的作用是 通过多线程 把文件里面的数据更新到数据库。
比如这里有500个文件,一个文件里面有5000条数据,需要更新到数据库,这里files就是文件数组,里面有500个文件,所以这里循环次数就是文件的个数,每循环一次就是让线程池去处理文件里面的所有数据。
代码如下:
@Test
public void update() throws InterruptedException {
long start = System.currentTimeMillis();
this.before();
File dirFile = new File("D:\\code_data\\split\\");
final File[] files = dirFile.listFiles();
if (files == null) {
return;
}
ExecutorService e = Executors.newFixedThreadPool(2900);
for (File file : files) {
e.execute(() -> {
long timeMillis = System.currentTimeMillis();
List<Object[]> batchArgs = new ArrayList<>(6000);
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
String s;
while ((s = br.readLine()) != null) {
final String[] split = s.split(",");
if (split.length == 2) {
batchArgs.add(new Object[]{split[1], split[0]});
}
}
} catch (IOException ex) {
ex.printStackTrace();
}
int[] updates = resJdbcTemplate.batchUpdate(SQL, batchArgs);
if (file.delete()) {
log.info("处理完成{},耗时:{}ms,总共{}条,成功更新{}条", file.getName(), System.currentTimeMillis() - timeMillis, batchArgs.size(), Arrays.stream(updates).filter(i -> i > 0).count());
}
});
}
e.shutdown();
if (!e.awaitTermination(1, TimeUnit.DAYS)) {
log.error("等待失败");
}
this.after();
log.info("执行完成,共耗时:{}ms", System.currentTimeMillis() - start);
}
6.2 ExecutorService使用案例2
将需要处理的数据放到 resList里面,定义如下:
中间有很多拼接sql的过程,这里省略...
6.3 ExecutorService使用案例3
如下,其实是一样的使用,只是在处理数据的时候方式有一点不同,实际上底层逻辑是一样的。
6.4
CountDownLatch使用案例
假如有三个线程,线程2和线程3执行完,线程1才会被唤醒继续执行
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2); //构造函数的参数表示调用await的线程要等待多少个线程的唤醒
Thread t1 = new Thread(new Runnable(){
@Override
public void run() {
System.out.println("线程1进入准备执行状态。。。");
try {
countDownLatch.await();
System.out.println("线程1被唤醒,执行。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(new Runnable(){
@Override
public void run() {
System.out.println("线程2执行。。。。");
try {
Thread.sleep(3000);
countDownLatch.countDown();
System.out.println("线程2唤醒线程1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t3 = new Thread(new Runnable(){
@Override
public void run() {
System.out.println("线程3执行。。。。");
try {
Thread.sleep(2000);
countDownLatch.countDown();
System.out.println("线程3唤醒线程1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
t3.start();
}
执行结果:可以看到,线程1确实是最后才执行的
线程1进入准备执行状态。。。
线程3执行。。。。
线程2执行。。。。
线程3唤醒线程1
线程2唤醒线程1
线程1被唤醒,执行。。。
那么我们假设如果将线程3的唤醒操作给注释掉,线程1还能执行吗。
//countDownLatch.countDown();
//System.out.println("线程3唤醒线程1");
执行结果:我们可以看到最后线程1并没有执行,而是一直在等待状态。这是因为初始化CountDownLatch的时候,传的值是2,意思就是说线程1必须被两个线程唤醒才能继续执行,现在线程3的唤醒操作被注释掉了,所以线程1只能一直在等待状态。
以线程1只能一直在等待状态。
线程1进入准备执行状态。。。
线程2执行。。。。
线程3执行。。。。
线程2唤醒线程1
那么我们现在又试一下其他不变,只是将CountDownLatch初始化的参数由2改为1
CountDownLatch countDownLatch = new CountDownLatch(1);
执行结果:我们可以看到,当有一个线程调用唤醒操作(这里是线程3),线程1就继续执行了。
线程3执行。。。。
线程1进入准备执行状态。。。
线程2执行。。。。
线程3唤醒线程1
线程1被唤醒,执行。。。
线程2唤醒线程1