目录
一个任务分成多个任务,然后汇总计算,不用fork/join怎么实现?
多线程基础知识
方法 | 作用 |
currentThread() | 返回代码段正在被哪个线程调用的信息 |
isAlive() | 判断当前的线程是否处于活动状态 |
sleep() | 在指定的毫秒数内让当前正在执行的线程休眠,这个正在执行的线程是指this.currentThread返回的线程 |
yield() | 放弃当前的cpu资源,将它让给其他的任务去占用cpu。 |
线程的优先级
java中线程的优先级分为1~10这10个等级。
最小优先级是1
最大优先级是10
正常优先级是5
优先级具有继承特性:若A线程启动B线程,则B线程的优先级与A是一样的。
高优先级的总是大部分先执行完,但不代表高优先级的线程全部先执行完。
守护线程
java线程分为两种:
- 用户线程
- 守护Daemon线程
守护线程是一种特殊的线程,当进程中不存在非守护线程了,则守护线程自动销毁。典型的守护线程是垃圾回收线程。
停止线程
关于停止线程的内容笔者单独写了一篇博客:[java] 停止线程
对象及变量的访问
关于synchronized关键字的内容,我单独写了一篇博客:[java] synchronized关键字总结
线程间通信
不使用等待通知机制,可使用共享资源+轮询的方式实现通信。这样做效率很低。
采用的机制是:等待通知机制,即wait/notify机制。
方法 | 作用 |
wait() | 使得当前执行代码的线程进行等待,该方法是Object类提供的方法,该方法用来将当前线程置于预执行队列中。
并且在wait()所在的代码行处停止执行,直到接到通知或被中断。 在调用wait之前,线程必须获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait方法。
在执行wait方法后,当前线程锁释放。 |
notify() | 方法notify也要在同步方法或者同步块当中调用,即在调用wait之前,线程必须获得该对象的对象级别锁
该方法用来通知那些等待该对象锁的其他线程,如果有多个线程等待,则由线程规划器随机挑选一个呈wait状态的线程,对其发出notify通知,并使它获取该对象的对象锁
|
wait(long) | 等待某一时间内是否有线程对锁进行唤醒。如果超过这个时间则自动唤醒 |
join() | 等待线程对象销毁:方法join的作用是使join所属的线程对象x正常执行run()方法中的任务,而使当前线程无限期阻塞,等待线程x销毁后再继续执行线程z后面的代码。
join方法内部实现是使用wait方法实现的 |
join(long) VS sleep(long) | join内部实现使用wait方法,所以join方法具有释放锁的特点。而Thread.sleep方法却不释放锁。 |
问题:Java 中 sleep 方法和 wait 方法的区别?
虽然两者都是用来暂停当前运行的线程,但是 sleep() 实际上只是短暂停顿,因为它不会释放锁,而 wait() 意味着条件等待,这就是为什么该方法要释放锁,因为只有这样,其他等待的线程才能在满足条件时获取到该锁。由于join内部实现是使用wait方法,因此问join和sleep区别也是一样的
一句话总结:wait使线程停止允许,而notify使停止的线程继续运行。调用notify()方法一次只随机通知一个线程进行唤醒。若要唤醒全部线程,可以使用notifyAll()方法。
ThreadLocal类
关于ThreadLocal类我单独写了一篇博客分析:[java] ThreadLocal类解析
Java线程池
Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用,为我们在开发中处理线程的问题提供了非常大的帮助。
线程池定义
在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念。
线程池:Java中开辟出了一种管理线程的概念,这个概念叫做线程池,从概念以及应用场景中,我们可以看出,线程池的好处,就是可以方便的管理线程,也可以减少内存的消耗。
线程池的作用
服务器经常出现处理大量单个任务处理的时间很短而请求的数目却是巨大的请求。
构建服务器应用程序的一个最简单的办法是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务。
实际上,对于原型开发这种方法工作得很好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显。关键问题在于:为每个请求创建一个新线程的开销很大。为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多。使用线程池:
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而让服务器宕机。
3.提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行
4.提高线程的可管理性
几个重要的类:
ExecutorService | 真正的线程池接口。 |
ScheduledExecutorService | 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。 |
ThreadPoolExecutor | ExecutorService的默认实现。 |
ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。 |
java如何创建线程池&线程池的参数详解
Java中已经提供了创建线程池的一个类:Executor
而我们创建时,一般使用它的子类:ThreadPoolExecutor.
这个类提供了一个构造方法让我们创建线程池.
我们可以通过创建线程池的构造方法来了解线程池的参数,在ThreadPoolExecutor的构造方法当中,参数最多也是最重要的一个如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
牢记住这个构造方法,就能把握住java线程池的大部分东西。下图是一个分析:
一个一个来分析:
- corePoolSize(线程池的基本大小):corePoolSize,即核心线程池大小,是在线程池里保持运行的线程的数量,即使这些线程是处于空闲状态的。当然,我们可以通过设置allowCoreThreadTimeOut这个变量来控制核心线程的存活时间。
当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。 - maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
- keepAliveTime(线程活动保持时间):当线程数大于核心线程数corePoolSize时,多余空闲线程在终止之前等待新任务的最长时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
- TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
- workQueue(任务队列):用于保存等待执行的任务的队列。此队列将仅保存 xecute方法提交的Runnable任务。可以选择以下几个阻塞队列。任务缓存队列及排队策略
队列名称 | 描述 |
ArrayBlockingQueue | 是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序 |
LinkedBlockingQueue | 一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。 |
SynchronousQueue | 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。 |
PriorityBlockingQueue | 一个具有优先级得无限阻塞队列。 |
- ThreadFactory:在线程池实例executor创建新线程时使用的工厂。用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
- RejectedExecutionHandler(饱和策略,也叫拒绝策略):在达到了线程边界和队列容量, 执行被阻止时使用的handler。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。有如下的拒绝策略:
策略名称 | 描述 |
AbortPolicy | 线程池默认的策略,如果元素添加到线程池失败,会抛出RejectedExecutionException异常 |
DiscardPolicy | 如果添加失败,则放弃,并且不会抛出任何异常 |
DiscardOldestPolicy | 如果添加到线程池失败,会将队列中最早添加的元素移除,再尝试添加,如果失败则按该策略不断重试 |
CallerRunsPolicy | 如果添加失败,那么主线程会自己调用执行器中的execute方法来执行改任务 |
自定义 | 如果觉得以上几种策略都不合适,那么可以自定义符合场景的拒绝策略。需要实现RejectedExecutionHandler接口,并将自己的逻辑写在rejectedExecution方法内。 |
ThreadPoolExecutor类中重要的成员变量如下:
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小
//、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集
private volatile long keepAliveTime; //线程存货时间
private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private volatile int poolSize; //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount; //用来记录已经执行完毕的任务个数
可以看到所谓线程池的这个池子,即是由workQueue这个成员变量来存储的,它是一个Runnable类型的泛型.
关于排队方式的详细信息
1. 直接提交
工作队列的默认选项是 SynchronousQueue
,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
使用直接提交策略,即SynchronousQueue。
首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。
new ThreadPoolExecutor(
2, 3, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
当核心线程已经有2个正在运行.
1. 此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。”,所以A被添加到queue中。
2. 又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去
3. 此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。
4. 暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。
所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中
2. 无界队列
使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列
使用无界队列策略,即LinkedBlockingQueue
这个就拿newFixedThreadPool来说,根据前文提到的规则:如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。那么当任务继续增加,会发生什么呢?
如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于无界队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,如果任务内存大一些,不一会儿就爆了
3. 有界队列
当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
有界队列,使用ArrayBlockingQueue。
这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。
new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
假设,所有的任务都永远无法执行完。对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queue中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。
常用线程池
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。并且在JDK帮助文档中,有如此一段话:
”强烈建议程序员使用较为方便的Executors
工厂方法Executors.newCachedThreadPool()
(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)
(固定大小线程池)Executors.newSingleThreadExecutor()
(单个后台线程),它们均为大多数使用场景预定义了设置“
p.s. Executors和Arrays类,Collections类一样,是以字母s结尾的jdk提供的非常实用的工具类。
newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool是具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源,如果需要关闭,需要手动关闭。
newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool
创建一个可缓存的线程池(无界线程池)。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,返回的也是ExecutorService这个父类的实例,只不过参数都已配置好了。对比图表如下:
线程池 | | | |
固定大小 | 传入参数nThreads决定 | 传入参数nThreads决定 | LinkedBlockingQueue |
单线程线程池 | 1 | 1 | LinkedBlockingQueue |
无界限(可缓存的线程池) | 0 | Integer.MAX_VALUE | SynchronousQueue |
实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。
一个任务分成多个任务,然后汇总计算,不用fork/join怎么实现?
首先来看fork/join是什么
其实Fork/Join处理一定程度的数据,核心建立于目前水平发展的多核计算机技术,它表达了一种充分利用资源的概念。在如今的计算机领域多核处理器早已是主流,而且并发编程讲究多线程处理问题,对计算机资源利用达到一个新的高度。Fork/Join框架是Java 7提供了的一个用于并行执行任务的框架, 大概是怎样子的呢,就是一个把大任务分割成若干个小任务,最终把每个小任务结果汇总起来得到大任务结果的框架。
关键点:要提高效率对任务的拆分是关键。
多线程编程与多核的关系
回答这个问题事实上是在说并发和并行的问题。
https://baijiahao.baidu.com/s?id=1609465299690233434&wfr=spider&for=pc