Java 多线程(五)—— 阻塞队列、wait、notify

熵减玩家 2024-10-23 17:35:01 阅读 63

wait

wait 和 notify 都是 Object 类提供的方法,也就是说 Java 任意对象都可以使用 这两个方法。

在这里插入图片描述

首先 wait 会抛出 InterruptedException 这个异常,说明这个方法可以被 interrupt 给唤醒。

然后我们是不能直接使用 wait 方法的,否则还会抛出下面的异常:

在这里插入图片描述

IllegalMonitorStateException 是非法的锁状态,monitor 是监视器 也就是我们 的 synchronized ,后面 current thread is not owner ,是说当前的线程不是这把锁的拥有者,说明我们要想使用 wait 方法 就需要先获得这把锁

<code> public static void main(String[] args) throws InterruptedException { -- -->

Object locker = new Object();

synchronized(locker) {

locker.wait();

}

}

wait 的作用是让当前的线程先释放这把锁,然后进入 waiting(死等) 状态,当然 wait 也和 sleep 一样也可以设置时间,当该线程在规定时间内没有被唤醒,就会自动唤醒,重新争夺锁。这时候有时间限制的 wait 的线程状态就是 time_waiting 状态。

我们来看一下wait 的流程图:

在这里插入图片描述

wait 和 sleep 的区别:

首先在 synchronized 下, wait 是会释放掉锁的,但是sleep 不会,sleep 会抱着锁一起睡

wait 的使用必须搭配锁, sleep 不需要


当我们使用 wait 的时候一般是搭配条件判断的,这里建议 将条件判断换成 while 循环语句,这也是Java 标准库里写明的:

在这里插入图片描述

翻译:等待的最佳方法是在等待调用周围的 while 循环中检查正在等待的条件,如下面的例子所示。这种方法可以避免由随机唤醒引起的问题。

这个方案其实是操作系统原生的 api 就建议过 wait 搭配 while,Java 只是继承前人的意志,当然具体你要使用 if 还是使用 while 还是要看具体问题具体分析的。

因为 wait 可能会被 interrupt() 给唤醒,如果使用 if 作为判断,此时就可能会存在 wait 被提前唤醒的 情况。

notify

notify 是唤醒线程,在操作系统原生的api 中 调用 notify 的线程是可以不用获得对应对象的锁的,但是在Java中规定了 notify 的使用和 wait 是一样都需要先获得这把锁对象

wait 和 notify 针对的是同一个对象,notify 才能生效, wait 的线程才能被唤醒

