1 JUC简述

介绍

JUC实际上就是我们对于jdk中java.util.concurrent工具包的简称。这个包下的类都是和 **Java多线程开发 **相关的类。

查看JDK的官方文档如下所示:

image-20230307084205373

2 线程的创建方式

2.1 继承Thread类

2.1.1 代码演示

通过继承Thread类来创建并启动多线程的步骤如下:

1、定义一个类,让其继承Thread类

2、重写Thread类中的的run方法(该run方法的方法体就代表了线程需要完成的任务。因此把run方法称之为线程执行体)

3、创建Thread子类的实例,即创建了线程对象

4、调用线程对象的start()方法来启动该线程

线程类:

1
2
3
4
5
6
7
8
9
10
11
public class MyThread extends Thread {
/**
* 该run方法的方法体就代表了线程需要完成的任务。因此把run方法称之为线程执行体
*/
@Override
public void run() {
for(int x = 0 ; x < 100 ; x++) {
System.out.println(x);
}
}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
public class ThreadDemo1 {
public static void main(String[] args) {
// 创建Thread子类的实例,即创建了线程对象
MyThread t1 = new MyThread() ;
// 调用线程对象的start()方法来启动该线程
t1.start();
/**
* 调用start方法启动线程,那么此时jvm会分配一个线程去调用MyThread类中的run方法
*/
}
}

如果我们需要再开启一个线程,那么怎么办呢?

  1. 再次创建一个线程对象

  2. 调用start方法启动线程

1
2
3
4
5
6
7
8
9
10
11
12
public class ThreadDemo2 {
public static void main(String[] args) {
// 创建Thread子类的实例,即创建了线程对象
MyThread t1 = new MyThread() ;
MyThread t2 = new MyThread() ;

// 调用线程对象的start()方法来启动该线程
// 多线程的执行结果具有随机性。
t1.start();
t2.start();
}
}

经过测试,查看控制台打印结果;发现多次运行程序的结果并不相同,原因是因为CPU在多个线程间进行切换,随机执行导致的结果。

结论:多线程的执行具有随机性。

2.1.2 执行图解

多线程程序执行图解

当我们执行main方法时候,此时jvm会开启一个线程去执行,一个线程可以看做是程序的一条执行路径;当我们在main方法中又开启了两个线程,并且将其启动起来,那么此时在程序中会存在3条执行路径。它们之间彼此都是独立的,进行同时执行。

image-20230307090757304

单线程程序执行图解

对比我们之前所线程的单线程程序,那么程序在执行的时候,只存在一条执行路径

image-20230307090908614

2.1.3 start和run区别

start方法和run方法的区别如下:

1、调用start方法,jvm会开启一个线程(新的路径)去执行run方法中代码;

​ 调用run方法就是把run方法当做普通方法去执行,jvm不会去开启一个新线程来执行。

2、start方法只能调用一次,run方法可以调用多次。

代码演示1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadDemo3 {

// 程序入口
public static void main(String[] args) {

// 创建一个线程对象
MyThread t1 = new MyThread() ;

// 启动线程
// t1.start();

// 调用run方法
t1.run(); // jvm不会去开启一个线程执行run方法

// 执行for循环
for(int x = 0 ; x < 100 ; x++) {
System.out.println("main ------>>> " + x);
}

}

}

调用t1对象的run方法时,程序的执行结果就是每一次都是等待run方法执行完毕了,才去执行main方法中的for循环;

当调用t1对象的start方法时,程序的执行结果就具有随机性,因此就说明直接调用run方法jvm就没有去开启一个线程去执行。

代码演示2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadDemo4 {

// 程序入口
public static void main(String[] args) {

// 创建一个线程对象
MyThread t1 = new MyThread() ;

// 调用start方法启动线程
t1.start();
t1.start();

}

}

以上程序会出现如下错误

1
2
3
Exception in thread "main" java.lang.IllegalThreadStateException
at java.base/java.lang.Thread.start(Thread.java:794)
at com.atguigu.javase.thread.create.demo01.ThreadDemo4.main(ThreadDemo4.java:13)

代码演示3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadDemo5 {

// 程序入口
public static void main(String[] args) {

// 创建一个线程对象
MyThread t1 = new MyThread() ;

// 调用run方法
t1.run();
t1.run();

}

}

以上程序不会报错,可以进行正常执行。

2.2 Thread类核心API

2.2.1 线程名称

1
2
3
public final String getName()							// 获取线程名称
public final void setName(String name) // 调用setName方法设置线程名称
public Thread(String name) // 构造方法设置线程名称

2.2.2 线程对象

获取当前正在执行该方法的线程对象:

1
public static native Thread currentThread();			// 获取当前执行该线程体的线程对象

2.2.3 线程休眠

1
2
public static void sleep(long time)        				// 让线程休眠指定的时间,单位为毫秒
TimeUnit.时间单位.sleep(时间值); // 使用时间枚举类让线程休眠

2.2.4 线程加入

把某一个线程加入到当前线程的执行流程中。

1
public final void join() throws InterruptedException	

当某一个程序执行流程中调用了其他线程的join()方法,调用线程暂停执行,直到被join()方法加入的join线程执行完成为止。

1
注: 需要在线程启动以后在进行加入才有效

比如现在存在两个线程,一个t1线程 , 一个是t2线程,当我们t1线程执行到某一个时刻的时候,我们在t1线程的执行流中添加了t2线程,那么此时t1线程暂停执行,直到t2线程执行完毕以后t1线程才可以继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadDemo01 {
public static void main(String[] args) {
// 我们在主线程的执行流中加入其它线程
for(int x = 0 ; x < 100 ; x++) {
// 当x的值等于20的执行加入其它线程
if(x == 10) {

// 创建MyThread线程对象
MyThread myThread = new MyThread();
myThread.setName("atguigu-01");
myThread.start();

// 调用join方法进行线程加入
try {
myThread.join(); // 需要在线程启动以后在进行加入才有效
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 主线程执行代码
System.out.println(Thread.currentThread().getName() + "--------->>" + x);
}
}
}

控制台输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
main--------->>0
main--------->>1
main--------->>2
main--------->>3
main--------->>4
main--------->>5
main--------->>6
main--------->>7
main--------->>8
main--------->>9
atguigu-01 ----->>> 0
atguigu-01 ----->>> 1
atguigu-01 ----->>> 2
atguigu-01 ----->>> 3
atguigu-01 ----->>> 4
....
atguigu-01 ----->>> 98
atguigu-01 ----->>> 99
main--------->>10
main--------->>11
main--------->>12
....

通过控制台的输出结果,我们可以看到当主线程输出完9以后,”atguigu-01”线程加入到了主线程的执行流程中,此时主线程处于等待状态。当被加入的线程执行完毕以后,主线程的阻塞状态才会被消除。

面试题:有三个线程 T1 , T2 , T3 ,如何保证顺序执行?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
System.out.println(111);
});

Thread t2 = new Thread(()->{
System.out.println(222);
});

Thread t3 = new Thread(()->{
System.out.println(333);
});

t1.start();
t1.join();

t2.start();
t2.join();


t3.start();
t3.join();

}

2.2.5 线程中断

(1)interrupt方法

当调用线程的sleep方法时,可以让该线程处于等待状态,调用该线程的interrupt()方法就可以打断该阻塞状态,中断阻塞状态以后,继续执行(前提是别throw),而不是让线程结束,并且此方法会抛出一个InterruptedException异常。

1
public void interrupt();					// 中断线程的阻塞状态

案例:演示中断sleep的等待状态

