Java线程池基础&ThreadLocal传递线程池&任务监控

目录

1、Jdk 线程池介绍

1.1 固定大小线程池(FixedThreadPool)

1.2 单线程线程池(SingleThreadPool)

1.3 可缓存的线程池(CachedThreadPool)

1.4 可调度线程池(ScheduledThreadPool)

1.5 工作窃取线程池(WorkStealingThreadPool)

1.6 自定义线程池(TheadPool)

2、Jdk 线程池遇到的两个问题

2.1 ThreadLocal无法传递

2.1.1  ThreadLocal 内存泄露 

2.1.2 FastThreadLocal为什么那么快?

2.1 线程池无法实现监控

2.1.1 原理

2.1.2 代码实现

1、Jdk 线程池介绍

1.1 固定大小线程池(FixedThreadPool)

 /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool线程池会创建一个线程数量固定的线程池,任务队列为无界队列,在业务高峰期间时,很可能出现工作线程数量不够,而任务队列大小超过堆空间,出现内存溢出的现象,所以这种线程池在业务量比较大,特别是有业务高峰期的业务场景下,
不太适合使用。


1.2 单线程线程池(SingleThreadPool)
 

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadPool线程池会创建一个单线程的线程池,任务队列为无界队列,它是一种特殊的FixedThreadPool,会面临和FixedThreadPool一样的风险,在业务高峰期时,堆内存被耗尽的问题。 

 

1.3 可缓存的线程池(CachedThreadPool)
 

   /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool线程池会创建一个线程池,线程池初始化容量为0,最大为Integer.MAX_VALUE,工作队列大小为0,同步队列,线程存活时间为60s,这种线程池,也是适用于业务并发量不大的场景,没有业务高峰时的业务场景,当业务高峰期到来时,
JVM会创建大量的线程用于执行任务,这个时候,系统会因为创建过多的线程而出现StackOverFlow异常,导致无法创建新的线程。


1.4 可调度线程池(ScheduledThreadPool)

 


    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

下面对比一下,两个比较重要的调度方法:scheduleAtFixedRate()、scheduleWithFixedDelay()

//当任务执行时间比较长,而调度比较频繁时,出现多个任务交叉执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

 

//不会出现任务交叉现象,但是可能因为任务执行时间比较长,任务真正执行的时间,落后于调度时间
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

1.5 工作窃取线程池(WorkStealingThreadPool)

 /**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

 

工作窃取原理是多个线程并行执行任务,每个线程都有一个自己独有的工作队列,当自己队列中的工作任务都执行完成之后,就会窃取其他线程工作队列中的任务,这个称之为“工作窃取”模式。工作窃取线程池一般用于任务分片合并的场景,与Hadoop MapReduce有生命的地方。

1.6 自定义线程池(TheadPool)

 


    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

一般我们不使用Excecutors提供的几种线程池,我们使用自定义的参数,使用自己定义的线程池,最重要是设置 corePoolSize、workQueue 大小,如何设置合适的大小,需要根据具体的业务场景,需要对任务io时间与cpu时间进行统计,然后根据公式计算:

线程数 = CPU利用率 * CPU核心数 * (1+线程等待时间/线程CPU时间 )     (0 < CPU占有率 <=1)
任务工作队列大小=预分配任务工作队列内存/平均每个任务大小

请参考文档:如何估算corePoolSize与workQueueSize
                      Java 线程池的异常处理机制

 

2、Jdk 线程池遇到的两个问题

2.1 ThreadLocal无法传递

我们知道ThreadLocal只能在同一线程上下文中传递变量,主要Call线程想将变量传递到线程池中,就无法实现了,为了解决这个问题,我们可以利用阿里的TTL组件来实现ThreadLocal线程穿越问题。
具体参考:TTL 组件原理

public class RunnableIdContext {
	private static TransmittableThreadLocal<String> RUNNABLE_ID = new TransmittableThreadLocal<String>();

	public static void set(String runnableId) {
		RUNNABLE_ID.set(runnableId);
	}

	public static String get() {
		return RUNNABLE_ID.get();
	}
}
@Test
	public void test_with_ttlRunnable() throws InterruptedException {

		ExecutorService executorService = executorServiceComponent.getExecutorService(true);
		RunnableIdContext.set("测试");
		executorService.submit(new Runnable() {
			@Override
			public void run() {
				log.info("执行任务测试1");

			}
		});
		TimeUnit.MINUTES.sleep(10);
	}
public static ExecutorService createWithTtl(int corePoolSize, int maximumPoolSize, long keepAliveTime,
			TimeUnit timeUnit, BlockingQueue<Runnable> workQueue, MyThreadFactory threadFactory,
			RejectedExecutionHandler handler) {
		ExecutorService executorService = TtlExecutors.getTtlExecutorService(
				create(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, handler));
		return executorService;

	}

 

2.1.1  ThreadLocal 内存泄露 

2.1.2 FastThreadLocal为什么那么快?

参考文档:惊:FastThreadLocal吞吐量居然是ThreadLocal的3倍!!!

2.1 线程池无法实现监控


2.1.1 原理

 

 

2.1.2 代码实现

 

	@Test
	public void test_RunnableWrapper() throws InterruptedException {

		ExecutorService executorService = executorServiceComponent.getExecutorService(false);
		String id = "李桥-测试";
		RunnableWrapper runnableWrapper = new RunnableWrapper(new Runnable() {
			@Override
			public void run() {
				log.info("执行任务测试-test_RunnableWrapper");

			}
		}, id);

		executorService.submit(runnableWrapper);
		TimeUnit.MINUTES.sleep(10);
	}

 

package com.tianyou.homework.notify.component;

import java.lang.reflect.Field;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.ttl.threadpool.TtlExecutors;
import com.tianyou.homework.notify.component.ExecutorServiceComponent.MyThreadFactory;
import com.tianyou.homework.notify.context.RunnableIdContext;
import com.tianyou.homework.notify.pojo.CallableWrapper;
import com.tianyou.homework.notify.pojo.RunnableWrapper;

import lombok.extern.slf4j.Slf4j;

/**
 * 
 * @ClassName: MonitoredThreadPoolExecutor
 * @author: QiaoLi
 * @date: Oct 13, 2020 5:10:48 PM
 */
