文章目录
参考文献:
- Java线程池实现原理及其在美团业务中的实践
- java并发编程的艺术
1. 概述
好处
使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
2. 线程池核心设计与实现
Java中的线程池核心实现类是ThreadPoolExecutor,我们首先来看一下ThreadPoolExecutor的UML类图:
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
- Java线程池实现原理及其在美团业务中的实践
- 线程池addWorker中的类似goto的retry
- 关于源码解析:彻底理解Java线程池原理篇
- 线程池的实现原理以及使用流程
- 三种队列和4种拒绝策略和4中线程池工厂:
(1)有界、无界队列对ThreadPoolExcutor执行的影响
(2)一次Java线程池误用引发的血案和总结 - 线程池中任务队列SynchronousQueue的理解
说明:使用SynchronousQueue队列通常配合长度为无界队列(长度为最大值),来一个线程创建一个线程。
Alibaba命名规范的解释:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
2) CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。
附:线程池创建通用工具:
/**
* Utility to create a {@link ThreadPoolExecutor}.
*
* @param corePoolSize - min threads in the pool, even if idle
* @param maxPoolSize - max threads in the pool
* @param keepAliveTimeSecs - max seconds beyond which excess idle threads
* will be terminated
* @param queue - the queue to use for holding tasks before they are executed.
* @param threadNamePrefix - name prefix for the pool threads
* @param runRejectedExec - when true, rejected tasks from
* ThreadPoolExecutor are run in the context of calling thread
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
int maxPoolSize, long keepAliveTimeSecs, BlockingQueue<Runnable> queue,
String threadNamePrefix, boolean runRejectedExec) {
Preconditions.checkArgument(corePoolSize > 0);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,
queue, new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName(threadNamePrefix + threadIndex.getAndIncrement());
return t;
}
});
if (runRejectedExec) {
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor
.CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor e) {
LOG.info(threadNamePrefix + " task is rejected by " +
"ThreadPoolExecutor. Executing it in current thread.");
// will run in the current thread
super.rejectedExecution(runnable, e);
}
});
}
return threadPoolExecutor;
}
附:线程池测试用例:
public class ThreadPoolExecutorTest implements Runnable{
public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
int maxPoolSize, long keepAliveTimeSecs, BlockingQueue<Runnable> queue,
String threadNamePrefix, boolean runRejectedExec) {
Preconditions.checkArgument(corePoolSize > 0);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,
queue, new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName(threadNamePrefix + threadIndex.getAndIncrement());
return t;
}
});
if (runRejectedExec) {
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor
.CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor e) {
super.rejectedExecution(runnable, e);
}
});
}
return threadPoolExecutor;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException();
}
public static void main(String[] args) {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(5);
// BlockingQueue<Runnable> queue = new SynchronousQueue();
ThreadPoolExecutor threadPool = getThreadPoolExecutor(
2,
10,
60,
queue,
"Thread-", false);
threadPool.allowCoreThreadTimeOut(true);
for (int i = 0; i < 16 ; i++) {
threadPool.submit(new Thread(new ThreadPoolExecutorTest()));
System.out.println("线程池中活跃的线程数: " + threadPool.getPoolSize());
}
threadPool.shutdown();
try {//等待直到所有任务完成
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//结果
线程池中活跃的线程数: 1
Thread-0
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
线程池中活跃的线程数: 2
Thread-1
线程池中活跃的线程数: 3
Thread-2
线程池中活跃的线程数: 4
Thread-3
线程池中活跃的线程数: 5
Thread-4
线程池中活跃的线程数: 6
Thread-5
线程池中活跃的线程数: 7
Thread-6
线程池中活跃的线程数: 8
Thread-7
线程池中活跃的线程数: 9
Thread-8
线程池中活跃的线程数: 10
Thread-9
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6576fe71 rejected from java.util.concurrent.ThreadPoolExecutor@76fb509a[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at threads.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:67)
Process finished with exit code 1