线程类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyThread extends Thread {

@Override
public void run() {

for(int x = 0 ; x < 100 ; x++) {
System.out.println(Thread.currentThread().getName() + "----" + x );
if(x == 10) {
try {
TimeUnit.SECONDS.sleep(10000); // 线程休眠以后,该线程就处于阻塞状态
} catch (InterruptedException e) {
e.printStackTrace();//如果这里throw,则当前线程的sleep被中断后,后续代码也就无法执行了
}
}
}
}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadDemo1 {

public static void main(String[] args) {

// 创建MyThread线程对象
MyThread t1 = new MyThread();
t1.setName("atguigu-01");

// 启动线程
t1.start();

try {
// 主线程休眠2秒
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 中断t1线程的休眠
t1.interrupt();

}

}

控制台输出结果

1
2
3
4
5
6
7
8
9
...
atguigu-01----10
java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at java.base/java.lang.Thread.sleep(Thread.java:339)
at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
at com.atguigu.javase.thread.api.demo14.MyThread.run(MyThread.java:14)
atguigu-01----11
...

通过控制台的输出结果,我们可以看到interrupted方法并没有去结束当前线程,而是将线程的阻塞状态中断了,中断阻塞状态以后,线程atguigu-01继续进行执行。

(2)stop方法

调用线程的stop方法可以让线程终止执行,没有异常。

1
public final void stop()  // 终止线程的执行

线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyThread extends Thread {

@Override
public void run() {

for(int x = 0 ; x < 100 ; x++) {
System.out.println(Thread.currentThread().getName() + "----" + x );
if(x == 10) {
try {
TimeUnit.SECONDS.sleep(10000); // 线程休眠以后,该线程就处于阻塞状态
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadDemo1 {

public static void main(String[] args) {

// 创建MyThread线程对象
MyThread t1 = new MyThread();
t1.setName("atguigu-01");

// 启动线程
t1.start();

try {
// 主线程休眠2秒
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 终止线程t1的执行
t1.stop();

}

}

控制台输出结果

1
2
3
...
atguigu-01----9
atguigu-01----10

控制台没有任何异常输出,程序结束,”atguigu-01”线程没有继续进行执行。

2.2.6 守护线程

(1)概述

有一种线程是在后台运行的,它的任务就是为其他的线程提供服务,这种线程被称之为”后台线程”,又被称之为”守护线程”。

1
JVM的垃圾回收线程就是典型的后台线程。

后台线程的特征:如果所有的前台线程都结束,后台线程会自动结束,前后台线程都结束了,JVM就退出了。

常见的前台线程:主线程、之前创建的自定义线程…

(2)创建守护线程

创建普通线程,调用其setDaemon(true)方法,即创建了守护线程。

1
public final void setDaemon(boolean on)    // 将某一个线程设置为后台/守护线程

Thread的子类

1
2
3
4
5
6
7
8
9
public class MyThread extends Thread {
@Override
public void run() {
// 在控制台输出0-100
for(int x = 0 ; x < 100 ; x++) {
System.out.println(Thread.currentThread().getName() + "----" + x );
}
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ThreadDemo01 {

public static void main(String[] args) {

// 开启两个线程
MyThread t1 = new MyThread();
t1.setName("关羽");

MyThread t2 = new MyThread();
t2.setName("张飞");

// 将关羽线程设置为守护线程(将某一个线程设置为守护线程,必须在启动线程之前)
t1.setDaemon(true);
t2.setDaemon(true);

// 启动线程
t1.start();
t2.start();

// 在主线程中编写代码
Thread.currentThread().setName("---------------刘备");
for(int x = 0 ; x < 5 ; x++) {
System.out.println(Thread.currentThread().getName() + "-----" + x);
}

/**
* 主线程 == 前台线程
* t1和t2 == 守护线程
* 当主线程执行完毕以后,剩下的线程都是守护线程了jvm就会终止
*/
}

}

控制台输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
张飞----0
张飞----1
张飞----2
张飞----3
张飞----4
关羽----0
关羽----1
关羽----2
关羽----3
关羽----4
关羽----5
关羽----6
关羽----7
---------------刘备-----0
关羽----8
---------------刘备-----1
张飞----5
---------------刘备-----2
关羽----9
---------------刘备-----3
张飞----6
---------------刘备-----4 //主线程(前台线程)执行完成后,为啥守护线程还在执行??
关羽----10
关羽----11
关羽----12
关羽----13
关羽----14
关羽----15
关羽----16
关羽----17
关羽----18
关羽----19
关羽----20

Process finished with exit code 0

最理想的状态就是当主线程(所有前台线程)执行完毕以后,所有守护线程(t1线程和t2线程)就应该立即结束。

但是控制台的输出结果是主线程打印结束后,守护线程还在继续打印一些内容。

为什么呢?因为前台线程全部结束后,JVM会通知后台线程全部结束,但从它接收到指令到做出响应,需要一定时间,而在这一段时间内其他线程还可以继续执行。

2.3 Runnable接口

2.3.1 创建Runnable接口实现类

实现多线程的第二种方式就是借助于Runnable接口进行实现,具体的步骤如下:

1、定义Runnable接口的实现类,并重写该接口的run方法

2、创建Runnable实现类的实例

3、创建Thread对象,然后将第二步创建实例作为参数传递过来

4、调用start方法启动线程

1
2
3
4
5
6
7
8
9
10
11
12
public class MyRunnable implements Runnable {

/**
* 该run方法的方法体就代表了线程需要完成的任务。因此把run方法称之为线程执行体
*/
@Override
public void run() {
for(int x = 0 ; x < 100 ; x++) {
System.out.println(x);
}
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadDemo01 {
public static void main(String[] args) {
// 创建Runnable的实例对象
MyRunnable myRunnable = new MyRunnable() ;

// 创建Thread对象,然后将第二步创建实例作为参数传递过来
Thread t1 = new Thread(myRunnable) ;

// 创建线程对象
Thread t2 = new Thread(myRunnable) ;

// 调用start方法启动线程
t1.start();
t2.start();
}
}

2.3.2 匿名内部类实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadDemo01 {

public static void main(String[] args) {

// 创建Thread对象,然后将第二步创建实例作为参数传递过来
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
// 调用资源方法,完成业务逻辑
}
}) ;

// 创建线程对象
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// 调用资源方法,完成业务逻辑
}
}) ;

// 调用start方法启动线程
t1.start();
t2.start();

}

}

2.3.3 Lambda 表达式

1
2
3
4
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName());
});
thread.start();

什么是函数式接口?

2.4 Callable接口

2.4.1 概述

从java5开始,Java提供了Callable接口,该接口提供了一个call方法,并且有返回值。

但是我们并不能直接将Callable对象作为Thread的构造方法的参数进行传递,为啥?类型不匹配。咋办?借助FutureTask。

FutureTask是Runnabla的子类。

image-20230307100333776

2.4.2 代码实现

需求:开启一个线程计算1-100之和

Callable接口的子类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyCallable implements Callable<Integer> {

@Override
public Integer call() throws Exception {

// 定义统计变量
int result = 0 ;
for(int x = 1 ; x <= 100 ; x++) {
result += x ;
}
// 返回
return result;
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CallableDemo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {

// 创建Callable实现类的对象
MyCallable myCallable = new MyCallable() ;

// 创建FutureTask对象,把第二步创建的对象作为参数进行传递
FutureTask<Integer> futureTask = new FutureTask<Integer>(myCallable) ;

// 创建Thread对象,把第三步创建的FutureTask对象作为传递进行传递
Thread thread = new Thread(futureTask) ;

// 调用start方法启动线程
thread.start();

// 调用FutureTask对象的get方法获取线程执行结果
Integer integer = futureTask.get();
System.out.println(integer);
}
}

2.5 创建线程3种方式总结

1、直接继承Thread类,线程和任务合并在一起,代码简单,但扩展性差,因为Java是单继承。

2、实现Runnable接口或者Callable接口。线程和任务进行了分离,扩展性强,我们的任务类还可以继续继承某一个类。

3、Runnable接口中的run方法没有返回值也没有异常,Callable中的call方法存在返回值也声明了异常。

3 线程安全问题

线程安全问题概述:当多个线程对共享数据操作的时候此时就是出现数据错乱的问题,我们把这种问题就称之为线程安全问题!

3.1 线程安全问题演示-抢红包

案例演示:

需求:使用多线程模拟抢红包案例,4个人抢3个红包

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 线程任务类
public class RedPackageThread implements Runnable {

// 共享资源。红包数量=3
private static int redPackageCount = 3 ;

@Override
public void run() {
//判断资源是否充足
if(redPackageCount <= 0) {
System.out.println(Thread.currentThread().getName() + "-->未抢到红包");
}else {
//模拟抢红包过程中网络卡顿
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//红包个数-1
System.out.println(Thread.currentThread().getName()+"-->抢到一个红包,剩余个数:"+ --redPackageCount);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 测试类
public class RedPackageThreadDemo01 {
public static void main(String[] args) {
// 创建任务类,定义共享资源
RedPackageThread redPackageThread = new RedPackageThread() ;

// 创建线程对象,模拟参与抢红包的人员
Thread t1 = new Thread(redPackageThread , "张三") ;
Thread t2 = new Thread(redPackageThread , "李四") ;
Thread t3 = new Thread(redPackageThread , "王五") ;
Thread t4 = new Thread(redPackageThread , "赵六") ;

// 启动线程
t1.start();
t2.start();
t3.start();
t4.start();
}
}
1
2
3
4
张三---->抢到了一个红包,剩余红包个数:2
赵六---->抢到了一个红包,剩余红包个数:1
李四---->抢到了一个红包,剩余红包个数:0
王五---->抢到了一个红包,剩余红包个数:-1

这里的-1怎么出现的?

3.2 解决问题思路

解决线程安全的思路: 就是将多个线程对共享数据的并发访问更改为串行访问

串行访问(同步访问)就是指:一个共享数据一次只能被一个线程访问,该线程访问完毕以后其他的线程才可以访问。

要实现共享数据的串行访问,我们就需要使用机制来完成。

相关概念

1、获取锁/申请锁:一个线程在访问共享数据之前,我们必须要申请锁,申请锁的这个过程我们将其称之为获取锁。

2、持有锁的线程: 一个线程获得了某一个锁,我们就将该线程称之为锁的持有线程。

3、临界区:获取锁 到 释放锁,这个区间称之为“临界区”,共享数据只能在临界区内进行访问,临界区一次只能被一个线程执行。

锁的持有线程可以对该锁所保护的共享数据进行访问,访问结束以后该线程就需要释放锁。

image-20230307102918664

3.3 隐式锁(synchronized)

1
synchronized 实现的加锁和释放锁是自动的,不需要显示执行,称之为“隐式锁”

3.3.1 synchronized

(1)synchronized概述

synchronized锁是Java中用于控制 多线程访问共享资源 的工具。

它可以用来修饰 代码块 或者 方法 ,确保在同一时刻只有一个线程可以执行被修饰的代码。

当线程尝试获取锁时,如果锁被其他线程持有,那么当前线程会被阻塞,直到锁被释放。

同步代码块的格式

1
2
3
4
synchronized (对象) {
// 在此代码块中访问共享数据
}
该对象可以是任意的对象,这个对象可以简单的理解就是一把锁,但是需要保证多个线程在访问的时候使用的是同一个对象。

同步方法的格式

1
2
public synchronized void sellTicket(){...}
public static synchronized void sellTicket(){...}

(2)同步代码块改造

需求:使用同步代码块改造上述的代码,保证多线程操作共享数据的的安全性!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 线程任务类
public class RedPackageThread implements Runnable {

// 共享资源。红包数量=3
private static int redPackageCount = 3 ;

//锁
private static final Object lock = new Object() ;

@Override
public void run() {
synchronized (lock){
//判断资源是否充足
if(redPackageCount <= 0) {
System.out.println(Thread.currentThread().getName() + "---->未抢到红包");
}else {
//模拟抢红包过程中网络卡顿
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//红包个数-1
System.out.println(Thread.currentThread().getName()+"---->抢到了一个红包,剩余红包个数:" + --redPackageCount);
}
}
}
}

注意:多个线程所使用的锁对象必须是同一个!

1
2
3
4
张三---->抢到了一个红包,剩余红包个数:2
王五---->抢到了一个红包,剩余红包个数:1
李四---->抢到了一个红包,剩余红包个数:0
赵六---->未抢到红包

(3)同步方法改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void run() {
getRedPackage();
}

private synchronized void getRedPackage() {
//判断资源是否充足
if(redPackageCount <= 0) {
System.out.println(Thread.currentThread().getName() + "---->未抢到红包");
}else {
//模拟抢红包过程中网络卡顿
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//红包个数-1
System.out.println(Thread.currentThread().getName()+"---->抢到了一个红包,剩余红包个数:" + --redPackageCount);
}
}

(4)静态同步方法改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static synchronized void getRedPackage() {
//判断资源是否充足
if(redPackageCount <= 0) {
System.out.println(Thread.currentThread().getName() + "---->未抢到红包");
}else {
//模拟抢红包过程中网络卡顿
try {
TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {
e.printStackTrace();
}
//红包个数-1
System.out.println(Thread.currentThread().getName()+"---->抢到了一个红包,剩余红包个数:" + --redPackageCount);
}
}

(5)锁对象研究

思考问题:同步代码块、同步方法、静态同步方法的锁对象分别是谁?

1、普通同步方法,是实例锁,锁是当前实例对象。

2、静态同步方法,是类锁,锁是当前类的Class对象。

3、同步代码块,锁是synchonized括号里配置的对象。

3.3.2 死锁现象

线程死锁是指由于两个或者多个线程互相持有对方所需要的资源,导致这些线程处于等待状态,无法前往执行。

锁接口

1
2
3
4
5
public interface MyLock {
// 定义锁对象
public static final Object R1 = new Object() ;
public static final Object R2 = new Object() ;
}

Thread的子类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class DeadThread extends Thread {
// 定义成员变量,来切换线程去执行不同步代码块的执行
private boolean flag;

public DeadThread(boolean flag) {
this.flag = flag;
}

@Override
public void run() {
if (flag) {
synchronized (MyLock.R1) {
System.out.println(Thread.currentThread().getName() + "---获取到了R1锁,申请R2锁....");
synchronized (MyLock.R2) {
System.out.println(Thread.currentThread().getName() + "---获取到了R1锁,获取到了R2锁....");
}
}
} else {
synchronized (MyLock.R2) {
System.out.println(Thread.currentThread().getName() + "---获取到了R2锁,申请R1锁....");
synchronized (MyLock.R1) {
System.out.println(Thread.currentThread().getName() + "---获取到了R2锁,获取到了R1锁....");
}
}
}
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DeadThreadDemo1 {
public static void main(String[] args) {
// 创建线程对象
DeadThread deadThread1 = new DeadThread(true) ;
deadThread1.setName("线程1");
DeadThread deadThread2 = new DeadThread(false) ;
deadThread2.setName("线程2");

// 启动两个线程
deadThread1.start();
deadThread2.start();
}
}

控制台输出结果

1
2
Thread-0---获取到了R1锁,申请R2锁....
Thread-1---获取到了R2锁,申请R1锁....

此时程序并没有结束,这种现象就是死锁现象…线程Thread-0持有R1的锁等待获取R2锁,线程Thread-1持有R2的锁等待获取R1的锁。

3.4 显式锁(Lock)

从JDK5开始,JUC中提供的锁都提供了常用的锁操作,加锁和解锁的方法都是显式的,我们称他们为显式锁。

JUC中的Lock是一个锁接口,在它下面提供了各种各样的实现类,供我们灵活的使用!

image-20230307135806336

3.4.1 锁的分类

(1)可重入锁和不可重入锁

可重入锁也叫作递归锁,指的是一个线程可以多次抢占同一个锁。

例如,线程A在进入外层函数抢占了锁之后,当线程A继续进入内层函数时,线程A依然可以再抢到这把锁。

不可重入锁与可重入锁相反,指的是一个线程只能抢占一次同一个锁。

1
2
Java的synchronized关键字和ReentrantLock类提供的锁都是可重入的。
可重入锁可以有效避免因先后获取同一把锁而导致的死锁。

(2)悲观锁和乐观锁

悲观锁,每次进入临界区操作数据的时候都认为别的线程会修改,有安全问题,所以线程每次在读写数据时都会上锁,锁住同步资源,这样其他线程需要读写这个数据时就会阻塞,一直等到拿到锁。

总体来说,悲观锁适用于写多读少的场景,遇到高并发写时性能高。

乐观锁,每次进入临界区操作数据的时候都认为别的线程不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采取在写时先读出当前版本号,然后加锁操作(比较跟上一次的版本号,如果一样就更新),如果失败就要重复 “读-比较-写”的操作。

总体来说,乐观锁适用于读多写少的场景,遇到高并发写时性能低。

1
synchronized和ReentrantLock都是悲观锁.

(3)公平锁和非公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。

非公平锁则是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待,但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。

1
synchronized 是非公平锁,ReentrantLock既可以实现公平也可以非公平。

(4)可中断锁和不可中断锁

可中断锁:线程在等待获取锁的过程中,如果收到中断信号,会立即响应中断,并结束等待状态,这种机制允许线程在等待期间执行其他任务或进行其他操作。

不可中断锁:线程一旦开始等待获取锁,除非成功获取到锁,否则不会被任何中断信号所打断。线程会一直等待,直到成功获取到锁或者线程本身被终止。在等待期间,线程无法响应中断信号,也无法执行其他任务。

1
2
Java中的synchronized关键字实现的就是一种不可中断锁的机制。
Java中的ReentrantLock类支持可中断锁,即线程在等待获取锁时可以被中断。

(5)共享锁和独占锁

独占锁(Exclusive Lock)和共享锁(Shared Lock)。

独占锁也叫互斥锁。当一个线程获得一个独占锁后,其他线程将无法获取该锁,直到该线程释放锁。

共享锁,允许多个线程同时获取同一个锁。

1
2
synchronized 是一种独占锁。
ReentrantLock可以独占也可共享。

3.4.2 使用ReentrantLock改造抢红包案例

ReentrantLock是Lock接口下的一个子类,被称之为可重入锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RedPackageThread implements Runnable {

// 共享资源
private static int redPackageCount = 3 ;
// 声明一把锁
private static final Lock lock = new ReentrantLock() ;

@Override
public void run() {
// 加锁(加锁失败则阻塞)
lock.lock();

if(redPackageCount <= 0) {
System.out.println(Thread.currentThread().getName() + "---->未抢到红包");
}else {
try {
TimeUnit.SECONDS.sleep(1); // 线程休眠模拟网络卡顿
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"---->抢到了一个红包,剩余红包个数:" + --redPackageCount);
}

//释放锁
lock.unlock();
}

}

3.4.3 ReentrantLock-可重入锁

可重入锁又名递归锁,Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁

给抢红包案例添加检查剩余红包功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class RedPackageThread implements Runnable {

private static int redPackageCount = 3 ; // 共享资源
private static final Lock lock = new ReentrantLock() ;

@Override
public void run() {

lock.lock(); // 加锁

if(redPackageCount <= 0) {
System.out.println(Thread.currentThread().getName() + "---->未抢到红包");
}else {
try {
TimeUnit.SECONDS.sleep(1); // 线程休眠模拟网络卡顿
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"---->抢到了一个红包,剩余红包个数:" + --redPackageCount);
checkReadPackage();
}

lock.unlock(); // 加锁

}

public void checkReadPackage() {
lock.lock(); // 加锁
System.out.println("检查剩余红包.......");
lock.unlock();
}

}

3.4.4 ReentrantLock-公平锁/非公平锁

ReentrantLock 默认实现公平锁(也可以实现非公平锁),多个线程等待同一个锁时,必须按照申请锁的时间顺序获得锁

1
2
private static final Lock lock1 = new ReentrantLock(true) ; //公平锁
private static final Lock lock2 = new ReentrantLock(false) ;//非公平锁

3.4.5 ReentrantLock-可中断锁

获取锁方法:lockInterruptibly() 。当有可用锁时会直接得到锁并立即返回,如果没有可用锁会一直阻塞等待直到获取锁,但和 lock() 方法不同的是,lockInterruptibly() 方法在等待获取时,如果遇到线程中断会放弃获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class RedPackageThread implements Runnable {

private static int redPackageCount = 3; // 共享资源
private static final Lock lock = new ReentrantLock();

@Override
public void run() {

try {

// 获取锁
lock.lockInterruptibly();

if (redPackageCount <= 0) {
System.out.println(Thread.currentThread().getName() + "---->未抢到红包");
} else {
try {
TimeUnit.SECONDS.sleep(10); // 线程休眠模拟网络卡顿
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"---->抢到了一个红包,剩余红包个数:" + --redPackageCount);
}

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}

}
}

// RedPackageThreadDemo01测试类
public class RedPackageThreadDemo01 {
public static void main(String[] args) throws InterruptedException {

// 创建任务类,定义共享资源
RedPackageThread redPackageThread = new RedPackageThread() ;

// 创建线程对象,模拟参与抢红包的人员
Thread t1 = new Thread(redPackageThread , "张三") ;
Thread t2 = new Thread(redPackageThread , "李四") ;
Thread t3 = new Thread(redPackageThread , "王五") ;
Thread t4 = new Thread(redPackageThread , "赵六") ;

// 启动线程
t1.start();
t2.start();

Thread.sleep(5000);
t2.interrupt(); // 中断线程,通知中断t2线程等待获取锁的操作

t3.start();
t4.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java.lang.InterruptedException
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:959)
at java.base/java.util.concurrent.locks.ReentrantLock$Sync.lockInterruptibly(ReentrantLock.java:161)
at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:372)
at com.atguigui.jvm.a.RedPackageThread.run(RedPackageThread.java:19)
at java.base/java.lang.Thread.run(Thread.java:833)
Exception in thread "李四" java.lang.IllegalMonitorStateException
at java.base/java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:175)
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1007)
at java.base/java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:494)
at com.atguigui.jvm.a.RedPackageThread.run(RedPackageThread.java:36)
at java.base/java.lang.Thread.run(Thread.java:833)
张三---->抢到了一个红包,剩余红包个数:2
王五---->抢到了一个红包,剩余红包个数:1
赵六---->抢到了一个红包,剩余红包个数:0

3.4.6 tryLock尝试获取锁/限时返回

1
2
3
4
5
6
7
8
9
10
11
12
13
//尝试获取锁,但不阻塞当前线程。如果获取成功,则返回true;如果获取失败(即锁已被其他线程获取),则返回false
boolean b = lock.tryLock();

//示例
if (lock.tryLock()) {
try {
// 执行业务逻辑
} finally {
lock.unlock();
}
} else {
// 未能获取锁,执行其他逻辑
}
1
2
3
4
5
6
7
8
9
10
11
// 尝试获取锁,最多等待 3 秒  
if (lock.tryLock(3, TimeUnit.SECONDS)) {
try {
// 执行业务逻辑,此时拥有锁
} finally {
lock.unlock(); // 确保释放锁
}
} else {
// 未能获取锁,可能是超时或锁被其他线程持有
// 执行其他逻辑,如记录日志、重试等
}

3.5 synchronized和Lock对比

(1)synchronized是Java的关键字,在jvm层面上实现的锁;Lock是JUC包下的一个接口。

(2)synchronizedReentrantLock 都是可重入,前者的加速和解锁是自动进行,不用操心锁是否释放;后者需要手动加锁和释放锁,且次数必须一致。

(3)synchronized是独占的,ReentrantLock可独占也可共享。

(4)synchronized 关键字在获取锁时是不可中断的,一个线程获取不到锁就一直等着。ReentrantLock可中断。

(5)在竞争不激烈的情况下,synchronized 的性能与 ReentrantLock 相当。但在高竞争的情况下,ReentrantLock 提供了更灵活的策略(如尝试锁、超时锁等),可能会提供更好的性能。

(6)synchronized 关键字在发生异常时会自动释放锁。使用 Lock 时,如果代码中没有正确处理异常(即没有在 finally 块中释放锁),则可能导致死锁。因此,使用 Lock 时需要更加小心。

3.6 ReentrantReadWriteLock

3.6.1 概述

JAVA中提供的关键字synchronized和JUC包中实现了Lock接口的ReentrantLock ,它们都是独占式获取锁,也就是在同一时刻只有一个线程能够获取锁==》独占锁。

而在一些业务场景中,大部分是读操作,少部分是写操作,如果只是并发读操作的话并不会影响数据正确性,而如果在这种业务场景下,依然使用独占锁的话,效率很差。

针对这种读多写少的情况,java还提供了另外一个实现Lock接口的ReentrantReadWriteLock(读写锁)。

读写锁的特点:

1、写写不可并发

2、读写不可并发

3、写读不可并发

4、读读可以并发

3.6.2 读写并发问题演示

接下来以缓存为例用代码演示读写锁,演示读写并发问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 缓存工具类
class MyCache{

private Map<String, String> cache= new HashMap<>();

public void put(String key, String value){
try {
System.out.println(Thread.currentThread().getName() + " 开始写入!");
Thread.sleep(300);
cache.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入成功!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}

public void get(String key){
try {
System.out.println(Thread.currentThread().getName() + " 开始读出!");
Thread.sleep(300);
String value = cache.get(key);
System.out.println(Thread.currentThread().getName() + " 读出成功!" + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}
}

// 测试类
public class ReentrantReadWriteLockDemo {

public static void main(String[] args) {

MyCache cache = new MyCache();

for (int i = 1; i <= 5; i++) {
String num = String.valueOf(i);
// 开启5个写线程
new Thread(()->{
cache.put(num, num);
}, num).start();
}
for (int i = 1; i <= 5; i++) {
String num = String.valueOf(i);
// 开启5个读线程
new Thread(()->{
cache.get(num);
}, num).start();
}
}
}

打印结果:多执行几次,就会出现数据读取问题

image-20230307150715839

3.6.3 读写锁的使用

使用读写锁对上述操作进行改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class MyCache{

private Map<String, String> cache= new HashMap<>();

// 加入读写锁
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

public void put(String key, String value){

// 加写锁
rwl.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 开始写入!");
Thread.sleep(500);
cache.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入成功!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放写锁
rwl.writeLock().unlock();
}
}

public void get(String key){
// 加入读锁
rwl.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 开始读出!");
Thread.sleep(500);
String value = cache.get(key);
System.out.println(Thread.currentThread().getName() + " 读出成功!" + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放读锁
rwl.readLock().unlock();
}
}
}

3.6.4 锁降级

锁降级:锁降级就是从写锁降级成为读锁

在当前线程拥有写锁的情况下,之后再获取到读锁,随后释放写锁的过程就是锁降级。

1
锁降级使用场景:当多线程情况下,更新完数据后,立刻查询刚更新完的数据。

ReentrantReadWriteLock 支持锁降级,不支持锁升级。

4 线程间通讯

线程间通讯概述:线程间通信指的就是让多个线程进行协同工作,来完成特定的任务。

线程间通信,方案一: synchronized + wait() + notify()/notifyAll() 方法二:Lock + Condition

1.1 单生产单消费

1.1.1 需求分析

需求:两个线程操作一个初始值为0的变量,实现一个线程对变量增加1,一个线程对变量减少1,交替10轮。

1.1.2 代码演示

共享变量的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 线程
class ShareDataOne {

private Integer number = 0;

// 加1方法
public synchronized void increment() throws InterruptedException {

// 1. 判断
if (number != 0) {
this.wait();//释放该对象的锁,并使自己进入等待状态
}

// 2. 干活
number++;
System.out.println(Thread.currentThread().getName() + ": " + number);

// 3. 通知(由于方法执行结束,所以自动释放锁)
this.notifyAll();//所有等待的线程被唤醒,然后它们会竞争获取对象的锁
}

// 减1方法
public synchronized void decrement() throws InterruptedException {

// 1. 判断
if (number != 1) {
this.wait();
}

// 2. 干活
number--;
System.out.println(Thread.currentThread().getName() + ": " + number);

// 3. 通知
this.notifyAll();
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class NotifyWaitDemo {

public static void main(String[] args) {

// 创建ShareDataOne对象
ShareDataOne shareDataOne = new ShareDataOne();

// 单线程对number变量进行+1操作10次
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareDataOne.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AAA").start();

// 单线程对number变量进行-1操作10次
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareDataOne.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BBB").start();

}
}

1.1.3 sleep和wait的区别

区别:

1、sleep是Thread类中方法,wait方法是Object类中的方法

2、sleep方法不会释放同步锁,wait方法会释放同步锁

1.2 多生产多消费

1.2.1 虚假唤醒问题

(1)问题演示

上述的等待唤醒机制只适用于单生产者和单消费者模型。如果是多生产者多消费者模式,此时就会出现问题。

改造mian方法,加入CCC和DDD两个线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class NotifyWaitDemo {

public static void main(String[] args) {

// 创建ShareDataOne对象
ShareDataOne shareDataOne = new ShareDataOne();

// 单线程对number变量进行+1操作10次
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareDataOne.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AAA").start();

// 单线程对number变量进行-1操作10次
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareDataOne.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BBB").start();


// 单线程对number变量进行+1操作10次
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareDataOne.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CCC").start();

// 单线程对number变量进行-1操作10次
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
shareDataOne.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "DDD").start();

}
}

(2)虚假唤醒说明

换成4个线程会导致错误,虚假唤醒

原因:在java多线程判断时,不能用if,程序出事出在了判断上面。

注意,消费者被唤醒后是从wait()方法(被阻塞的地方)后面执行,而不是重新从同步块开头。

如下图: 出现-1的情况分析!

image-20230307162744014

解决虚假唤醒:查看API,java.lang.Object的wait方法

image-20230307162853450

中断和虚假唤醒是可能产生的,所以要用loop循环,if只判断一次,while是只要唤醒就要拉回来再判断一次。

1.2.2 问题解决

if换成while

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 线程
class ShareDataOne {

private Integer number = 0;

// 加1方法
public synchronized void increment() throws InterruptedException {

// 1. 判断
while (number != 0) {
this.wait();
}

// 2. 干活
number++;
System.out.println(Thread.currentThread().getName() + ": " + number);

// 3. 通知
this.notifyAll();
}

// 减1方法
public synchronized void decrement() throws InterruptedException {

// 1. 判断
while (number != 1) {
this.wait();
}

// 2. 干活
number--;
System.out.println(Thread.currentThread().getName() + ": " + number);

// 3. 通知
this.notifyAll();

}

}

1.3 Lock+Condition实现通信

condition.await(); 线程等待

condition.signalAll(); 唤醒在此Condition上等待的所有线程

1.3.1 案例改造

代码改造:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 线程
class ShareDataOne {

private Integer number = 0;

//创建Lock锁
private static final ReentrantLock reentrantLock = new ReentrantLock() ;

//创建Lock锁对应的condition,用于线程等待和唤醒
private static final Condition condition = reentrantLock.newCondition() ;

// 加1方法
public void increment() throws InterruptedException {

//获取锁
reentrantLock.lock();


// 1. 判断
while (number != 0) {
condition.await();//释放锁
}

// 2. 干活
number++;
System.out.println(Thread.currentThread().getName() + ": " + number);

// 3. 通知。唤醒所有等待该Condition的线程(不会释放锁,所以下一步需要unlock)
condition.signalAll();

// 释放锁
reentrantLock.unlock();
}

// 减1方法
public void decrement() throws InterruptedException {

reentrantLock.lock(); // 获取锁

// 1. 判断
while (number != 1) {
condition.await();
}

// 2. 干活
number--;
System.out.println(Thread.currentThread().getName() + ": " + number);

// 3. 通知
condition.signalAll();

// 释放锁
reentrantLock.unlock();

}

}

1.3.2 打印数字和字母

有两个线程,一个线程打印1-52,另一个打印字母A-Z,打印顺序为 1 2 A 3 4 B … 51 52 Z,要求用线程间通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class PrinterData {

// 定义锁对象
private static final Lock lock = new ReentrantLock() ;
private static final Condition condition = lock.newCondition() ;

// 打印数字的方法
public void printNumber() {
// 获取锁
lock.lock();
for(int x = 1 ; x <= 52 ; x++) {

System.out.print(x);

if(x % 2 == 0) {
try {
condition.signalAll(); // 打印完毕,唤醒其他线程
condition.await(); // 释放锁,并等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 释放锁
lock.unlock();
}

// 打印字母
public void printChar() {
// 获取锁
lock.lock();

for(char x = 'A' ; x <= 'Z' ; x++) {

System.out.print(x);

try {
condition.signalAll(); //唤醒其他线程
condition.await(); //释放锁并等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 释放锁
lock.unlock();
}
}


public class PrinterDataDemo01 {
public static void main(String[] args) {

// 创建打印数据的对象
PrinterData printerData = new PrinterData() ;

// 创建两个线程
new Thread(() -> {
printerData.printNumber();
}).start();

new Thread(() -> {
printerData.printChar();
}).start();

}
}

5 线程状态

2.1 状态介绍

Java中的线程状态被定义在了java.lang.Thread.State枚举类中,State枚举类的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Thread {

public enum State {

/* 新建 */
NEW ,

/* 可运行状态 */
RUNNABLE ,

/* 阻塞状态 */
BLOCKED ,

/* 无限等待状态 */
WAITING ,

/* 计时等待 */
TIMED_WAITING ,

/* 终止 */
TERMINATED;

}

// 获取当前线程的状态
public State getState() {
return jdk.internal.misc.VM.toThreadState(threadStatus);
}

}

通过源码我们可以看到Java中的线程存在6种状态,每种线程状态的含义如下

线程状态 具体含义
NEW 一个尚未启动的线程的状态。也称之为初始状态、开始状态。线程刚被创建,但是并未启动。还没调用start方法。MyThread t = new MyThread()只有线程象,没有线程特征。
RUNNABLE 调用线程对象的start方法,此时线程进入了RUNNABLE状态。(就绪状态)
线程一经启动并不是立即得到执行,线程的运行与否要听令于CPU的调度,可执行状态(RUNNABLE)也就是说它具备执行的资格,但是并没有真正的执行起来而是在等待CPU的调度。
BLOCKED 当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入Blocked状态;
当该线程获取到锁时,该线程将变成Runnable状态。
WAITING 一个正在等待的线程的状态,也称之为等待状态。
造成线程等待的原因有两种,分别是调用wait()、join()方法。
处于等待状态的线程,正在等待其他线程去执行一个特定的操作。
例如:因为wait()而等待的线程正在等待另一个线程去调用notify()或notifyAll();
一个因为join()而等待的线程正在等待另一个线程结束。
TIMED_WAITING 一个在限定时间内等待的线程的状态,也称之为限时等待状态。造成线程限时等待状态的原因有三种,分别是:sleep(long)、wait(long)、join(long)。
TERMINATED 一个完全运行完成的线程的状态。也称之为终止状态、结束状态。

各个状态的转换,如下图所示:

1571652681276

2.2 线程状态转换

2.2.1 案例一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ThreadStateDemo01 {

public static void main(String[] args) throws InterruptedException {

//定义一个内部线程
Thread thread = new Thread(() -> {
System.out.println("2.执行thread.start()之后,线程的状态:" + Thread.currentThread().getState());
try {
//休眠100毫秒
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("4.执行Thread.sleep(long)完成之后,线程的状态:" + Thread.currentThread().getState());
});

//获取start()之前的状态
System.out.println("1.通过new初始化一个线程,但是还没有start()之前,线程的状态:" + thread.getState());

//启动线程
thread.start();

//休眠50毫秒
Thread.sleep(50);

//因为thread1需要休眠100毫秒,所以在第50毫秒,thread处于sleep状态
System.out.println("3.执行Thread.sleep(long)时,线程的状态:" + thread.getState());

//thread1和main线程主动休眠150毫秒,所以在第150毫秒,thread早已执行完毕
Thread.sleep(100);

System.out.println("5.线程执行完毕之后,线程的状态:" + thread.getState() + "\n");

}

}

2.2.2 案例二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ThreadStateDemo02 {

public static void main(String[] args) throws InterruptedException {

//定义一个对象,用来加锁和解锁
Object obj = new Object();

//定义一个内部线程
Thread thread1 = new Thread(() -> {
System.out.println("2.执行thread.start()之后,线程的状态:" + Thread.currentThread().getState());
synchronized (obj) {
try {

//thread1需要休眠100毫秒
Thread.sleep(100);

//thread1100毫秒之后,通过wait()方法释放obj对象是锁
obj.wait();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("4.被object.notify()方法唤醒之后,线程的状态:" + Thread.currentThread().getState());
});

//获取start()之前的状态
System.out.println("1.通过new初始化一个线程,但是还没有start()之前,线程的状态:" + thread1.getState());

//启动线程
thread1.start();

//main线程休眠150毫秒
Thread.sleep(150);

//因为thread1在第100毫秒进入wait等待状态,所以第150秒肯定可以获取其状态
System.out.println("3.执行object.wait()时,线程的状态:" + thread1.getState());

//声明另一个线程进行解锁
new Thread(() -> {
synchronized (obj) {
//唤醒等待的线程
obj.notify();
}
}).start();

//main线程休眠10毫秒等待thread1线程能够苏醒
Thread.sleep(10);

//获取thread1运行结束之后的状态
System.out.println("5.线程执行完毕之后,线程的状态:" + thread1.getState() + "\n");

}

}

6 线程池

3.1 线程池概述

线程池(Thread Pool)是一种多线程处理形式,处理过程中将任务提交给线程池,线程池中的线程会异步地执行这些任务。线程池的主要目的是复用线程,减少线程的创建和销毁开销,提高程序的响应速度和吞吐量。

在Java中,java.util.concurrent 包提供了对线程池的支持,包括 ExecutorService 接口和几个实现了该接口的类,如 ThreadPoolExecutorExecutors

优势:

  1. 降低资源消耗:通过复用线程,减少了线程的创建和销毁次数,降低了线程的创建和销毁的开销。
  2. 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以对线程进行统一分配、调优和监控。

3.2 JDK中线程池

3.2.1 线程池的体系结构

JDK提供的线程池的体系结构如下所示:

image-20240217144728987

Java中的线程池是通过Executor框架实现的,该框架用到了Executor , Executors , ExecutorService , ThreadPoolExecutor。

3.2.2 Executors工具类

(1)创建线程池方法

Executors 是 JDK 所提供的线程池 工具类,在该类中提供了很多的静态方法供我们快速的创建线程池对象。

1
2
3
4
5
6
7
8
9
10
11
// 创建一个可缓存线程池,可灵活的去创建线程,并且灵活的回收线程,若无可回收,则新建线程。
ExecutorService newCachedThreadPool()

// 初始化一个具有固定数量线程的线程池
ExecutorService newFixedThreadPool(int nThreads)

// 初始化一个具有一个线程的线程池
ExecutorService newSingleThreadExecutor()

// 初始化一个具有一个线程的线程池,支持定时及周期性任务执行
ScheduledExecutorService newSingleThreadScheduledExecutor()

返回值就是线程池对象ExecutorService,ScheduledExecutorService。

ExecutorService中的常见方法

1
2
Future<?> submit(Runnable task):	向线程池提交任务
void shutdown(): 关闭线程池

(2)newCachedThreadPool

无限大线程池:newCachedThreadPool的线程池大小理论上可以是无限的,因为它没有设置最大线程数。当执行第二个任务时,如果第一个任务的线程已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ExecutorsDemo01 {

// 演示Executors中的newCachedThreadPool返回的线程池的特点
public static void main(String[] args) throws InterruptedException {

// 获取线程池对象
ExecutorService threadPool = Executors.newCachedThreadPool();

// 提交任务
threadPool.submit(() -> {
System.out.println( Thread.currentThread().getName() + "---执行了任务");
});

// 提交任务
threadPool.submit(() -> {
System.out.println( Thread.currentThread().getName() + "---执行了任务");
});

// 不使用线程池了,还可以将线程池关闭
threadPool.shutdown();
}
}

控制台输出结果

1
2
pool-1-thread-2---执行了任务
pool-1-thread-1---执行了任务

针对每一个任务,线程池为其分配一个线程去执行,我们可以在第二次提交任务的时候,让主线程休眠一小会儿,看程序的执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ExecutorsDemo02 {

// 演示Executors中的newCachedThreadPool返回的线程池的特点
public static void main(String[] args) throws InterruptedException {

// 获取线程池对象
ExecutorService threadPool = Executors.newCachedThreadPool();

// 提交任务
threadPool.submit(() -> {
System.out.println( Thread.currentThread().getName() + "---执行了任务");
});

// 线程休眠2秒,主线程休眠2秒,此时之前提交的任务应该已经执行完毕
TimeUnit.SECONDS.sleep(2);

// 提交任务
threadPool.submit(() -> {
System.out.println( Thread.currentThread().getName() + "---执行了任务");
});

// 不使用线程池了,还可以将线程池关闭
threadPool.shutdown();

}

}

控制台输出结果

1
2
pool-1-thread-1---执行了任务
pool-1-thread-1---执行了任务

我们发现是通过一个线程执行了两个任务。此时就说明线程池中的线程”pool-1-thread-1”被线程池回收了,成为了空闲线程,当我们再次提交任务的时候,该线程就去执行新的任务。

(3)newFixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ExecutorsDemo03 {

// 演示newFixedThreadPool方法所获取到的线程池的特点
public static void main(String[] args) {

// 获取线程池对象,初始化一个具有固定数量线程的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3); // 在该线程池中存在3个线程

// 提交任务
for(int x = 0 ; x < 5 ; x++) {
threadPool.submit( () -> {
System.out.println(Thread.currentThread().getName() + "----->>>执行了任务" );
});
}

// 关闭线程池
threadPool.shutdown();
}

}

控制台输出结果

1
2
3
4
5
pool-1-thread-1----->>>执行了任务
pool-1-thread-2----->>>执行了任务
pool-1-thread-2----->>>执行了任务
pool-1-thread-2----->>>执行了任务
pool-1-thread-3----->>>执行了任务

通过控制台的输出结果,我们可以看到5个任务是通过3个线程进行执行的,说明此线程池中存在三个线程对象。

(4)newSingleThreadExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ExecutorsDemo04 {

// 演示newSingleThreadExecutor方法所获取到的线程池的特点
public static void main(String[] args) {

// 获取线程池对象,初始化一个具有一个线程的线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();

// 提交任务
for(int x = 0 ; x < 5 ; x++) {
threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + "----->>>执行了任务");
});
}

// 关闭线程池
threadPool.shutdown();
}

}

控制台输出结果

1
2
3
4
5
pool-1-thread-1----->>>执行了任务
pool-1-thread-1----->>>执行了任务
pool-1-thread-1----->>>执行了任务
pool-1-thread-1----->>>执行了任务
pool-1-thread-1----->>>执行了任务

通过控制台的输出结果,我们可以看到5个任务是通过1个线程进行执行的,说明此线程池中只存在一个线程对象。

(5)newSingleThreadScheduledExecutor

支持定时及周期性任务执行。

测试1(演示定时执行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ExecutorsDemo05 {

// 演示newSingleThreadScheduledExecutor方法所获取到的线程池的特点(支持定时及周期性任务执行)
public static void main(String[] args) {

// 获取线程池对象
ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();

// 提交任务,10s以后开始执行该任务
threadPool.schedule( () -> {
System.out.println(Thread.currentThread().getName() + "---->>>执行了该任务");
} , 10 , TimeUnit.SECONDS) ;

// 关闭线程池
threadPool.shutdown();
}

}

测试2(演示周期性执行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ExecutorsDemo06 {

// 演示newSingleThreadScheduledExecutor方法所获取到的线程池的特点(支持定时及周期性任务执行)
public static void main(String[] args) {

// 获取线程池对象
ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();

// 提交任务,10s以后开始第一次执行该任务,然后每隔1秒执行一次
threadPool.scheduleAtFixedRate( () -> {
System.out.println(Thread.currentThread().getName() + "---->>>执行了该任务");
} , 10 ,1, TimeUnit.SECONDS) ;

}

}

3.3.3 ThreadPoolExecutor

(1)基本介绍

刚才我们是通过Executors中的静态方法去创建线程池的,通过查看源代码我们发现,其底层都是通过ThreadPoolExecutor构建的。

比如:newFixedThreadPool方法的源码

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {

// 创建了ThreadPoolExecutor对象,然后直接返回
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor最完整的构造方法:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数说明

1
2
3
4
5
6
7
corePoolSize:   核心线程的最大值,不能小于0
maximumPoolSize:最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
keepAliveTime: 空闲线程最大存活时间,不能小于0
unit: 时间单位
workQueue: 任务队列,不能为null
threadFactory: 创建线程工厂,不能为null
handler: 任务的拒绝策略,不能为null

(2)阻塞队列简介

jdk1.5 提供了一个BlockingQueue接口,它主要的用途不是作为容器,而是作为多线程协作的工具。

继承体系图:

image-20230308095302524

BlockingQueue的特征:

1、当生产者线程试图向BlockingQueue中放入元素时,如果队列已满,则线程被阻塞。

2、当消费者线程试图从BlockingQueue中取元素时,如果队列没有元素,则该线程被阻塞。

和阻塞相关的两个方法:

1
2
void put(E e) throws InterruptedException;			// 存数据
E take() throws InterruptedException; // 取出数据,并移除

BlockingQueue的常用实现类:

1
2
ArrayBlockingQueue:    基于数组实现的有界阻塞队列,有界指的是队列有大小限制
LinkedBlockingQueue: 基于链表实现的无界阻塞队列,并不是绝对的无界(容量:Integer.MAX_VALUE)

我们以ArrayBlockingQueue举例来演示一下阻塞队列的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ArrayBlockingQueueDemo {

public static void main(String[] args) throws InterruptedException {

// 创建一个容量为1的阻塞队列
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<String>(1) ;

// 存储元素
// arrayBlockingQueue.put("java");
// arrayBlockingQueue.put("world");

// 取元素
arrayBlockingQueue.put("java");
System.out.println(arrayBlockingQueue.take());

System.out.println(arrayBlockingQueue.take());

// 输出
System.out.println("程序结束了....");

}

}

(3)自定义线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadPoolExecutorDemo01 {

// 演示基本使用
public static void main(String[] args) {

// 通过ThreadPoolExecutor创建一个线程池对象
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

/**
* 以上代码表示的意思是:核心线程池中的线程数量最大为1,整个线程池中最多存在3个线程,空闲线程最大的存活时间为60,时间单位为秒,阻塞队列使用的是有界阻塞队列,容量为3,使用默认的线程工厂;以及默认的任务处理策略
*/

// 提交任务
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "------>>>执行了任务");
});

// 关闭线程池
threadPoolExecutor.shutdown();

}
}

3.3.4 线程池工作原理

如下图所示

image-20230308100801822

当我们通过submit方法向线程池中提交任务的时候,具体的工作流程如下:

1、客户端每次提交一个任务,线程池就会在核心线程池中创建一个工作线程来执行这个任务。当核心线程池中的线程已满时,则进入下一步操作。

2、把任务试图存储到工作队列中。如果工作队列没有满,则将新提交的任务存储在这个工作队列里,等待核心线程池中的空闲线程执行。如果工作队列满了,则进入下个流程。

3、线程池会再次在非核心线程池区域去创建新工作线程来执行任务,直到当前线程池总线程数量超过最大线程数时,就是按照指定的任务处理策略处理多余的任务。

举例说明:

假如有一个工厂,工厂里面有10个工人(正式员工),每个工人同时只能做一件任务。因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

然后就将任务也分配给这4个临时工人做;如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

这里的工厂可以看做成是一个线程池,每一个工人可以看做成是一个线程。其中10个正式员工,可以看做成是核心线程池中的线程,临时工就是非核心线程池中的线程。当临时工处于空闲状态的时候,那么如果空闲的时间超过keepAliveTime所指定的时间,那么就会被销毁。

3.3.5 线程池任务的拒绝策略

RejectedExecutionHandler是jdk提供的一个任务拒绝策略接口,它下面存在4个子类。

1
2
3
4
ThreadPoolExecutor.AbortPolicy: 		    丢弃任务并抛出RejectedExecutionException异常。是默认的策略。
ThreadPoolExecutor.DiscardPolicy: 丢弃任务,但是不抛出异常 这是不推荐的做法。
ThreadPoolExecutor.DiscardOldestPolicy: 抛弃队列中等待最久的任务 然后把当前任务加入队列中。
ThreadPoolExecutor.CallerRunsPolicy: 调用任务的run()方法绕过线程池直接执行。

注:明确线程池最多可执行的任务数 = 队列容量 + 最大线程数

案例演示1:演示ThreadPoolExecutor.AbortPolicy任务处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThreadPoolExecutorDemo03 {

public static void main(String[] args) {

/**
* 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.AbortPolicy()) ;

// 提交5个任务,而该线程池最多可以处理4个任务,当我们使用AbortPolicy这个任务处理策略的时候,就会抛出异常
for(int x = 0 ; x < 5 ; x++) {
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
});
}

}

}

控制台输出结果

1
2
3
4
5
6
7
8
9
10
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@566776ad[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@edf4efb[Wrapped task = com.atguigu.javase.thread.pool.demo04.ThreadPoolExecutorDemo01$$Lambda$14/0x0000000100066840@2f7a2457]] rejected from java.util.concurrent.ThreadPoolExecutor@6108b2d7[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
at com.atguigu.javase.thread.pool.demo04.ThreadPoolExecutorDemo01.main(ThreadPoolExecutorDemo01.java:20)
pool-1-thread-1---->> 执行了任务
pool-1-thread-3---->> 执行了任务
pool-1-thread-2---->> 执行了任务
pool-1-thread-3---->> 执行了任务

控制台报错,仅仅执行了4个任务,有一个任务被丢弃了

案例演示2:演示ThreadPoolExecutor.DiscardPolicy任务处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ThreadPoolExecutorDemo02 {

public static void main(String[] args) {

/**
* 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardPolicy()) ;

// 提交5个任务,而该线程池最多可以处理4个任务,当我们使用DiscardPolicy这个任务处理策略的时候,控制台不会报错
for(int x = 0 ; x < 5 ; x++) {
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
});
}

}
}

控制台输出结果

1
2
3
4
pool-1-thread-1---->> 执行了任务
pool-1-thread-1---->> 执行了任务
pool-1-thread-3---->> 执行了任务
pool-1-thread-2---->> 执行了任务

控制台没有报错,仅仅执行了4个任务,有一个任务被丢弃了

案例演示3:演示ThreadPoolExecutor.DiscardOldestPolicy任务处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadPoolExecutorDemo03 {

public static void main(String[] args) {

/**
* 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
*/
ThreadPoolExecutor threadPoolExecutor;
threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardOldestPolicy());

// 提交5个任务
for(int x = 0 ; x < 5 ; x++) {

// 定义一个变量,来指定指定当前执行的任务;这个变量需要被final修饰
final int y = x ;
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "---->> 执行了任务" + y);
});

}

}
}

控制台输出结果

1
2
3
4
pool-1-thread-2---->> 执行了任务2
pool-1-thread-1---->> 执行了任务0
pool-1-thread-3---->> 执行了任务3
pool-1-thread-1---->> 执行了任务4

由于任务1在线程池中等待时间最长,因此任务1被丢弃。

案例演示4:演示ThreadPoolExecutor.CallerRunsPolicy任务处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadPoolExecutorDemo04 {

public static void main(String[] args) {

/**
* 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
*/
ThreadPoolExecutor threadPoolExecutor;
threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.CallerRunsPolicy());

// 提交5个任务
for(int x = 0 ; x < 5 ; x++) {
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
});

}

}

}

控制台输出结果

1
2
3
4
5
pool-1-thread-1---->> 执行了任务
pool-1-thread-3---->> 执行了任务
pool-1-thread-2---->> 执行了任务
pool-1-thread-1---->> 执行了任务
main---->> 执行了任务

通过控制台的输出,我们可以看到次策略没有通过线程池中的线程执行任务,而是直接调用任务的run()方法绕过线程池直接执行。

3.3.6 线程池关闭

关闭线程池的两个方法:

1
1、shutdown():停止接收新任务,已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成。
1
2
3
4
5
2、shutdownNow():停止接收新任务,原来的任务停止执行。
(1)跟 shutdown() 一样,先停止接收新submit的任务;
(2)忽略队列里等待的任务;
(3)尝试将正在执行的任务interrupt中断;
(4)返回未执行的任务列表;
1
awaitTermination(60, TimeUnit.SECONDS) 等待所有任务完成,如果超过此时间仍有未完成的任务,则会返回false。

优雅关闭线程池的方式:

1、调用 ExecutorService 的 shutdown() 方法,这将禁止提交新任务,但已经提交的任务将继续执行,直到完成为止。

2、调用 awaitTermination(60, TimeUnit.SECONDS) 方法,等待所有任务完成,如果超过此时间仍有未完成的任务,则会返回false。

3、返回false,则调用ExecutorService 的 shutdownNow() 方法来中断所有正在执行的任务。

1
2
3
4
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
List<Runnable> runnables = executor.shutdownNow();
}

3.3.7 线程池创建方式选择

在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。

而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

image-20230308101138093

7 并发容器类

面试题:请举例说明集合类是不安全的。

4.1 ArrayList线程不安全

4.1.1 问题演示

我们一起先看看下面的程序吧,看你能看出什么问题吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class NotSafeDemo {

public static void main(String[] args) {

List<String> list = new ArrayList<>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "线程" + i).start();
}

}
}

你觉得能每次都能正常输出吗?

答案是否定的,也许好几次运行程序都不会出错,但是偶尔就会遇上一次的。

会报一个ConcurrentModificationException的异常,中文名为:并发修改异常。

4.1.2 问题说明

JDK8的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 添加方法的源码
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}

// 进行输出的时候调用的是toString方法,ArrayList集合的toString方法的源码
public String toString() {
Iterator<E> it = iterator(); // 获取迭代器进行遍历操作
if (! it.hasNext())
return "[]";

StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}

// 获取迭代器方法
public Iterator<E> iterator() {
return new Itr();
}

// ArrayList集合内部类的Itr迭代器
private class Itr implements Iterator<E> {

int expectedModCount = modCount; // 将实际修改的次数赋值给期望修改的次数

public E next() {
checkForComodification(); // 进行修改检查
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}

final void checkForComodification() {
if (modCount != expectedModCount) // 如果实际修改的次数和期望修改的次数不一致,那么此时抛出异常
throw new ConcurrentModificationException();
}

}

4.1.3 解决方案

(1)使用Vector来代替ArrayList

我们可以使用Vector来代替ArrayList,Vector是线程的安全的,改用Vector进行测试:

1
2
3
4
5
6
7
List<String> list = new Vector<>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "线程" + i).start();
}

Vector的源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public synchronized boolean add(E e) {		// 添加元素方法加锁
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}

// 迭代器获取元素的方法
public E next() {
synchronized (Vector.this) { // 加锁方法
checkForComodification();
int i = cursor;
if (i >= elementCount)
throw new NoSuchElementException();
cursor = i + 1;
return elementData(lastRet = i);
}
}

(2)使用Collections.synchronizedList()

Collections 提供了方法 synchronizedList 保证 list 是同步线程安全的。

Collections 仅包含对集合进行操作或返回集合的静态方法,所以我们通常也称Collections 为集合的工具类。

1
2
3
4
5
6
7
List list = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "线程" + i).start();
}

