一、基础知识
因为运行在虚拟机上JAVA不能直接操作硬件,因此并不能真正的开启线程,而是去调用本地方法C++的方法进行开启的。
一.线程和进程
进程:进程可以理解为一个程序,一个正在进行的过程或者说任务,由CPU执行
线程:是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
二.并发和并行
并发:CPU为一核,模拟出多条线程,进行快速交替
并行:多核CPU,多个线程可以同时执行。
二、JAVA线程的生命周期
一.wait和sleep的区别
1、来自不同的类
wait=>Object类
sleep=>Thread类
2、锁的释放
wait无论带不带参数会释放锁,sleep线程进行休眠但是不释放锁。
3、使用范围
wait只能用在同步代码块中
sleep都可以。
三、Lock锁
一.传统方式:同步代码块(Synchronized)
package JUC;
public class SaleTicketDemo {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 0; i < 60; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 60; i++) {
ticket.sale();
}
}, "B").start();
}
}
class Ticket {
private int number = 50;
public synchronized void sale() {
if (number > 0) {
System.out.println(Thread.currentThread().getName().toString() + "卖出了" + number--);
}
}
}
二.Lock
1、公平锁
先来后到
Lock lock = new ReentrantLock(true);
2、非公平锁(默认)
可以插队,非公平锁其实比较合理。
Lock lock = new ReentrantLock(fales);
Lock lock = new ReentrantLock();
3、DEMO
package JUC;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SaleTicketDemo2 {
public static void main(String[] args) {
Ticket2 ticket = new Ticket2();
new Thread(() -> {
for (int i = 0; i < 60; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 60; i++) {
ticket.sale();
}
}, "B").start();
}
}
class Ticket2 {
private int number = 50;
Lock lock = new ReentrantLock(false);
public void sale() {
//加锁
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName().toString() + "卖出了" + number--);
}
} finally {
//解锁
lock.unlock();
}
}
}
三.Lock和Synchronize区别
1、两者所处层面不同
synchronized是Java中的一个关键字,当我们调用它时会从在虚拟机指令层面加锁,关键字为monitorenter和monitorexit
Lock是Java中的一个接口,它有许多的实现类来为它提供各种功能,加锁的关键代码为大体为Lock和unLock;
2、获锁方式
synchronized可对实例方法、静态方法和代码块加锁,相对应的,加锁前需要获得实例对象的锁或类对象的锁或指定对象的锁。说到底就是要先获得对象的监视器(即对象的锁)然后才能够进行相关操作。
Lock的使用离不开它的实现类AQS,而它的加锁并不是针对对象的,而是针对当前线程的,并且AQS中有一个原子类state来进行加锁次数的计数
3、获锁失败
使用关键字synchronized加锁的程序中,获锁失败的对象会被加入到一个虚拟的等待队列中被阻塞,直到锁被释放;1.6以后加入了自旋操作
使用Lock加锁的程序中,获锁失败的线程会被自动加入到AQS的等待队列中进行自旋,自旋的同时再尝试去获取锁,等到自旋到一定次数并且获锁操作未成功,线程就会被阻塞
4、偏向或重入
synchronized中叫做偏向锁
当线程访问同步块时,会使用 CAS 将线程 ID 更新到锁对象的 Mark Word 中,如果更新成功则获得偏向锁,并且之后每次进入这个对象锁相关的同步块时都不需要再次获取锁了。
Lock中叫做重入锁
AQS的实现类ReentrantLock实现了重入的机制,即若线程a已经获得了锁,a再次请求锁时则会判断a是否持正有锁,然后会将原子值state+1来实现重入的计数操作
5、Lock独有的队列
condition队列是AQS中的一个Lock的子接口的内部现类,它一般会和ReentrantLock一起使用来满足除了加锁和解锁以外的一些附加条件,比如对线程的分组和临界数量的判断(阻塞队列)
6、解锁操作
synchronized:不能指定解锁操作,执行完代码块的对象会自动释放锁
Lock:可调用ulock方法去释放锁比synchronized更灵活
四、生产者和消费者
一.传统方式:同步代码块(Synchronized)
package JUC;
public class A {
public static void main(String[] args) {
Data d = new Data();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
d.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
d.delete();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
d.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
d.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data {
private int number = 0;
public synchronized void add() throws InterruptedException {
//if判断会有虚假唤醒
while (number != 0) {
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "add" + number);
this.notifyAll();
}
public synchronized void delete() throws InterruptedException {
//if判断会有虚假唤醒
while (number == 0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "delete" + number);
this.notifyAll();
}
}
二.Condition(监听器)
Condition为接口类型,它将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。可以通过await(),signal()来休眠/唤醒线程。
1、Demo1(随机通知)
package JUC;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class B {
public static void main(String[] args) {
DataB d = new DataB();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
d.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
d.delete();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
d.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
d.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class DataB {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void add() throws InterruptedException {
try {
lock.lock();
//if判断会有虚假唤醒
while (number != 0) {
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "add" + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
public void delete() throws InterruptedException {
try {
lock.lock();
//if判断会有虚假唤醒
while (number == 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "delete" + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
2、Demo2(精确唤醒)
package JUC;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class C {
public static void main(String[] args) {
DataC d = new DataC();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
d.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
d.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
d.printC();
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
d.printD();
}
}, "D").start();
}
}
class DataC {
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
Condition condition4 = lock.newCondition();
int single = 0;
public void printA() {
try {
lock.lock();
while (single != 0) {
condition1.await();
}
System.out.println("A");
single = 1;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
try {
lock.lock();
while (single != 1) {
condition2.await();
}
System.out.println("B");
single = 2;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
try {
lock.lock();
while (single != 2) {
condition3.await();
}
System.out.println("C");
single = 3;
condition4.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printD() {
try {
lock.lock();
while (single != 3) {
condition4.await();
}
System.out.println("D");
single = 0;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
三.8锁现象(关于锁的八个问题)
1、标准情况下(都不进行休眠或等待),两个线程先打印那个?
package JUC;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) {
Computer computer = new Computer();
new Thread(() -> {
computer.play();
}, "A").start();
try {//休眠
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
computer.code();
}, "B").start();
}
}
class Computer {
//synchronized锁的是方法的调用者
public synchronized void play() {
System.out.println("玩游戏");
}
public synchronized void code() {
System.out.println("写代码");
}
}
2、一个线程在执行的过程中休眠了(SLEEP),另一个不休眠,两个线程先打印那个?
package JUC;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) {
Computer computer = new Computer();
new Thread(() -> {
computer.play();
}, "A").start();
new Thread(() -> {
computer.code();
}, "B").start();
}
}
class Computer {
//synchronized锁的是方法的调用者
public synchronized void play() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("玩游戏");
}
public synchronized void code() {
System.out.println("写代码");
}
}
1和2其实是同一个原理,synchronized锁的是方法的调用者,而方法的调用者只有一个,就是new出来的computer,因此谁先拿到锁,就先执行谁。
3、一个类一个非加锁方法和一个普通加锁方法,先执行哪个。
顺序执行,普通方法不受锁的影响。
package JUC;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
Computer2 computer = new Computer2();
new Thread(() -> {
computer.code();
}, "B").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
computer.play();
}, "A").start();
}
}
class Computer2 {
//synchronized锁的是方法的调用者
public synchronized void play() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("玩游戏");
}
//普通方法,不受锁的影响。
public void code() {
System.out.println("写代码");
}
}
4、一个类两个普通加锁方法,有两个对象,怎么执行。
两个对象是两把锁,锁之间不影响,按代码顺序执行。
package JUC;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
//两个对象
Computer2 computer1 = new Computer2();
Computer2 computer2 = new Computer2();
new Thread(() -> {
computer1.code();
}, "B").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
computer2.play();
}, "A").start();
}
}
class Computer2 {
//synchronized锁的是方法的调用者
public synchronized void play() {
System.out.println("玩游戏");
}
//普通方法
public synchronized void code() {
System.out.println("写代码");
}
}
5、两个静态加锁方法,有1个对象,怎么执行。
静态方法(初始化就进行加载了)加锁的时候,锁的是Computer3 这个类,这个类全局唯一,所以两个方法均受锁的影响,按锁的顺序执行。
package JUC;
public class Test3 {
public static void main(String[] args) {
new Thread(() -> {
Computer3 .code();
}, "B").start();
new Thread(() -> {
Computer3 .play();
}, "A").start();
}
}
//只有唯一的一个class对象。
class Computer3 {
//synchronized锁的是方法的调用者
public static synchronized void play() {
System.out.println("玩游戏");
}
//普通方法
public static synchronized void code() {
System.out.println("写代码");
}
}
6、两个静态加锁方法,怎么执行。
这段不写代码了,因为类中有静态方法,全局唯一,所以同样是代码的顺序执行。
7、一个普通加锁方法,一个静态加锁方法,有一个对象,怎么执行。
静态方法锁的是类,非静态方法锁的是调用者,不是同一把锁。锁之间不影响,按代码顺序执行。
package JUC;
public class Test3 {
public static void main(String[] args) {
//1个对象
Computer3 computer = new Computer3();
new Thread(() -> {
computer.code();
}, "B").start();
new Thread(() -> {
computer.play();
}, "A").start();
}
}
class Computer3 {
//synchronized锁的是方法的调用者
public synchronized void play() {
System.out.println("玩游戏");
}
//锁的是Computer3这个类
public static synchronized void code() {
System.out.println("写代码");
}
}
8、一个普通加锁方法,一个静态加锁方法,有二个对象,怎么执行。
静态方法锁的是类,非静态方法锁的是调用者,创建了两个对象,分属不同把锁。锁之间不影响,按代码顺序执行。
锁的本质分为三种
1、类锁(STATIC):全局唯一
2、方法对象锁(普通sychanized方法):有多少个对象有多少把锁
3、没有(普通方法):没有锁
五、不安全的集合类
一.List不安全
这段代码在单线程之下是安全的,在多线程之下会报错(如果不报错,多执行几次)
package connection;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class ListTest {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 10));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
此处代码会有并发修改异常:ConcurrentModificationException
1、使用Veactor
List<String> list=new Vector<>();
2、使用集合类的转换
List<String> list = Collections.synchronizedList(new ArrayList<>());
3、CopyOnwriteArrayList
List<String> list = new CopyOnWriteArrayList<>();
4、性能比较
写入操作:vector>=synchronizedList>CopyOnwriteArrayList
读取操作:vector=synchronizedList=CopyOnwriteArrayList几乎没差别
遍历操作:CopyOnwriteArrayList>synchronizedList>vector
二.SET不安全
1、使用集合类的转换
Set<String> list2 = Collections.synchronizedSet(new ArrayList());
2、CopyOnwriteArraySet
Set<String> lis3=new CopyOnwriteArraySet();
三.Map不安全
1、并发的hashmap
Map<String,String> map=new ConcurrentHashMap();
六、Callable
有两种创建线程的方法-一种是通过创建Thread类,另一种是通过使用Runnable创建线程。但是,Runnable缺少的一项功能是,当线程终止时(即run()完成时),无法使线程返回结果。为了支持此功能,Java中提供了Callable接口。
特征
- 为了实现Runnable,需要实现不返回任何内容的run()方法,而对于Callable,需要实现在完成时返回结果的call()方法。请注意,不能使用Callable创建线程,只能使用Runnable创建线程。
- 另一个区别是call()方法可以引发异常,而run()则不能。
- 为实现Callable而必须重写call方法。
一.DEMO
package JUC;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallAble {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//第三步 创建Callable实现类的对象
MyCallble myCallble = new MyCallble();
//第四步 创建FutureTask对象,并将Callable实现类的对象作为参数传递到FutureTask的构造器中
FutureTask<String> futureTask = new FutureTask<>(myCallble);
//第五步 创建Thread对象并将FutureTask对象作为参数传递Thread的构造器中
Thread thread = new Thread(futureTask);
//第六步 调用Thread的start方法
thread.start();
//第七步 调用FutureTask中的get()方法获取返回值
String str = futureTask.get(); //get方法(阻塞式的)后的代码都是在分线程执行完毕后再开始执行的
System.out.println(str);
for (int i = 0; i < 50; i++)
{
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
}
// 第一步 创建一个类并实现Callable接口
class MyCallble implements Callable<String> {
// 第二步 重写call方法并返回数据
@Override
public String call() throws Exception {
for (int i = 0; i < 50; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
return "aaa";
}
}
七、辅助工具类/修饰符
一.CountDownLatch (减法计数器)
package JUC;
import java.util.concurrent.CountDownLatch;
/**
* 减法计数器
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//总计为8
CountDownLatch countDownLatch = new CountDownLatch(8);
for (int i = 1; i <= 8; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "out");
//减一
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
//必须等待计数器归零,才可以执行下面的代码。
countDownLatch.await();//看好方法是await不是wait
System.out.println("close");
}
}
二.CyclicBarrier(加法计数器)
package JUC;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 召唤神龙
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
//等待下面的执行完了,才会执行后面的方法。
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙");
});
for (int i = 0; i < 7; i++) {
final int tmp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集了" + tmp);
try {
//内部加1
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
三.Semaphore(信号量)
1、DEMO
package JUC;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//意思是有三个信道
Semaphore semaphore = new Semaphore(3);
//共有12个线程,先是获取到该信道,之后释放掉,释放之后,才能被其他的线程占用。
for (int i = 1; i <= 12; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获取到");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "释放");
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
2、Semaphore semaphore = new Semaphore(0)
默认是阻塞的,需要先进行释放,在进行获取。
我们提供了一个类:
public class Foo {
public void first() { print("first"); }
public void second() { print("second"); }
public void third() { print("third"); }
}
三个不同的线程 A、B、C 将会共用一个 Foo 实例。
一个将会调用 first() 方法
一个将会调用 second() 方法
还有一个将会调用 third() 方法
请设计修改程序,以确保 second() 方法在 first() 方法之后被执行,third() 方法在 second() 方法之后被执行。
示例 1:
输入: [1,2,3]
输出: "firstsecondthird"
解释:
有三个线程会被异步启动。
输入 [1,2,3] 表示线程 A 将会调用 first() 方法,线程 B 将会调用 second() 方法,线程 C 将会调用 third() 方法。
正确的输出是 "firstsecondthird"。
示例 2:
输入: [1,3,2]
输出: "firstsecondthird"
解释:
输入 [1,3,2] 表示线程 A 将会调用 first() 方法,线程 B 将会调用 third() 方法,线程 C 将会调用 second() 方法。
正确的输出是 "firstsecondthird"。
来源:力扣(LeetCode)
链接:https://leetcode-cn.com/problems/print-in-order
著作权归领扣网络所有。商业转载请联系官方授权,非商业转载请注明出处。
import java.util.concurrent.Semaphore;
class Foo {
Semaphore semaphore12, semaphore23;
public Foo() {
//初始的允许请求均设为0,均为阻塞。
semaphore12 = new Semaphore(0);
semaphore23 = new Semaphore(0);
}
public void first(Runnable printFirst) throws InterruptedException {
// printFirst.run() outputs "first". Do not change or remove this line.
printFirst.run();
//释放一个12的信号量
semaphore12.release();
}
public void second(Runnable printSecond) throws InterruptedException {
//获取一个12的信号量,没有则阻塞
semaphore12.acquire();
// printSecond.run() outputs "second". Do not change or remove this line.
printSecond.run();
//释放一个23的信号量
semaphore23.release();
}
public void third(Runnable printThird) throws InterruptedException {
//获取一个23的信号量,没有则阻塞
semaphore23.acquire();
// printThird.run() outputs "third". Do not change or remove this line.
printThird.run();
}
}
四.volatile(特征修饰符,保证数据都存在公共内存中,不进行重排,但是不保证线程是否进行切换。)
Java并发编程–01 | 可见性、原子性和有序性问题:并发编程Bug的源头
1、volatile的机制
java虚拟机提供的轻量级的同步机制
1.保证可见性(多核之间所属缓存不可见,会导致可见性问题)
单核时代
在单核时代,所有的线程都是在一颗 CPU 上执行,CPU 缓存与内存的数据一致性容易解决。因为所有线程都是操作同一个 CPU 的缓存,一个线程对缓存的写,对另外一个线程来说一定是可见的。线程 A 和线程 B 都是操作同一个 CPU 里面的缓存,所以线程 A 更新了变量 V 的值,那么线程 B 之后再访问变量 V,得到的一定是 V 的最新值(线程 A 写过的值)。
多核时代
多核时代,每颗 CPU 都有自己的缓存,这时 CPU 缓存与内存的数据一致性就没那么容易解决了,当多个线程在不同的 CPU 上执行时,这些线程操作的是不同的 CPU 缓存。线程 A 操作的是 CPU-1 上的缓存,而线程 B 操作的是 CPU-2 上的缓存,这个时候线程 A 对变量 V 的操作对于线程 B 而言就不具备可见性了。
因为不同CPU的缓存之间相互不可见,就会导致可见性的问题。
volatile是数据留存在公共内存区,就不会有可见性问题。
2.不保证原子性
由于 IO 太慢,早期的操作系统就发明了多进程,即便在单核的 CPU 上我们也可以一边听着歌,一边写 代码。
操作系统允许某个进程执行一小段时间,例如 50 毫秒,过了 50 毫秒操作系统就会重新选择一个进程来执行(我们称为“任务切换”),这个 50 毫秒称为“时间片”,任务切换的时机大多数是在时间片结束的时候。
原子性指的是线程在执行任务的过程中,不可以被打扰或者中断,要么同时成功,要么同时失败。
3.禁止指令重排(有序性)
有序性指的是程序按照代码的先后顺序执行。编译器为了优化性能,有时候会改变程序中语句的先后顺序,例如程序中:“a=6;b=7;”编译器优化后可能变成“b=7;a=6;”,在这个例子中,编译器调整了语句的顺序,但是不影响程序的最终结果。不过有时候编译器及解释器的优化可能导致意想不到的 Bug。
八、读写锁(ReadWriteLock)
写:只能被一个线程占用
读:可以被多个线程占用
package JUC;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheReadWriteLock myCache = new MyCacheReadWriteLock();
for (int i = 0; i < 10; i++) {
final int tmp = i;
new Thread(() -> {
myCache.put(tmp + "", tmp);
}, String.valueOf(i)).start();
}
for (int i = 0; i < 10; i++) {
final int tmp = i;
new Thread(() -> {
myCache.get(tmp + "");
}, String.valueOf(i)).start();
}
}
}
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
//写的时候只能执行一个操作,写入完毕之后,才能写入其他的。
public void put(String key, Object vaule) {
System.out.println(Thread.currentThread().getName() + "写入开始" + key);
map.put(key, vaule);
System.out.println(Thread.currentThread().getName() + "写入完毕" + key);
}
public void get(String key) {
System.out.println(Thread.currentThread().getName() + "写入开始" + key);
Object O = map.get(key);
System.out.println(Thread.currentThread().getName() + "写入完毕" + key);
}
}
class MyCachelock {
private volatile Map<String, Object> map = new HashMap<>();
Lock lock = new ReentrantLock();
//写的时候只能执行一个操作,写入完毕之后,才能写入其他的。
public void put(String key, Object vaule) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "写入开始" + key);
map.put(key, vaule);
System.out.println(Thread.currentThread().getName() + "写入完毕" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void get(String key) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "写入开始" + key);
Object O = map.get(key);
System.out.println(Thread.currentThread().getName() + "写入完毕" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
class MyCacheReadWriteLock {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//写的时候只能执行一个操作,写入完毕之后,才能写入其他的。
public void put(String key, Object vaule) {
try {
//枷锁
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "写入开始" + key);
map.put(key, vaule);
System.out.println(Thread.currentThread().getName() + "写入完毕" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
//解锁
readWriteLock.writeLock().unlock();
}
}
public void get(String key) {
try {
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "写入开始" + key);
Object O = map.get(key);
System.out.println(Thread.currentThread().getName() + "写入完毕" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
九、阻塞队列(BlockQueue)
一.定义及demo
如果队列是满的,需要等有空位才能进。
如果队列是空的,需要等有数据才能取。
方式 | 包含异常 | 返回值 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add( java.lang.IllegalStateException: Queue full) | offer(false) | put | offer |
删除 | remove(java.util.NoSuchElementException) | poll(null) | take | poll |
判断队列首尾 | element(NoSuchElementException) | peek |
package JUC;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockQueueDemo {
public static void main(String[] args) throws InterruptedException {
// test1();
test4();
}
public static void test1() {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//add有异常
//remove有异常
// System.out.println(blockingQueue.add("a"));
// System.out.println(blockingQueue.add("b"));
// System.out.println(blockingQueue.add("c"));
// // System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.add("d"));
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
blockingQueue.element();//查看阻塞队列队首
}
public static void test2() {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// System.out.println(blockingQueue.offer("1"));
// System.out.println(blockingQueue.offer("2"));
// System.out.println(blockingQueue.offer("3"));
// System.out.println(blockingQueue.offer("4"));//返回false
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());//返回null
blockingQueue.peek();
}
public static void test3() throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//阻塞添加,添加不进去会一直等待
blockingQueue.put("A");
blockingQueue.put("B");
blockingQueue.put("C");
blockingQueue.put("D");
//阻塞获取,或得不到会一直等待
blockingQueue.take();
}
public static void test4() throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//阻塞添加,添加不进去会等待一定时间,等待超时会自动退出。
blockingQueue.offer("A");
blockingQueue.offer("B");
blockingQueue.offer("C");
//添加元素,等待时间,等待时间单位。
blockingQueue.offer("D", 2, TimeUnit.SECONDS);
//阻塞获取,或得不到会一直等待
blockingQueue.poll(2, TimeUnit.SECONDS);
}
}
二.家族成员
1、ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
2、LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
3、DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
4、PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
5、同步队列队列(synchronousQueue)
进入一个元素,必须等取出来之后,才能再进去。
//非同步代码,不等于ArrayBlockingQueue<>(1)
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
package JUC;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class synchronousQueueDemo {
public static void main(String[] args) {
test1();
}
public static void test1() {
SynchronousQueue synchronousQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "put1");
synchronousQueue.put(1);
System.out.println(Thread.currentThread().getName() + "put2");
synchronousQueue.put(2);
System.out.println(Thread.currentThread().getName() + "put3");
synchronousQueue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(synchronousQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(synchronousQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
;
}
}
十、线程池
一.线程池的优点
- 降低资源消耗:线程池通常会维护一些线程(数量为corePoolSize),这些线程被重复使用来执行不同的任务,任务完成后不会销毁。在待处理任务量很大的时候,通过对线程资源的复用,避免了线程的频繁创建与销毁,从而降低了系统资源消耗。
- 提高响应速度:由于线程池维护了一批 alive状态的线程,当任务到达时,不需要再创建线程,而是直接由这些线程去执行任务,从而减少了任务的等待时间。
- 提高线程的可管理性:使用线程池可以对线程进行统一的分配,调优和监控。
二.Executors
Java通过Executors提供五种线程池,分别为:
- newCachedThreadPool创建一个可伸缩线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
- newWorkStealingPool,JDK8新增的线程池
1、newSingleThreadExecutor(单线程化的线程池)
package JUC;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static void main(String[] args) {
//r创建一个单线程化的线程池
ExecutorService threadPool1 = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 10; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} finally {
//关闭
threadPool1.shutdown();
}
}
}
2、newScheduledThreadPool (固定大小线程池,周期定时执行)
package JUC;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService threadPool1 = Executors.newScheduledThreadPool (5);
//定时执行,延迟十秒,每秒执行一次。
threadPool1.scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName()+":你好啊");
},0,1, TimeUnit.SECONDS);
//定时执行,延迟十秒,每秒执行一次。
threadPool1.scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName()+":你也好啊");
},0,1, TimeUnit.SECONDS);
}
}
3、newFixedThreadPool (固定大小线程池,阻塞等待)
package JUC;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static void main(String[] args) {
//r创建一个单线程化的线程池
ExecutorService threadPool1 = Executors.newFixedThreadPool (5);
try {
for (int i = 0; i < 20; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} finally {
//关闭
threadPool1.shutdown();
}
}
}
4、newCachedThreadPool(可伸缩线程池)
package JUC;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static void main(String[] args) {
//r创建一个单线程化的线程池
ExecutorService threadPool1 = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 20; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} finally {
//关闭
threadPool1.shutdown();
}
}
}
三.线程池参数
public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
int maximumPoolSize,//最大线程池大小
long keepAliveTime,//超时时间,没有调用就会释放
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler) //拒绝策略
线程池不允许使用Executors去创建,因为默认值过大可能会导致资源耗尽。使用ThreadPoolExecutor创建线程池
1、现实场景
银行柜台的总数: int maximumPoolSize,//最大核心线程池大小
正在营业的柜台数:int corePoolSize,//核心线程池大小
等待区域: BlockingQueue workQueue,//阻塞队列
最长办理时间: long keepAliveTime,//线程耻中的线程等待时间,没有调用就会释放该线程,比如柜台超过一个小时没有业务,柜员休息去了。
2、DEMO
package JUC;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
4,//核心线程数
8,//最大线程数
1,//线程等待时间
TimeUnit.MINUTES,//等待时间单位
new ArrayBlockingQueue<>(8),//默认大小为8的阻塞队列
Executors.defaultThreadFactory(),//默认线程池工厂
new ThreadPoolExecutor.AbortPolicy());//拒绝策略
);
}
}
3、四种拒绝策略
1.AbortPolicy(候客区也满了,来了新客户抛出异常,基本只用这种)
线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。
2.DiscardPolicy(候客区也满了,来了新客户通通拒绝,不抛出异常)
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,阻塞队列队尾删除元素,且是静默丢弃。
3.DiscardOldestPolicy(将队列首的任务删除,再尝试添加队列。)
这个除了特定场景使用,其他很少用。
4.CallerRunsPolicy(候客区满了,让创建新线程的线程去执行,比如main)
让主线程去执行新来的客户。
3、如何设定最大线程池大小(伪标准)
1.CPU 密集型计算
对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
2.I/O 密集型计算
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]