0%

线程通信

线程通信

线程之间为什么要通信?

线程通信的目的是为了更好的协作,线程无论是交替式执行,还是接力式执行,都需要进行通信告知。java线程通信可以分为以下四种方式:

  1. volatile
  2. 等待/通知机制
  3. join方式
  4. threadLocal

本博客主要介绍“等待/通知机制”,等待通知机制是基于(线程共享的资源的)wait和notify方法来实现的。

方法名 作用
wait() 线程释放当前的锁,让出CPU,进入等待状态,直到其他线程通知(notify/notifyAll)。与sleep不同,会释放锁
wait(long timeout) 指定等待的毫秒数
notify() 唤醒一个处于等待状态的线程随机选择线程唤醒,但是和实际运行环境相关),然后继续往下执行,直到执行完synchronized 代码块中的代码或是中途遇到wait() ,才释放锁。
notifyAll() 唤醒同一个对象上所有调用wait()方法的线程(优先级高的线程优先调度),然后继续往下执行…

注意:wait()使当前线程阻塞,前提是先获得锁,一般配合synchronized 关键字使用,即一般在synchronized 同步代码块里使用 wait()、notify/notifyAll()方法,否则会抛出异常IllegalMonitorStateException。notify/notifyAll() 的执行只是唤醒沉睡的线程,而不会立即释放锁,锁的释放要看代码块的具体执行情况。所以在编程中,尽量在使用了notify/notifyAll() 后立即退出临界区,以唤醒其他线程让其获得锁。

管程法

管程是什么?

管程可以看做一个软件模块,它是将共享的变量对共享变量的操作方法封装起来,形成一个具有一定接口的功能模块,进程(或线程)可以调用管程来实现进程级别的并发控制。详细介绍见:传送门

并发协作模型“生产者/消费者模式” –> 管程法

  • 生产者:负责生产数据的模块(可能是方法、对象、线程、进程)
  • 消费者:负责处理数据的模块(可能是方法、对象、线程、进程)
  • 缓冲区:消费者不能直接使用生产者的数据,他们之间有一个“缓冲区”

生产者将生产好的数据放入缓冲区,消费者从缓冲区拿出数据。生产者和消费者共享一个资源,并且生产者和消费者之间相互依赖,互为条件。

  • 对于生产者,没有生产产品之前,要通知消费者等待,而生产了产品之后,又需要马上通知消费者消费
  • 对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费

代码演示

资源