synchronizedList方法源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 将list集合变成线程安全的list集合
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}

// SynchronizedRandomAccessList集合集成体系结构
static class SynchronizedRandomAccessList<E> extends SynchronizedList<E> implements RandomAccess {}

// SynchronizedList集成体系结构
static class SynchronizedList<E> extends SynchronizedCollection<E> implements List<E> {}

// SynchronizedCollection中相关方法
// java.util.Collections.SynchronizedCollection
public boolean add(E e) {
synchronized (mutex) {return c.add(e);}
}

public String toString() {
synchronized (mutex) {return c.toString();}
}

(3)CopyOnWrite写时复制容器

4.2 CopyOnWrite容器

4.2.1 CopyOnWrite容器简介

CopyOnWrite容器(简称COW容器)即写时复制的容器。

通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后向新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器

更新操作开销大(add()、set()、remove()等等),因为要复制整个数组。

这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。

所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器

当然,这个时候会抛出来一个新的问题,也就是读线程数据不一致的问题。

从JDK1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制 实现 的 并发容器CopyOnWriteArrayList 和 CopyOnWriteArraySet。

CopyOnWriteArrayList类,在其内部维护了一个数组,因此本质上还是一个数组。

image-20230308132013222

再来看看CopyOnWriteArrayList的add方法,该方法使用了可重入锁,因此是线程安全的。