<code>public class Main { -- -->

public static void main(String[] args) {

Object locker = new Object();

Thread t1 = new Thread(() -> {

synchronized(locker) {

try {

System.out.println("t1 wait 之前");

locker.wait();

System.out.println("t1 wait 之后");

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

});

Thread t2 = new Thread(() -> {

synchronized (locker) {

System.out.println("t2 开始唤醒 t1");

locker.notify();

}

});

t1.start();

t2.start();

}

在这里插入图片描述

一个 notify 只能唤醒一个 wait ,如果有多个 wait 的话,一个 notify 会随机唤醒其中一个 wait 线程

如果你想唤醒全部的 wait 线程,可以使用 notifyAll()

如果notify 在 wait 之前就已经运行了,那 后面的 wait 是不会接收到 前面的 notify 信号的,所以我们要确保 notify 在 wait 之后执行,避免程序卡住。


wait 和 notify 的使用不仅仅是为了线程等待和线程唤醒这些基本的操作,其实还有一个作用,就是让线程释放掉锁,也就是当线程此时不满足条件无法进行工作的时候,应该将锁资源释放掉,给其他线程用用,这也避免了线程饥饿问题(线程迟迟没能得到 CPU 资源而运行),进而提高资源的利用率

阻塞队列

阻塞队列是另一种设计模式,是为了实现生产者和消费者模型的。

在这里插入图片描述

生产者负责生产,生产出来的产品会被消费者进行消耗,而我们的阻塞队列其实就是连接生产者和消费者的管道,阻塞队列是由容量的,当生产者生产速度过快,消费者的消费速度跟不上生产的速度,阻塞队列在某一个时刻就会发生阻塞,即当丢列已满的情况下,生产者不能继续生产往队列里面填充产品。

同理,当消费者的消费速度过快,生产者跟不上其消费速度,阻塞队列就会在某一个时刻为空,此时消费者就不能继续从队列里获取产品了,即当丢列为空的时候,消费者会处于阻塞状态。

阻塞队列的特性

当队列为空,尝试进行出队列的操作,这时候进行出队列的操作的线程会发生阻塞,直到其他线程添加其他元素未知

当队列已满,尝试进行入队列的操作,这时候进行入队列操作的线程会发生阻塞,直到其他线程从队列取走元素为止


在实际开发过程中,我们如果要使用阻塞队列,很有可能会将一个阻塞队列放在一个服务器(机器 / 集群)上进行部署。

原先不使用阻塞队列的时候,A 服务器和 B 服务器直接进行交互,很有可能发生一件事情:当 A 服务器发出过量的请求的时候,B 服务器就可能无法处理这么多请求而发生崩溃,这个例子大家请参考学校的教务系统之选课环节。

在这里插入图片描述

当我们使用阻塞队列,并且把它部署到一个集群上时:

在这里插入图片描述

A 服务器和 B服务器就会与队列进行交互,从而实现 削峰填谷,即使A 服务器 有一大波请求涌来,这时候这一波数据并不会直接被 B 服务器接收,而是被 队列给拦下来了,B 服务器依旧可以按照自己的节奏从 队列里获取请求然后进行响应,这样 B 服务器就没这么容易崩溃。

不仅如此,还能实现 代码的 解耦合,这样的部署,就会使 A 和 B 的代码耦合性变低。

当然也有一个不好的地方,就是你多使用了一个机器来部署阻塞队列,这时候会增加机器的复杂性,生产环境会变得复杂,管理难度上升。并且引入队列,A 和 B服务器不是直接交互,效率也会受到影响

阻塞队列的优点:解耦合,削峰填谷

缺点:加机器的复杂性,生产环境会变得复杂,管理难度上升

效率受到影响

使用

在 Java 中给我们提供了一个阻塞队列的 类 <code>BlockingQueue

在这里插入图片描述

我们可以从上面直到这是一个接口,继承 Queue,说明我们可以使用 queue 的方法,但是注意 这是一个阻塞队列,Java 给我们提供了带有阻塞功能的 方法 <code>put() 和 take()


由于 BlockingQueue 本身是一个接口,所以无法直接实例化,Java 给我们提供了下面的实例化方式,我们可以创建基于数组实现的阻塞队列、基于链表实现的阻塞队列、也可以创建带有优先级的阻塞队列、还可以创建双端队列

在这里插入图片描述

在这里插入图片描述


当我们使用 put 和 take 方法的时候记得抛出 InterruptedException 异常

<code>import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

public class Demo2 { -- -->

public static void main(String[] args) throws InterruptedException {

BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

queue.put(10);

queue.take();

}

}

模拟实现

现在我们来模拟实现一个基于数组实现的阻塞循环队列。

class MyBlockingQueue {

private int[] elem;

private int size;

private int head;

private int tail;

Object locker = new Object();

public MyBlockingQueue(int capacity) {

elem = new int[capacity];

}

public void put(int x) throws InterruptedException {

while(size == elem.length) {

synchronized (locker) {

locker.wait();

}

}

elem[tail] = x;

tail++;

if(tail == elem.length) {

tail = 0;

}

size++;

synchronized (locker) {

locker.notify();

}

}

public int take() throws InterruptedException {

while(size == 0) {

synchronized (locker) {

locker.wait();

}

}

int x = elem[head];

head++;

if(head == elem.length) {

head = 0;

}

size--;

synchronized (locker) {

locker.notify();

}

return x;

}

}

这里着重介绍 wait 和 notify 的作用,当我们进行 put 操作的时候,如果发现队列已满,我们需要进入线程等待状态,等到 take 拿走元素之后就进行 notify 操作唤醒 put ;同理,当进行 take 操作的时候,如果发现队列为空,要进入等待状态,等到 put 放入元素之后进行 notify 操作唤醒 take

在这里插入图片描述



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。