创建一个Chicken类,作为产品(资源

1
2
3
4
5
6
7
//产品
class Chicken{
int id;//产品编号
public Chicken(int id){
this.id = id;
}
}

缓冲区

创建一个SynContainer类,作为缓冲区,存放资源。前边的管程思想就体现在这里。Chicken为线程共享资源,方法push与pop实现对共享资源的操作,将共享的资源对共享资源的操作方法封装起来,形成一个具有一定接口的功能模块,线程可以调用管程来实现线程级别的并发控制。

注意:在synchronized 同步代码块里使用 wait()、notify/notifyAll()方法。通过判断Chicken(资源)的数量来决定是否让当前线程释放对象锁,进入等待状态(条件的判断,建议放到while循环中而不是if)。尽量在使用了notify/notifyAll() 后立即退出临界区(synchronized范围)。

条件的判断,建议放到while循环中而不是if,主要是基于以下考虑:wait方法让线程进入等待状态,当线程被notify/notifyAll方法唤醒后,应当再次判断是否符合条件,如果使用if则不会再次进行判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//缓冲区
class SynContainer{

//需要一个容器,假设大小为10
Chicken[] chickens = new Chicken[10];
//容器计数器
int count = 0;

//生产者放入产品
public synchronized void push(Chicken chicken){
//synchronized方法,下边this关键字对应的是SynContainer实例,而不是调用该方法的线程。是被操作的资源使用wait()、notify/notifyAll() 方法
//如果容器满了,就需要等待消费者消费
while (count == chickens.length){
try {
this.wait();//线程 释放当前的对象锁,让出CPU,进入等待状态,等待notify或notifyAll将其唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
}


//如果没有满,就需要将产品放入容器中
chickens[count] = chicken;
System.out.println("生产了第"+ ++count +"只鸡");

//通知消费者消费

this.notifyAll();//唤醒所有等待的线程,但是还没释放对象锁,会继续执行完当前synchronized代码或下边又遇到wait() 方法才会释放锁。
}

//消费者消费产品
public synchronized Chicken pop(){
//判断能否消费
while (count == 0) {
//等待生产者生产,消费者等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果可以消费
System.out.println("消费了第"+ count-- +"只鸡");
Chicken chicken = chickens[count];

//吃完了,通知生产者生产
this.notifyAll();

return chicken;
}

}

生产者

创建一个Producer类,作为生产者,生产资源。继承Thread类实现多线程,调用缓冲区的push方法操作共享资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//生产者
class Producer extends Thread{
SynContainer container;

public Producer(SynContainer container){
this.container = container;
}

@Override
public void run() {//生产
for (int i = 0; i < 20; i++) {
container.push(new Chicken(i));
}
}
}

消费者

创建一个Consumer类,作为消费者,消费(处理)资源。继承Thread类实现多线程,调用缓冲区的pop方法操作共享资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//消费者
class Consumer extends Thread{
SynContainer container;

public Consumer(SynContainer container){
this.container = container;
}

@Override
public void run() {//消费
for (int i = 0; i < 20; i++) {
container.pop();
}
}
}

结果

创建一个TestPC类,用于测试:

1
2
3
4
5
6
7
8
9
//测试:生产者消费者模型 --> 利用缓冲区解决:管程法
public class TestPC {
public static void main(String[] args) {
SynContainer container = new SynContainer();

new Producer(container).start();
new Consumer(container).start();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
生产了第1只鸡
生产了第2只鸡
生产了第3只鸡
生产了第4只鸡
生产了第5只鸡
生产了第6只鸡
生产了第7只鸡
生产了第8只鸡
生产了第9只鸡
生产了第10只鸡
消费了第10只鸡
消费了第9只鸡
消费了第8只鸡
消费了第7只鸡
消费了第6只鸡
消费了第5只鸡
消费了第4只鸡
消费了第3只鸡
消费了第2只鸡
消费了第1只鸡
生产了第1只鸡
生产了第2只鸡
生产了第3只鸡
生产了第4只鸡
生产了第5只鸡
生产了第6只鸡
生产了第7只鸡
生产了第8只鸡
生产了第9只鸡
生产了第10只鸡
消费了第10只鸡
消费了第9只鸡
消费了第8只鸡
消费了第7只鸡
消费了第6只鸡
消费了第5只鸡
消费了第4只鸡
消费了第3只鸡
消费了第2只鸡
消费了第1只鸡

Process finished with exit code 0

信号灯法

在上边的管程法中,有一个缓冲区角色,将共享的资源对共享资源的操作方法封装起来,形成一个具有一定接口的功能模块,线程可以调用管程来实现线程级别的并发控制。通过判断缓冲区容量的大小来实现通知/等待机制,信号灯法使用一个标志位来实现通知/等待机制。

代码演示

资源

创建一个TV类,作为产品(资源)。在本例中,原缓冲区角色合并到了产品中(资源)。方法play与watch实现对共享资源的操作。

注意:在synchronized 同步代码块里使用 wait()、notify/notifyAll()方法。通过判断标志位flag的布尔值来决定是否让当前线程释放对象锁,进入等待状态(条件的判断,建议放到while循环中而不是if)。尽量在使用了notify/notifyAll() 后立即退出临界区(synchronized范围)。

条件的判断,建议放到while循环中而不是if,主要是基于以下考虑:wait方法让线程进入等待状态,当线程被notify/notifyAll方法唤醒后,应当再次判断是否符合条件,如果使用if则不会再次进行判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//产品 --> 节目
class TV{
//演员录制节目,观众等待
//观众观看,演员等待(休息)
String voice;//录制的节目
boolean flag = false;//是否有录制好的节目

//录制节目
public synchronized void play(String voice){
while (flag){//已有录制好的节目时不需要录制节目,可以休息
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//演员录制节目
System.out.println("演员录制了:"+voice);
//录制好之后,通知观众观看
this.notifyAll();
this.voice = voice;
this.flag = !flag;
}
//观看节目
public synchronized void watch(){
while (!flag){//还没有录制好的电视节目,等待演员录制好
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//观众观看节目
System.out.println("观看了:"+voice+"\n");
//节目观看完了,通知演员去录制(催更)
this.notifyAll();
this.flag = !flag;
}

}

生产者

创建一个Player类,作为生产者,生产资源。继承Thread类实现多线程,调用缓冲区的play方法操作共享资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//生产者 --> 演员
class Player extends Thread{
TV tv;
public Player(TV tv){
this.tv = tv;
}

@Override
public void run() {
for (int i = 0; i < 5; i++) {
if (i%2 == 0){
tv.play("快乐大本营");
}else {
tv.play("天天向上");
}
}
}
}

消费者

创建一个Watcher类,作为消费者,消费(处理)资源。继承Thread类实现多线程,调用缓冲区的watch方法操作共享资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//消费者 --> 公众
class Watcher extends Thread{
TV tv;
public Watcher(TV tv){
this.tv = tv;
}

@Override
public void run() {
for (int i = 0; i < 5; i++) {
if (i%2 == 0){
tv.watch();
}else {
tv.watch();
}
}
}

}

结果

创建一个TestPC02类,用于测试:

1
2
3
4
5
6
7
8
9
//测试:生产者消费者模型 --> 利用标志位解决:信号灯法
public class TestPC02 {
public static void main(String[] args) {
TV tv = new TV();

new Player(tv).start();
new Watcher(tv).start();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
演员录制了:快乐大本营
观看了:快乐大本营

演员录制了:天天向上
观看了:天天向上

演员录制了:快乐大本营
观看了:快乐大本营

演员录制了:天天向上
观看了:天天向上

演员录制了:快乐大本营
观看了:快乐大本营


Process finished with exit code 0

参考资料:三月烟雨飘摇的南方-博客遇见狂神说-B站有意思的程序员-知乎

若图片不能正常显示,请在浏览器中打开

欢迎关注我的其它发布渠道