image-20230308132201768

4.2.2 代码改造

使用CopyOnWriteArrayList改造上述代码:

1
2
3
4
5
6
7
8
List list = new CopyOnWriteArrayList() ;

for (int i = 0; i < 20; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "线程" + i).start();
}

4.3 ConcurrentHashMap容器

4.3.1 HashMap线程不安全

多线程环境下,HashMap是线程不安全的。

为了保证数据的安全性我们可以使用 Hashtable,但是Hashtable的效率低。

基于以上两个原因我们可以使用 JDK1.5 以后所提供的 ConcurrentHashMap 。

案例1:演示HashMap线程不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class HashMapNonThreadSafeExample {  

private static Map<Integer, Integer> map = new HashMap<>();
// private static Map<Integer, Integer> map = new Hashtable<>();

public static void main(String[] args) throws InterruptedException {
// 创建多个线程来同时操作这个HashMap
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
map.put(j, j);
}
}).start();
}

// 等待所有线程完成
Thread.sleep(2000);

// 打印HashMap的大小,但结果可能不是10000,因为HashMap在并发修改下可能丢失了一些数据
System.out.println("Map size: " + map.size());
}
}

案例2:演示Hashtable线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class HashMapNonThreadSafeExample {  

//private static Map<Integer, Integer> map = new HashMap<>();
private static Map<Integer, Integer> map = new Hashtable<>();

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
map.put(j, j);
}
}).start();
}

