Java中的线程池
池化技术
线程池,连接池….
创建和销毁十分的耗费资源
池化技术:事先准备好一些资源,要使用就去池子里拿,用完了要还到池子里去(重复利用)
- 实现复用,降低频繁创建,销毁资源的消耗
- 提高响应速度,提交任务时不用等待创建线程就能立即执行
- 方便管理,控制并发数
线程池的创建
通过
ThreadPoolExecutor
创建线程池public ThreadPoolExecutor(int corePoolSize,//核心线程数量(初始化可以用多少数量的线程) int maximumPoolSize,//最大线程数量 long keepAliveTime,//超时时间 TimeUnit unit,//时间单位 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory,//创建线程工厂 RejectedExecutionHandler handler//拒绝策略 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue ** null || threadFactory ** null || handler ** null) throw new NullPointerException(); this.acc = System.getSecurityManager() ** null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- corePoolSize核心线程数: 线程池基本线程数量
- 最开始提交任务到线程池时, 创建新工作线程执行任务 如果线程池种的的线程数未到达核心线程数,即使有空闲的核心线程也会创建
- 可以看成银行一开始可以使用自助ATM机的个数
- maximumPoolSize最大线程数 线程池允许创建的最大线程数
- 工作队列已满,但是以创建线程数小于最大线程数就会创建新工作线程执行任务
- 如果工作队列采用无界 这个参数就无效了
- 可以看成人爆满时,银行开放最大的ATM机数
- keepAliveTime超时时间和unit时间单位 工作线程空闲后存活的时间
- 经过这个时间就会关闭多余的线程
- 如果任务多,执行任务时间短,增大时间 可以提高线程利用率
- 可以看成ATM机一定时间没用就关掉这个ATM机
- workQueue任务队列 保持等待执行任务的阻塞队列
- 核心线程数都在工作时在阻塞队列中排队
- 可以看成没有ATM机使用时去休息区排队等待
阻塞队列 描述 ArrayBlockingQueue 数组有界阻塞队列,FIFO排序 LinkedBlockingQueue 链表有界阻塞队列,FIFO排序,吞吐量高于ArrayBlockingQueue Executors.newFixedThreadPool()
采用SynchronousQueue 不存储元素的阻塞队列,插入元素就必须等待移除元素,否则一直阻塞,吞吐量高于linkedBlockingQueue Executors.newCachedThreadPool()
采用PriorityQueue 优先级无界阻塞队列,按优先级排序 优先级高的任务先执行,优先级低的任务可能永远不能执行
- threadFactory线程工厂 线程工厂可以给每个创建出来的线程设置有意义的名字
- 一般不需要修改
- RejectedExecutionHandler拒绝策略 当线程不够用,阻塞队列也满了的时候的一种策略(有4种策略)
策略 作用 AbortPolicy 默认 抛出异常 CallerRunsPolicy 只用调用者线程来执行任务 DiscardPolicy 不处理,丢弃 DiscardOldestPolicy 丢弃队列中最近一个任务,并立即执行当前任务 将线程池看成银行
自定义创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor( //核心线程数:2 最大线程数:5 空闲超时:100s 2, 5, 100, TimeUnit.SECONDS, //容量为4的链表阻塞队列 new LinkedBlockingQueue<>(4), //默认线程工厂 Executors.defaultThreadFactory(), //默认拒绝策略 直接抛出异常 new ThreadPoolExecutor.AbortPolicy() );
向线程池提交任务
提交任务有2个方法:
executor(),submit()
executor()
threadPool.execute(()->{ System.out.println("use execute method"); });
无返回值,不知道任务是否被执行
submit()
Future<?> result = threadPool.submit(() -> { System.out.println("use submit method"); }); try { //isDone已完成返回true System.out.println(result.isDone());//false //isCancelled完全前取消返回true System.out.println(result.isCancelled());//false System.out.println(result.get());//null System.out.println(result.isDone());//true } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } /* false false use submit method null true */
有返回值Future,可通过Future的get方法获取返回值,get可以阻塞当前线程直到完成任务,可以使用带参数的get方法进行超时等待
关闭线程池
关闭线程池的2种方法:
shutdown(),shutdownNow()
它们的原理都是: 遍历线程池中的工作线程,逐个中断(调用线程的
interrupt
方法) 无法响应中断的任务可能永远无法终止shutdown() 任务会被执行完
- 将线程池状态设置为SHUTDOWN
- 中断所有未正在执行任务的线程
shutdownNow() 任务不一定会执行完
- 将线程池状态设置为STOP
- 尝试停止所有正在执行或暂停任务的线程
- 返回等待执行任务列表
通常使用shutdown(),如果任务不一定要执行完可以使用shutdown()
threadPool.shutdown(); List<Runnable> runnables = threadPool.shutdownNow();
合理配置线程池
线程池的最大线程数量该如何设置?
调优
通过
Runtime.getRuntime().availableProcessors()
可以获得CPU个数 (核数)设 N 为 CPU核数
- CPU密集型
- 设置最大线程数量为 N+1
- 设置少量线程,避免切换线程,提高CPU利用率,提高吞吐量
- IO密集型 或 依赖特殊资源型(数据库连接)
- 设置最大线程数量为 N*2
- 因为可能需要等待IO操作或依赖其他特殊资源(需要等待),CPU会空闲,设置大量线程,避免CPU空闲
线程池的监控
threadPool.getTaskCount();//获得要执行的任务数量 threadPool.getCompletedTaskCount();//获得已完成的任务数量 threadPool.getLargestPoolSize();//获得线程池曾经创建过最大线程数量 threadPool.getPoolSize();//获得线程池线程数量 threadPool.getActiveCount();//获得正在执行任务的线程数量
可以重写
ThreadPoolExecutor
中的这些方法进行监控//每个任务执行前 protected void beforeExecute(Thread t, Runnable r) { } //每个任务执行后 protected void afterExecute(Runnable r, Throwable t) { } //线程池关闭时 protected void terminated() { }
使用ThreadPoolExecetor中的钩子方法进行监控
public class HookMethodThreadPool extends ThreadPoolExecutor { /** * 处理任务数 */ private AtomicInteger taskCount = new AtomicInteger(); /** * 开始执行时间 */ private AtomicLong startTime = new AtomicLong(); /** * 总执行时间 */ private AtomicLong time = new AtomicLong(); public HookMethodThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } /** * 钩子方法 每个任务执行前 * @param t * @param r */ @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println(String.format("Thread: %s start executor task %s",t,r)); startTime.set(System.nanoTime()); } /** * 钩子方法 每个任务执行完成后 * @param r * @param t */ @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println(String.format("Throwable: %s end executor task %s",t,r)); taskCount.incrementAndGet(); time.addAndGet(System.nanoTime() - startTime.get()); } /** * 终止时执行 */ @Override protected void terminated() { System.out.println("executor task count:"+taskCount.get()); System.out.println("executor task consume time:"+time.get()+"ns"); } }
client
public class Client { public static void main(String[] args) { HookMethodThreadPool threadPool = new HookMethodThreadPool(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)); for (int i = 0; i < 10; i++) { final int a = i; threadPool.submit(() -> { System.out.println(Thread.currentThread().getName() + "executor task" + a); }); } threadPool.shutdown(); } }
线程池实现原理
当有新任务到达线程池时,线程池会有一系列的流程处理任务
处理流程
判断线程池中的核心线程是否都在执行任务
如果不是则创建工作线程来执行任务
否则进入下一流程
判断工作队列是否已满
如果没满则将任务放在工作队列中
否则进入下一流程
判断线程池是否已满(最大线程数都在工作+工作队列已满)
如果没有则创建工作线程来执行
否则交给拒绝(饱和)策略来处理任务
提交任务的情况
提交任务一共会有4种情况
- 如果运行线程数 < 核心线程数 则创建新工作线程执行任务 获取全局锁
- 如果运行线程数 ≥ 核心线程数 则将任务放入工作队列中
- 如果核心线程数 < 运行线程数 < 最大线程数 且 工作队列已满 则创建新线程执行任务 获取全局锁
- 如果创建新线程会超过最大线程数 则使用拒绝策略处理任务
在执行
executor()或submit()
时,几乎都会执行步骤2 不用获取全局锁
public void execute(Runnable command) { //runnable对象为空抛出异常 if (command == null) throw new NullPointerException(); int c = ctl.get(); //如果工作线程 < 核心线程数 则创建新工作线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //工作线程数 > 核心线程数 任务放在工作队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) ** 0) addWorker(null, false); } //采用拒绝策略 else if (!addWorker(command, false)) reject(command); } final void reject(Runnable command) { handler.rejectedExecution(command, this); }
通过源码可以看到工作线程执行完任务后会循环的去工作队列中获取任务执行