@Slf4j
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {

	private ConcurrentHashMap<String, Date> runnableStartDateMap;
	/** key=runnableHashCode value=runnableId */
	private ConcurrentHashMap<String, String> runnableHashCodeMap;
	private String poolName;

	private MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit,
			BlockingQueue<Runnable> workQueue, MyThreadFactory threadFactory, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, handler);
		this.poolName = threadFactory.getPoolName();
		this.runnableStartDateMap = new ConcurrentHashMap<String, Date>(maximumPoolSize * 10);
		this.runnableHashCodeMap = new ConcurrentHashMap<String, String>(maximumPoolSize * 10);

	}

	public static ExecutorService create(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit,
			BlockingQueue<Runnable> workQueue, MyThreadFactory threadFactory, RejectedExecutionHandler handler) {
		MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor(corePoolSize,
				maximumPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, handler);
		return monitoredThreadPoolExecutor;

	}

	public static ExecutorService createWithTtl(int corePoolSize, int maximumPoolSize, long keepAliveTime,
			TimeUnit timeUnit, BlockingQueue<Runnable> workQueue, MyThreadFactory threadFactory,
			RejectedExecutionHandler handler) {
		ExecutorService executorService = TtlExecutors.getTtlExecutorService(
				create(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, handler));
		return executorService;

	}

	public String getPoolName() {
		return poolName;
	}

	@Override
	public void shutdown() {
		log.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName,
				this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
		super.shutdown();
	}

	@Override
	public List<Runnable> shutdownNow() {
		log.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
				this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
		return super.shutdownNow();
	}

	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		String runnableHashCode = getRunnableHashCode(r);
		String runnableId = getRunnableIdForBeforeExcecuteing(r);
		this.runnableHashCodeMap.put(runnableHashCode, runnableId);
		runnableStartDateMap.put(runnableHashCode, new Date());
		log.info("runnableId:{},,runnableHashCode:{} 开始执行! ", runnableId,runnableHashCode);
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		String runnableHashCode = getRunnableHashCode(r);
		String runnableId  = runnableHashCodeMap.get(runnableHashCode);
		Date startDate = runnableStartDateMap.get(runnableHashCode);
		
		Date finishDate = new Date();
		long elapsedTimeInMs = finishDate.getTime() - startDate.getTime();
		log.info(
				"RunnableId:{},runnableHashCode:{},结束执行, "
						+ "执行耗时: {} ms, ActiveThreadNum: {}, CurrentPoolSize: {}, CorePoolSize: {},MaximumPoolSize: {},LargestPoolSize: {},Task-Completed: {},"
						+ "Task-In-Queue: {}, Task-Total: {},  Thead-KeepAliveTime: {}s",
				runnableId,runnableHashCode, elapsedTimeInMs, this.getActiveCount(), this.getPoolSize(), this.getCorePoolSize(),
				this.getMaximumPoolSize(), this.getLargestPoolSize(), this.getCompletedTaskCount(),
				this.getQueue().size(), this.getTaskCount(), this.getKeepAliveTime(TimeUnit.SECONDS));
	}

	@SuppressWarnings("rawtypes")
	private String getRunnableIdForBeforeExcecuteing(Runnable r) {
		String runnableId = "未设置";
		FutureTask futureTask = (FutureTask) r;
		try {
			Field field = futureTask.getClass().getDeclaredField("callable");
			field.setAccessible(true);
			Object callableObject = field.get(futureTask);
			if (callableObject instanceof RunnableWrapper) {
				RunnableWrapper runnableWrapper = (RunnableWrapper) callableObject;
				runnableId = runnableWrapper.getId();
			} else if (callableObject instanceof CallableWrapper) {
				CallableWrapper callableWrapper = (CallableWrapper) callableObject;
				runnableId = callableWrapper.getId();
			}
		} catch (Exception e) {
			log.error("执行getRunnableId()异常", e);
		}

		return runnableId;

	}

	private String getRunnableHashCode(Runnable r) {
		String runnableId = null;
		String runnableIdPrefix = RunnableIdContext.get();
		if (null != runnableIdPrefix) {
			runnableId = runnableIdPrefix + "-" + String.valueOf(r.hashCode());
		} else {
			runnableId = String.valueOf(r.hashCode());
		}

		return runnableId;

	}
}

 

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

李桥s2008100262

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值