Thread.sleep(2000);

System.out.println("Map size: " + map.size());
}
}

4.3.2 Hashtable保证线程安全的原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Hashtable<K,V> extends Dictionary<K,V> implements Map<K,V>, Cloneable, java.io.Serializable {

// Entry数组,一个Entry就相当于一个元素
private transient Entry<?,?>[] table;

// Entry类的定义
private static class Entry<K,V> implements Map.Entry<K,V> {
final int hash; // 当前key的hash码值
final K key; // 键
V value; // 值
Entry<K,V> next; // 下一个节点
}

// 存储数据
public synchronized V put(K key, V value){...}

// 获取数据
public synchronized V get(Object key){...}

// 获取长度
public synchronized int size(){...}

...

}

对应的结构如下图所示

1573115712884

Hashtable 保证线程安全性的是使用方法全局锁进行实现的,在线程竞争激烈的情况下HashTable的效率非常低下,因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞状态。

如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。

4.3.3 ConcurrentHashMap线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HashMapNonThreadSafeExample {  

private static Map<Integer, Integer> map = new ConcurrentHashMap<>();

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
map.put(j, j);
}
}).start();
}

Thread.sleep(2000);

System.out.println("Map size: " + map.size());
}
}

4.3.4 源码分析

由于ConcurrentHashMap在jdk1.7和jdk1.8的时候实现原理不太相同,因此需要分别来讲解一下两个不同版本的实现原理。

