目录
1.4 可调度线程池(ScheduledThreadPool)
1.5 工作窃取线程池(WorkStealingThreadPool)
1、Jdk 线程池介绍
1.1 固定大小线程池(FixedThreadPool)
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool线程池会创建一个线程数量固定的线程池,任务队列为无界队列,在业务高峰期间时,很可能出现工作线程数量不够,而任务队列大小超过堆空间,出现内存溢出的现象,所以这种线程池在业务量比较大,特别是有业务高峰期的业务场景下,
不太适合使用。
1.2 单线程线程池(SingleThreadPool)
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadPool线程池会创建一个单线程的线程池,任务队列为无界队列,它是一种特殊的FixedThreadPool,会面临和FixedThreadPool一样的风险,在业务高峰期时,堆内存被耗尽的问题。
1.3 可缓存的线程池(CachedThreadPool)
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool线程池会创建一个线程池,线程池初始化容量为0,最大为Integer.MAX_VALUE,工作队列大小为0,同步队列,线程存活时间为60s,这种线程池,也是适用于业务并发量不大的场景,没有业务高峰时的业务场景,当业务高峰期到来时,
JVM会创建大量的线程用于执行任务,这个时候,系统会因为创建过多的线程而出现StackOverFlow异常,导致无法创建新的线程。
1.4 可调度线程池(ScheduledThreadPool)
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
下面对比一下,两个比较重要的调度方法:scheduleAtFixedRate()、scheduleWithFixedDelay()
//当任务执行时间比较长,而调度比较频繁时,出现多个任务交叉执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//不会出现任务交叉现象,但是可能因为任务执行时间比较长,任务真正执行的时间,落后于调度时间
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
1.5 工作窃取线程池(WorkStealingThreadPool)
/**
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queues to
* reduce contention. The parallelism level corresponds to the
* maximum number of threads actively engaged in, or available to
* engage in, task processing. The actual number of threads may
* grow and shrink dynamically. A work-stealing pool makes no
* guarantees about the order in which submitted tasks are
* executed.
*
* @param parallelism the targeted parallelism level
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code parallelism <= 0}
* @since 1.8
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
工作窃取原理是多个线程并行执行任务,每个线程都有一个自己独有的工作队列,当自己队列中的工作任务都执行完成之后,就会窃取其他线程工作队列中的任务,这个称之为“工作窃取”模式。工作窃取线程池一般用于任务分片合并的场景,与Hadoop MapReduce有生命的地方。
1.6 自定义线程池(TheadPool)
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
一般我们不使用Excecutors提供的几种线程池,我们使用自定义的参数,使用自己定义的线程池,最重要是设置 corePoolSize、workQueue 大小,如何设置合适的大小,需要根据具体的业务场景,需要对任务io时间与cpu时间进行统计,然后根据公式计算:
线程数 = CPU利用率 * CPU核心数 * (1+线程等待时间/线程CPU时间 ) (0 < CPU占有率 <=1)
任务工作队列大小=预分配任务工作队列内存/平均每个任务大小
请参考文档:如何估算corePoolSize与workQueueSize
Java 线程池的异常处理机制
2、Jdk 线程池遇到的两个问题
2.1 ThreadLocal无法传递
我们知道ThreadLocal只能在同一线程上下文中传递变量,主要Call线程想将变量传递到线程池中,就无法实现了,为了解决这个问题,我们可以利用阿里的TTL组件来实现ThreadLocal线程穿越问题。
具体参考:TTL 组件原理
public class RunnableIdContext {
private static TransmittableThreadLocal<String> RUNNABLE_ID = new TransmittableThreadLocal<String>();
public static void set(String runnableId) {
RUNNABLE_ID.set(runnableId);
}
public static String get() {
return RUNNABLE_ID.get();
}
}
@Test
public void test_with_ttlRunnable() throws InterruptedException {
ExecutorService executorService = executorServiceComponent.getExecutorService(true);
RunnableIdContext.set("测试");
executorService.submit(new Runnable() {
@Override
public void run() {
log.info("执行任务测试1");
}
});
TimeUnit.MINUTES.sleep(10);
}
public static ExecutorService createWithTtl(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit timeUnit, BlockingQueue<Runnable> workQueue, MyThreadFactory threadFactory,
RejectedExecutionHandler handler) {
ExecutorService executorService = TtlExecutors.getTtlExecutorService(
create(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, handler));
return executorService;
}
2.1.1 ThreadLocal 内存泄露
2.1.2 FastThreadLocal为什么那么快?
参考文档:惊:FastThreadLocal吞吐量居然是ThreadLocal的3倍!!!
2.1 线程池无法实现监控
2.1.1 原理
2.1.2 代码实现
@Test
public void test_RunnableWrapper() throws InterruptedException {
ExecutorService executorService = executorServiceComponent.getExecutorService(false);
String id = "李桥-测试";
RunnableWrapper runnableWrapper = new RunnableWrapper(new Runnable() {
@Override
public void run() {
log.info("执行任务测试-test_RunnableWrapper");
}
}, id);
executorService.submit(runnableWrapper);
TimeUnit.MINUTES.sleep(10);
}
package com.tianyou.homework.notify.component;
import java.lang.reflect.Field;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.tianyou.homework.notify.component.ExecutorServiceComponent.MyThreadFactory;
import com.tianyou.homework.notify.context.RunnableIdContext;
import com.tianyou.homework.notify.pojo.CallableWrapper;
import com.tianyou.homework.notify.pojo.RunnableWrapper;
import lombok.extern.slf4j.Slf4j;
/**
*
* @ClassName: MonitoredThreadPoolExecutor
* @author: QiaoLi
* @date: Oct 13, 2020 5:10:48 PM
*/
@Slf4j
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
private ConcurrentHashMap<String, Date> runnableStartDateMap;
/** key=runnableHashCode value=runnableId */
private ConcurrentHashMap<String, String> runnableHashCodeMap;
private String poolName;
private MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit,
BlockingQueue<Runnable> workQueue, MyThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, handler);
this.poolName = threadFactory.getPoolName();
this.runnableStartDateMap = new ConcurrentHashMap<String, Date>(maximumPoolSize * 10);
this.runnableHashCodeMap = new ConcurrentHashMap<String, String>(maximumPoolSize * 10);
}
public static ExecutorService create(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit,
BlockingQueue<Runnable> workQueue, MyThreadFactory threadFactory, RejectedExecutionHandler handler) {
MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, handler);
return monitoredThreadPoolExecutor;
}
public static ExecutorService createWithTtl(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit timeUnit, BlockingQueue<Runnable> workQueue, MyThreadFactory threadFactory,
RejectedExecutionHandler handler) {
ExecutorService executorService = TtlExecutors.getTtlExecutorService(
create(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, handler));
return executorService;
}
public String getPoolName() {
return poolName;
}
@Override
public void shutdown() {
log.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName,
this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
log.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
String runnableHashCode = getRunnableHashCode(r);
String runnableId = getRunnableIdForBeforeExcecuteing(r);
this.runnableHashCodeMap.put(runnableHashCode, runnableId);
runnableStartDateMap.put(runnableHashCode, new Date());
log.info("runnableId:{},,runnableHashCode:{} 开始执行! ", runnableId,runnableHashCode);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
String runnableHashCode = getRunnableHashCode(r);
String runnableId = runnableHashCodeMap.get(runnableHashCode);
Date startDate = runnableStartDateMap.get(runnableHashCode);
Date finishDate = new Date();
long elapsedTimeInMs = finishDate.getTime() - startDate.getTime();
log.info(
"RunnableId:{},runnableHashCode:{},结束执行, "
+ "执行耗时: {} ms, ActiveThreadNum: {}, CurrentPoolSize: {}, CorePoolSize: {},MaximumPoolSize: {},LargestPoolSize: {},Task-Completed: {},"
+ "Task-In-Queue: {}, Task-Total: {}, Thead-KeepAliveTime: {}s",
runnableId,runnableHashCode, elapsedTimeInMs, this.getActiveCount(), this.getPoolSize(), this.getCorePoolSize(),
this.getMaximumPoolSize(), this.getLargestPoolSize(), this.getCompletedTaskCount(),
this.getQueue().size(), this.getTaskCount(), this.getKeepAliveTime(TimeUnit.SECONDS));
}
@SuppressWarnings("rawtypes")
private String getRunnableIdForBeforeExcecuteing(Runnable r) {
String runnableId = "未设置";
FutureTask futureTask = (FutureTask) r;
try {
Field field = futureTask.getClass().getDeclaredField("callable");
field.setAccessible(true);
Object callableObject = field.get(futureTask);
if (callableObject instanceof RunnableWrapper) {
RunnableWrapper runnableWrapper = (RunnableWrapper) callableObject;
runnableId = runnableWrapper.getId();
} else if (callableObject instanceof CallableWrapper) {
CallableWrapper callableWrapper = (CallableWrapper) callableObject;
runnableId = callableWrapper.getId();
}
} catch (Exception e) {
log.error("执行getRunnableId()异常", e);
}
return runnableId;
}
private String getRunnableHashCode(Runnable r) {
String runnableId = null;
String runnableIdPrefix = RunnableIdContext.get();
if (null != runnableIdPrefix) {
runnableId = runnableIdPrefix + "-" + String.valueOf(r.hashCode());
} else {
runnableId = String.valueOf(r.hashCode());
}
return runnableId;
}
}