(1)jdk1.7版本

ConcurrentHashMap中的重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ConcurrentHashMap<K,V>extends AbstractMap<K,V> implements ConcurrentMap<K,V>,Serializable {

/**
* Segment翻译中文为"段" , 段数组对象
*/
final Segment<K,V>[] segments;

// Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
static final class Segment<K,V> extends ReentrantLock implements Serializable {

transient volatile int count; // Segment中元素的数量,由volatile修饰,支持内存可见性;
transient int modCount; // 对table的大小造成影响的操作的数量(比如put或者remove操作);
transient int threshold; // 扩容阈值;
transient volatile HashEntry<K,V>[] table; // 链表数组,数组中的每一个元素代表了一个链表的头部;
final float loadFactor; // 负载因子

}

// Segment中的元素是以HashEntry的形式存放在数组中的,其结构与普通HashMap的HashEntry基本一致,不同的是Segment的HashEntry,其value由volatile修饰,以支持内存可见性,即写操作对其他读线程即时可见。
static final class HashEntry<K,V> {
final int hash; // 当前节点key对应的哈希码值
final K key; // 存储键
volatile V value; // 存储值
volatile HashEntry<K,V> next; // 下一个节点
}

}

对应的结构如下图所示:

image-20230308134929316

简单来讲,就是ConcurrentHashMap 比 HashMap 多了一次hash过程:第1次hash定位到Segment,第2次hash定位到HashEntry,然后链表搜索找到指定节点。

在进行写操作时,只需锁住写元素所在的Segment即可(这种锁被称为分段锁),其他Segment无需加锁,从而产生锁竞争的概率大大减小,提高了并发读写的效率。该种实现方式的缺点是hash过程比普通的HashMap要长(因为需要进行两次hash操作)。

ConcurrentHashMap的put方法源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { 

public V put(K key, V value) {

// 定义一个Segment对象
Segment<K,V> s;

// 如果value的值为空,那么抛出异常
if (value == null) throw new NullPointerException();

// hash函数获取key的hashCode,然后做了一些处理
int hash = hash(key);

// 通过key的hashCode定位segment
int j = (hash >>> segmentShift) & segmentMask;

// 对定位的Segment进行判断,如果Segment为空,调用ensureSegment进行初始化操作(第一次hash定位)
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);

// 调用Segment对象的put方法添加元素
return s.put(key, hash, value, false);
}

// Segment是一种可ReentrantLock,在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
static final class Segment<K,V> extends ReentrantLock implements Serializable {

// 添加元素
final V put(K key, int hash, V value, boolean onlyIfAbsent) {

// 尝试对该段进行加锁,如果加锁失败,则调用scanAndLockForPut方法;在该方法中就要进行再次尝试或者进行自旋等待
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {

// 获取HashEntry数组对象
HashEntry<K,V>[] tab = table;

// 根据key的hashCode值计算索引(第二次hash定位)
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;)

// 若不为null
if (e != null) {
K k;

// 判读当前节点的key是否和链表头节点的key相同(依赖于hashCode方法和equals方法)
// 如果相同,值进行更新
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}

e = e.next;
} else { // 若头结点为null

// 将新节点添加到链表中
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first); //头插法
int c = count + 1;

// 如果超过阈值,则进行rehash操作
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}

return oldValue;
}

}

}

注意:源代码进行简单讲解即可(核心:进行了两次哈希定位以及加锁过程)

(2)jdk1.8版本

在JDK1.8中为了进一步优化ConcurrentHashMap的性能,去掉了Segment分段锁的设计。

在数据结构方面,则是跟HashMap一样,使用一个哈希表table数组(数组 + 链表 + 红黑树) ;

而线程安全方面是结合CAS机制 + 局部锁实现的,减低锁的粒度,提高性能。

同时在HashMap的基础上,对哈希表table数组和链表节点的value,next指针等使用volatile来修饰,从而实现线程可见性。

ConcurrentHashMap中的重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {

// Node数组
transient volatile Node<K,V>[] table;

// Node类的定义
static class Node<K,V> implements Map.Entry<K,V> {

final int hash; // 当前key的hashCode值
final K key; // 键
volatile V val; // 值
volatile Node<K,V> next; // 下一个节点

}

// TreeNode类的定义
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // 父节点
TreeNode<K,V> left; // 左子节点
TreeNode<K,V> right; // 右子节点
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red; // 节点的颜色状态
}

}

对应的结构如下图

1573115807375

当链表的长度超过8并且数组的长度超过64的时候,就会把链表变成树。

ConcurrentHashMap的put方法源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {

// 添加元素
public V put(K key, V value) {
return putVal(key, value, false);
}

// putVal方法定义
final V putVal(K key, V value, boolean onlyIfAbsent) {

// key为null直接抛出异常
if (key == null || value == null) throw new NullPointerException();

// 计算key所对应的hashCode值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;

// 哈希表如果不存在,那么此时初始化哈希表
if (tab == null || (n = tab.length) == 0)
tab = initTable();

//通过hash值计算key在table表中的索引,将其值赋值给变量i,然后根据索引找到对应的Node,如果Node为null,做出处理
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

// 新增链表头结点,cas方式添加到哈希表table
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break;

}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;

// f为链表头结点,使用synchronized加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;

// 节点已经存在,更新value即可
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}

// 该key对应的节点不存在,则新增节点并添加到该链表的末尾
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);//尾插法
break;
}

}

} else if (f instanceof TreeBin) { // 红黑树节点,则往该红黑树更新或添加该节点即可
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

// 判断是否需要将链表转为红黑树
//当链表的长度超过8并且数组的长度超过64的时候,就会把链表变成树。
// 红黑树是一种自平衡的二叉搜索树。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

// CAS算法的核心类
private static final sun.misc.Unsafe U;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
...
} catch (Exception e) {
throw new Error(e);
}
}

// 原子获取链表节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// CAS更新或新增链表节点
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

}

简单总结:

1、如果当前需要put的key对应的链表在哈希表table中还不存在,则调用casTabAt方法,基于CAS机制来实现添加该链表头结点到哈希表table中,避免该线程在添加该链表头结的时候,其他线程也在添加的并发问题;

2、如果需要添加的链表已经存在哈希表table中,则通过tabAt方法,获取当前最新的链表头结点f,使用synchronized关键字来同步多个线程对该链表的访问。在synchronized(f)同步块里面则是与HashMap一样遍历该链表,如果该key对应的链表节点已经存在,则更新,否则在链表的末尾新增该key对应的链表节点。

8 并发工具类

在JUC并发包里提供了几个非常有用的并发容器和并发工具类,供我们在多线程开发中进行使用。

1.1 CountDownLatch

CountDownLatch被称之为倒计数器CountDownLatch允许一个或多个线程等待其他线程完成操作以后,再执行当前线程。

比如我们在主线程需要开启2个其他线程,当其他的线程执行完毕以后我们再去执行主线程,针对这个需求我们就可以使用CountDownLatch来进行实现。

CountDownLatch中count down是倒着数数的意思;CountDownLatch是通过一个计数器来实现的,每当一个线程完成了自己的任务后,可以调用countDown()方法让计数器-1,当计数器到达0时,调用CountDownLatch的await()方法的线程阻塞状态解除,继续执行。

CountDownLatch的相关方法:

1
2
3
public CountDownLatch(int count)						// 初始化一个指定计数器的CountDownLatch对象
public void await() throws InterruptedException // 让当前线程等待
public void countDown() // 计数器进行减1

案例需求:使用CountDownLatch完成上述需求(我们在主线程需要开启2个其他线程,当其他的线程执行完毕以后我们再去执行主线程)

实现思路:在main方法中创建一个CountDownLatch对象,把这个对象作为作为参数传递给其他的两个任务线程

线程任务类1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CountDownLatchThread01 implements Runnable {

// CountDownLatch类型成员变量
private CountDownLatch countDownLatch ;
// 构造方法的作用:接收CountDownLatch对象
public CountDownLatchThread01(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch ;
}

@Override
public void run() {
try {
Thread.sleep(10000);
System.out.println("10秒以后执行了CountDownLatchThread01......");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 调用CountDownLatch对象的countDown方法对计数器进行-1操作
countDownLatch.countDown();
}
}

线程任务类2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CountDownLatchThread02 implements Runnable {

// CountDownLatch类型成员变量
private CountDownLatch countDownLatch ;
// 构造方法的作用:接收CountDownLatch对象
public CountDownLatchThread02(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch ;
}

@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("3秒以后执行了CountDownLatchThread02......");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 调用CountDownLatch对象的countDown方法对计数器进行-1操作
countDownLatch.countDown();
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CountDownLatchDemo01 {

public static void main(String[] args) {

// 1. 创建一个CountDownLatch对象,CountDownLatch中的计数器的默认值就是2
CountDownLatch countDownLatch = new CountDownLatch(2) ;

// 2. 创建线程任务类对象,并且把这个CountDownLatch对象作为构造方法的参数进行传递
CountDownLatchThread01 countDownLatchThread01 = new CountDownLatchThread01(countDownLatch) ;

// 3. 创建线程任务类对象,并且把这个CountDownLatch对象作为构造方法的参数进行传递
CountDownLatchThread02 countDownLatchThread02 = new CountDownLatchThread02(countDownLatch) ;

// 4. 创建线程对象,并启动线程
Thread t1 = new Thread(countDownLatchThread01);
Thread t2 = new Thread(countDownLatchThread02);
t1.start();
t2.start();

// 5. 在主线程中调用 CountDownLatch中的await让主线程处于阻塞状态
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

// 6. 程序结束的输出
System.out.println("主线程执行了.... 程序结束了......");
}

}

控制台输出结果

1
2
3
3秒以后执行了CountDownLatchThread02......
10秒以后执行了CountDownLatchThread01......
主线程执行了.... 程序结束了......

CountDownLatchThread02线程先执行完毕,此时 计数器 减1;

CountDownLatchThread01线程执行完毕,此时计数器 减1;当计数器的值为0的时候,主线程阻塞状态解除,主线程向下执行。

1.2 CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。

它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

例如:公司召集5名员工开会,等5名员工都到了,会议开始。我们创建5个员工线程,1个开会线程,几乎同时启动,使用CyclicBarrier保证5名员工线程全部执行后,再执行开会线程。

CyclicBarrier的相关方法

1
2
3
4
// 用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景
public CyclicBarrier(int parties, Runnable barrierAction)
// 每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
public int await()

案例演示:模拟员工开会

实现步骤:

1、创建一个员工线程类(EmployeeThread),该线程类中需要定义一个CyclicBarrier类型的形式参数

2、创建一个开会线程类(MettingThread)

3、测试类

  • 创建CyclicBarrier对象

  • 创建5个EmployeeThread线程对象,把第一步创建的CyclicBarrier对象作为构造方法参数传递过来

  • 启动5个员工线程

员工线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class EmployeeThread extends Thread {

// CyclicBarrier类型的成员变量
private CyclicBarrier cyclicBarrier ;
// 使用构造方法对CyclicBarrier进行初始化
public EmployeeThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier ;
}

@Override
public void run() {

try {

// 模拟开会人员的随机到场
Thread.sleep((int) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " 到了! ");

cyclicBarrier.await();

System.out.println(Thread.currentThread().getName() + " 发言! ");

} catch (Exception e) {
e.printStackTrace();
}

}

}

开会线程类

1
2
3
4
5
6
7
8
public class MettingThread extends Thread {

@Override
public void run() {
System.out.println("好了,人都到了,开始开会......");
}

}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CyclicBarrierDemo01 {

public static void main(String[] args) {

// 创建CyclicBarrier对象
CyclicBarrier cyclicBarrier = new CyclicBarrier(5 , new MettingThread()) ;

// 创建5个EmployeeThread线程对象,把第一步创建的CyclicBarrier对象作为构造方法参数传递过来
EmployeeThread thread1 = new EmployeeThread(cyclicBarrier) ;
EmployeeThread thread2 = new EmployeeThread(cyclicBarrier) ;
EmployeeThread thread3 = new EmployeeThread(cyclicBarrier) ;
EmployeeThread thread4 = new EmployeeThread(cyclicBarrier) ;
EmployeeThread thread5 = new EmployeeThread(cyclicBarrier) ;

// 启动5个员工线程
thread1.start();
thread2.start();
thread3.start();
thread4.start();
thread5.start();

}
}

1.3 Semaphore

Semaphore字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目

比如给定一个资源数目有限的资源池,假设资源数目为N,每一个线程均可获取一个资源,但是当资源分配完毕时,后来线程需要阻塞等待,直到前面已持有资源的线程释放资源之后才能继续。

Semaphore的常用方法

1
2
3
public Semaphore(int permits)						permits 表示许可线程的数量
public void acquire() throws InterruptedException 表示获取许可
public void release() 表示释放许可

案例:6辆车抢占3个车位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SemaphoreDemo {

public static void main(String[] args) {

// 初始化信号量,3个车位
Semaphore semaphore = new Semaphore(3);

// 6个线程,模拟6辆车
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
// 抢占一个停车位
semaphore.acquire();

System.out.println(Thread.currentThread().getName() + " 抢到了一个停车位!!");
TimeUnit.SECONDS.sleep(new Random().nextInt(10)); // 停一会儿车
System.out.println(Thread.currentThread().getName() + " 离开停车位!!");

// 开走,释放一个停车位
semaphore.release();

} catch (InterruptedException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}

9 volatile关键字

看程序说结果

分析如下程序,说出在控制台的输出结果。

Thread的子类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class VolatileThread extends Thread {

// 定义成员变量
private boolean flag = false ;
public boolean isFlag() { return flag;}

@Override
public void run() {

// 线程休眠1秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 将flag的值更改为true
this.flag = true ;
System.out.println("flag=" + flag);

}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class VolatileThreadDemo01 {

public static void main(String[] args) {

// 创建VolatileThread线程对象
VolatileThread volatileThread = new VolatileThread() ;
volatileThread.start();

// main方法
while(true) {
if(volatileThread.isFlag()) {
System.out.println("执行了");
}
}

}
}

控制台输出结果

1
flag=true

按照我们的分析,当我们把volatileThread线程启动起来以后,在volatileThread线程的run方法中,线程休眠1s,休眠一秒

以后那么flag的值应该为true。

此时我们在主线程中在调用volatileThread线程中的isFlag()方法,返回的就应该true,那么既然是true,那么就会在控制台输

出”执行了”;但是此时在控制台并没有输出该信息,那么这是为什么呢?要想知道原因,那么我们就需要学习一下JMM。

JMM(Java内存模型)

概述:JMM(Java Memory Model,Java内存模型),是java虚拟机规范中所定义的一种内存模型。

JMM描述了Java程序中各种变量(线程共享变量)的访问规则,以及在JVM中将变量存储到内存和从内存中读取变量这样的底层细节。

特点:

1、所有的线程共享变量都存储于主内存(计算机的RAM)。

2、每个线程拥有自己的工作内存,保留了被线程使用的变量工作副本

3、线程对变量的所有的操作(读/写)都必须在自己的工作内存中完成,而不能直接读写主内存中的变量。

4、不同线程之间也不能直接访问对方工作内存中的变量,线程间变量的值的传递需要通过主内存完成。

image-20230308144950989

问题分析

了解JMM后,再来分析一下之前程序产生问题的原因。

image-20230308145048482

产生问题的流程分析:

1、VolatileThread 线程 从主内存读取到数据放入其对应的工作内存

2、将工作内存中的 flag 的值更改为true,此时 flag 的值还没有回写主内存

3、 main 线程 从 主内存 读取 flag=false 并将其放入到自己的工作内存中。

4、VolatileThread 线程 将flag的值 写回到主内存,但是main函数里面的 while(true) 调用的是系统比较底层的代码,速度快,快到没有时间再去读取主内存中的值,所以while(true)读取到的值一直是false。(如果有一个时刻main线程从主内存中读取到了flag的最新值,那么if语句就可以执行,main线程何时从主内存中读取最新的值,我们无法控制)

我们可以让主线程执行慢一点,执行慢一点以后,在某一个时刻,可能就会读取到主内存中最新的flag的值,那么if语句就可以进行执行。

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class VolatileThreadDemo02 {
public static void main(String[] args) throws InterruptedException {

// 创建VolatileThread线程对象
VolatileThread volatileThread = new VolatileThread() ;
volatileThread.start();

// main方法
while(true) {
if(volatileThread.isFlag()) {
System.out.println("执行了======");
}
// 让线程休眠100毫秒
TimeUnit.MILLISECONDS.sleep(100);
}
}
}

控制台输出结果

1
2
3
4
5
flag=true
执行了======
执行了======
执行了======
....

此时我们可以看到if语句已经执行了。当然我们在真实开发中可能不能使用这种方式来处理这个问题,那么这个问题应该怎么处理呢?我们就需要学习下一小节的内容。

问题处理

2.4.1 加锁

第一种处理方案,我们可以通过加锁的方式进行处理。

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class VolatileThreadDemo03 {

public static void main(String[] args) throws InterruptedException {

// 创建VolatileThread线程对象
VolatileThread volatileThread = new VolatileThread() ;
volatileThread.start();

// main方法
while(true) {
// 加锁进行问题处理
synchronized (volatileThread) {
if(volatileThread.isFlag()) {
System.out.println("执行了======");
}
}
}
}
}

控制台输出结果

1
2
3
4
5
flag=true
执行了======
执行了======
执行了======
....

工作原理说明

对上述代码加锁完毕以后,某一个线程执行该程序的过程如下:

(1)线程获得锁

(2)清空工作内存

(3)从主内存拷贝共享变量最新的值到工作内存成为副本

(4)执行代码

(5)如果有更新数据,则将修改后的副本的值刷新回主内存中

(6)线程释放锁

2.4.2 volatile关键字

第二种处理方案,我们可以通过volatile关键字来修饰flag变量。

线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class VolatileThread extends Thread {

// 定义成员变量
private volatile boolean flag = false ;
public boolean isFlag() { return flag;}

@Override
public void run() {

// 线程休眠1秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 将flag的值更改为true
this.flag = true ;
System.out.println("flag=" + flag);

}
}

控制台输出结果

1
2
3
4
5
flag=true
执行了======
执行了======
执行了======
....

工作原理说明

1573115138905

执行流程分析

1、VolatileThread线程从主内存读取到数据放入其对应的工作内存

2、将flag的值更改为true,但是这个时候flag的值还没有回写主内存

3、此时main线程读取到了flag的值并将其放入到自己的工作内存中,此时flag的值为false

4、VolatileThread线程将flag的值写到主内存

5、main线程工作内存中的flag变量副本失效

6、main线程再次使用flag时,main线程会从主内存读取最新的值,放入到工作内存中,然后在进行使用。

总结: volatile保证不同线程对共享变量操作的可见性,也就是说一个线程修改了volatile修饰的变量,当修改写回主内存时,另外一个线程可以立即看到最新的值。

10 并发编程三特性

可见性

可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看到修改的值。

volatile关键字修饰的共享变量是可以保证可见性的。

原子性

3.2.1 原子性说明

概述:所谓的原子性是指在一次操作或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的干扰而中断,要么所有的操作都不执行,多个操作是一个不可以分割的整体

比如:从张三的账户给李四的账户转1000元,这个动作将包含两个基本的操作:从张三的账户扣除1000元,给李四的账户增加1000元。这两个操作必须符合原子性的要求,要么都成功要么都失败。

3.2.2 看程序说结果

分析如下程序的执行结果

线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class VolatileAtomicThread implements Runnable {

// 定义一个int类型的变量
private int count = 0 ;

@Override
public void run() {
// 对该变量进行++操作,100次
for(int x = 0 ; x < 100 ; x++) {
count++ ;
System.out.println("count =========>>>> " + count);
}
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
public class VolatileAtomicThreadDemo {
public static void main(String[] args) {
// 创建VolatileAtomicThread对象
VolatileAtomicThread volatileAtomicThread = new VolatileAtomicThread() ;

// 开启100个线程对count进行++操作
for(int x = 0 ; x < 100 ; x++) {
new Thread(volatileAtomicThread).start();
}
}
}

程序分析:我们在主线程中通过for循环启动了100个线程,每一个线程都会对VolatileAtomicThread类中的count加100次。

期望的正确结果是10000,但是真正的执行结果和我们分析的是否一样呢?运行程序(多运行几次),查看控制台输出结果

1
2
3
4
....
count =========>>>> 9997
count =========>>>> 9998
count =========>>>> 9999

通过控制台的输出,我们可以看到最终count的结果可能并不是10000。接下来我们就来分析一下问题产生的原因。

3.2.3 问题分析说明

以上问题主要是发生在count++操作上:

count++操作包含3个步骤:

1、从主内存中读取数据到工作内存

2、对工作内存中的数据进行++操作

3、将工作内存中的数据写回到主内存

count++操作不是一个原子性操作,也就是说在某一个时刻对某一个操作的执行,有可能被其他的线程打断。

image-20230308151121238

产生问题的执行流程分析:

1、假设此时count的值是100,线程A需要对改变量进行自增1的操作,首先它需要从主内存中读取变量count的值。由于CPU的切换关系,此时CPU的执行权被切换到了B线程。A线程就处于就绪状态,B线程处于运行状态。

2、线程B也需要从主内存中读取count变量的值,由于线程A没有对count值做任何修改,因此此时B读取到的数据还是100

3、线程B工作内存中对count执行了+1操作,但是未刷新之主内存中

4、此时CPU的执行权切换到了A线程上,由于此时线程B没有将工作内存中的数据刷新到主内存,因此A线程工作内存中的变量值还是100,没有失效。A线程对工作内存中的数据进行了+1操作。

5、线程B将101写入到主内存

6、线程A将101写入到主内存

虽然计算了2次,但是只对A进行了1次修改。

3.2.4 volatile不保证原子性

我们刚才说到了volatile在多线程环境下只保证了共享变量在多个线程间的可见性,但是不保证原子性

那么接下来我们就来做一个测试。测试的思想,就是使用volatile修饰count。

线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class VolatileAtomicThread implements Runnable {

// 定义一个int类型的变量,并且使用volatile修饰
private volatile int count = 0 ;

@Override
public void run() {

// 对该变量进行++操作,100次
for(int x = 0 ; x < 100 ; x++) {
count++ ;
System.out.println("count =========>>>> " + count);
}

}

}

控制台输出结果(需要运行多次)

1
2
3
4
...
count =========>>>> 9997
count =========>>>> 9998
count =========>>>> 9999

通过控制台结果的输出,我们可以看到程序还是会出现问题。因此也就证明volatile关键字是不保证原子性的。

3.2.5 问题处理

解决方案:

1、加锁

2、使用原子类

我们可以给count++操作添加锁,那么count++操作就是临界区中的代码,临界区中的代码一次只能被一个线程去执行,所以count++就变成了原子操作。

线程任务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class VolatileAtomicThread implements Runnable {

// 定义一个int类型的变量,
private int count = 0 ;

// 定义一个Object类型的变量,该变量将作为同步代码块的锁
private Object obj = new Object();

@Override
public void run() {
// 对该变量进行++操作,100次
for(int x = 0 ; x < 100 ; x++) {
synchronized (obj){
count++ ;
System.out.println("count =========>>>> " + count);
}
}
}
}

控制台输出结果

1
2
3
count =========>>>> 9998
count =========>>>> 9999
count =========>>>> 10000

有序性

1、什么是指令重排?

有序性:代码编写顺序和代码执行顺序一致。

计算机在执行程序时,为了提高性能,编译器和处理器有时会对指令重排

如下代码:

1
2
3
4
5
6
public void mySort() {
int x = 11; // 1
int y = 12; // 2
x = x + 5; // 3
y = x * x; // 4
}

按照正常的顺序进行执行,那么执行顺序应该是:1 2 3 4 。

但是如果发生了指令重排,那么此时的执行顺序可能是: 1 3 2 4 或 2 1 3 4 ;

但是肯定不会出现:4 3 2 1这种顺序,因为处理器在进行重排时候,必须考虑到指令之间的数据依赖性。

2、解决方案:使用volatile修饰变量,可以保证指令的有序性。

11 原子类与CAS算法

AtomicInteger

简介

概述:java 从 JDK1.5 开始提供了 java.util.concurrent.atomic包(简称Atomic包)。

这个包中的 原子操作类 提供了一种 线程安全 地更新一个变量的方式,且简单高效。

因为变量的类型有很多种,Atomic包里一共提供了13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)。

本次我们只讲解使用原子的方式更新基本类型,使用原子的方式更新基本类型Atomic包提供了以下3个类:

AtomicBoolean: 原子更新布尔类型

AtomicInteger: 原子更新整型

AtomicLong: 原子更新长整型

以上3个类提供的方法几乎一模一样,所以本节仅以AtomicInteger为例进行讲解,AtomicInteger的常用方法如下:

1
2
3
4
5
6
7
8
public AtomicInteger():	   				初始化一个默认值为0的原子型Integer
public AtomicInteger(int initialValue): 初始化一个指定值的原子型Integer

int get(): 获取值
int getAndIncrement(): 以原子方式将当前值加1,注意,这里返回的是自增前的值。 i++
int incrementAndGet(): 以原子方式将当前值加1,注意,这里返回的是自增后的值。 ++i
int addAndGet(int data): 以原子方式将输入数值与实例中值(AtomicInteger里的value)相加,并返回结果。
int getAndSet(int value): 以原子方式设置为newValue的值,并返回旧值。

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AtomicIntegerDemo01 {

// 原子型Integer
public static void main(String[] args) {

// 构造方法
// public AtomicInteger():初始化一个默认值为0的原子型Integer
// AtomicInteger atomicInteger = new AtomicInteger() ;
// System.out.println(atomicInteger);

// public AtomicInteger(int initialValue): 初始化一个指定值的原子型Integer
AtomicInteger atomicInteger = new AtomicInteger(5) ;
System.out.println(atomicInteger);

// 获取值
System.out.println(atomicInteger.get());

// 以原子方式将当前值加1,这里返回的是自增前的值
System.out.println(atomicInteger.getAndIncrement());
System.out.println(atomicInteger.get());

// 以原子方式将当前值加1,这里返回的是自增后的值
System.out.println(atomicInteger.incrementAndGet());

// 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果
System.out.println(atomicInteger.addAndGet(8));

// 以原子方式设置为newValue的值,并返回旧值
System.out.println(atomicInteger.getAndSet(20));
System.out.println(atomicInteger.get());

}

}

问题解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class VolatileAtomicThread implements Runnable {

// 定义一个int类型的变量
private AtomicInteger atomicInteger = new AtomicInteger() ;

@Override
public void run() {
// 对该变量进行++操作,100次
for(int x = 0 ; x < 100 ; x++) {
int i = atomicInteger.incrementAndGet();
System.out.println("count =========>>>> " + i);
}
}
}

控制台输出结果

1
2
3
4
...
count =========>>>> 9998
count =========>>>> 9999
count =========>>>> 10000

通过控制台的执行结果,我们可以看到最终得到的结果就是10000,因此也就证明AtomicInteger所提供的方法是原子性操作方法。

CAS算法

AtomicInteger原理: 自旋锁 + CAS算法

CAS简介

CAS的全称是: Compare And Swap(比较再交换),是一种对内存中的共享数据进行操作的一种特殊指令

CAS可以将 read-modify-write 转换为原子操作,这个原子操作直接由CPU保证。

CAS有3个操作数:内存值V(内存中的一个变量),旧的预期值A,要修改的新值B。

当且仅当旧预期值A和内存值V相同时,将内存值V修改为B并返回true,否则什么都不做,并返回false。

举例说明:

  1. 在内存值V,存储着值为10的变量。

1571817059527

  1. 此时线程1想要把变量的值增加1。对线程1来说,旧的预期值 A = 10 ,要修改的新值 B = 11。

1571817085047

  1. 在线程1要提交更新之前,另一个线程2抢先一步,把内存值V中的变量值率先更新成了11。

1571817628904

  1. 线程1开始提交更新,首先进行A和内存值V的实际值比较(Compare),发现A不等于V的值,提交失败。

1571818176635

  1. 线程1重新获取内存值V作为当前A的值,并重新计算想要修改的新值。此时对线程1来说,A = 11,B = 12。这个重新尝试的过程被称为自旋

1571818465276

  1. 这一次比较幸运,没有其他线程改变V的值。线程1进行Compare,发现A和V的值是相等的。

1571818597998

  1. 线程1进行SWAP,把内存V的值替换为B,也就是12。

1571818747880

举例说明:这好比春节的时候抢火车票,下手快的会抢先买到票,而下手慢的可以再次尝试,直到买到票。

4.2.2 AtomicInteger源码分析

那么接下来我们就来查看一下AtomicInteger类中incrementAndGet方法的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class AtomicInteger extends Number implements java.io.Serializable {

// cas算法的实现类
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();

// 表示变量值在内存中的偏移量地址,unsafe类就是根据内存偏移量地址获取数据值。
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;

// 以原子方式将当前值加1,这里返回的是自增后的值
public final int incrementAndGet() {

/* this表示当前AtomicInteger对象 ,1表示要增加的值 */
return U.getAndAddInt(this, VALUE, 1) + 1; // 调用Unsafe类中的getAndAddInt方法

}

}

UnSafe类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final class Unsafe {

// Unsafe类中的getAndAddInt方法
public final int getAndAddInt(Object o, long offset, int delta) {

int v;

// do...while就是自旋操作,当CAS成功以后,循环结束
do {
// 获取AtomicInteger类中所封装的int类型的值,就相当于旧的预期值A
v = getIntVolatile(o, offset);

// 调用本类的weakCompareAndSetInt方法实现比较在交换; o: AtomicInteger对象, v: 相当于旧的预期值A, v + delta:新值B
} while (!weakCompareAndSetInt(o, offset, v, v + delta));

return v;
}

// Unsafe类中的weakCompareAndSetInt方法
public final boolean weakCompareAndSetInt(Object o, long offset, int expected, int x) {
return compareAndSetInt(o, offset, expected, x);
}

// 本地方法,调用CPU指令实现CAS
public final native boolean compareAndSetInt(Object o, long offset, int expected, int x);

}

4.2.3 CAS缺点

1、在并发量比较高的情况下,如果反复尝试更新某个变量,却又一直更新不成功,会给CPU带来较大的压力。

2、ABA问题:当变量从A修改为B再修改回A时,变量值等于期望值A,但是无法判断是否修改,CAS操作在ABA修改后依然成功。

3、CAS机制只保证一个变量的原子性操作,而不能保证整个代码块的原子性。

4.2.4 CAS与synchronized

CAS和synchronized都可以保证多线程环境下共享数据的安全性。那么他们两者有什么区别?

synchronized是从悲观的角度出发:

总是假设最坏的情况,每次去拿数据的时候都认为别人修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。因此synchronized我们也将其称之为悲观锁。jdk中的ReentrantLock也是一种悲观锁。

CAS是从乐观的角度出发:

总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。CAS这种机制我们也可以将其称之为乐观锁。

12 AQS(抽象队列同步器)-了解

AQS简介

在Java的java.util.concurrent(简称JUC)包中,AbstractQueuedSynchronizer(AQS,抽象队列同步器)是一个核心的基础类,它为依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(如SemaphoreCountDownLatchReentrantLockReentrantReadWriteLock等)提供了一个框架。

AQS是一个基于状态(通常使用一个volatile的int类型的变量state表示)的同步器,它使用了一个内部的FIFO队列来完成等待线程的管理。这个队列被称为CLH队列,用于在获取不到资源的线程之间进行排队。

image-20230309192701029

作用:

  • 同步状态管理:AQS使用一个volatile的int类型的变量state来维护同步状态。state为0表示没有线程占用资源,非0表示有线程占用资源。
  • 队列管理:AQS内部维护了一个等待队列,当线程获取不到资源时,会加入到这个队列中进行等待,直到被唤醒或超时。
  • 独占模式和共享模式:AQS支持两种模式的资源访问控制:独占模式和共享模式。独占模式意味着只有一个线程能访问执行;共享模式允许多个线程同时访问执行。

AQS同步队列的head节点是一个空节点,没有记录线程node.thread=null,其后继节点才是实质性的有线程的节点。

工作模式

独占模式

原理介绍:

独占模式下时,其他线程试图获取该锁将无法取得成功,只有一个线程能执行,如ReentrantLock采用独占模式。

工作流程如下所示:

image-20230309193600787

线程获取锁的流程:

1、线程A获取锁,将state的值由0设置为1

2、在A没有释放锁之前,线程B也来获取锁,线程B获取到state的值为1,表示锁被占用。线程B创建Node节点放入队尾,并且阻塞线程B

3、同理线程C获取到的state的值为1,表示锁被占用。线程C创建Node节点放入队尾,并且阻塞线程C。

线程释放锁的流程:

1、线程A执行完毕以后,将state的值由1设置为0

2、唤醒下一个Node B节点,然后删除线程A节点

以ReentrantLock公平锁为例,获取锁的相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
acquire(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
// 获取锁,如果获取失败将构建链表的Node节点
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // 检查队列是否有其它线程在等待
compareAndSetState(0, acquires)) { // 使用CAS对state进行+1
setExclusiveOwnerThread(current); // 如果获取锁成功,把当前线程设置为有锁线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 可重入
int nextc = c + acquires; // 每重入一次stat累加acquires
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}


hasQueuedPredecessors()方法的作用:通过检查等待队列中是否有其他线程在等待来判断当前线程是否可以立即获取资源。

共享模式

原理说明:

共享模式下,多个线程可以成功获取到锁,多个线程可同时执行。如:Semaphore、CountDownLatch。

工作流程如下所示:

image-20230309194321464

获取锁的流程:

1、线程A判断state的值是否大于0,如果否,则创建Node节点,将其加入到阻塞队列末尾。

2、如果大于0相当于获取到了锁,使用CAS算法对state进行-1

释放锁的流程

1、执行业务操作,业务操作完毕以后对state的值进行+1操作

2、唤醒下一个Node B节点,然后删除线程A节点

源码分析:Semaphore