阻塞队列BlockingQueue是如何唤醒等待的线程的呢?

Reset‘ 2024-08-02 14:31:18 阅读 76

在 Java 中,<code>ConcurrentLinkedQueue、ArrayBlockingQueueLinkedBlockingQueue 等并发队列通常用于生产者-消费者模式。这些队列使用锁和条件(Condition)来实现线程间的通信。

以下是基于 ArrayBlockingQueue 或 LinkedBlockingQueue 的生产者-消费者模式中,消费者如何知道队列中有元素可用的基本原理:

锁(Lock): 队列内部维护了一个锁,通常是 ReentrantLock。这个锁用于同步对队列的访问,确保在任何时刻只有一个线程可以修改队列。

条件(Condition): 锁关联了两个条件,通常称为 notEmpty 和 notFullnotEmpty 条件用于通知消费者队列非空,而 notFull 条件用于通知生产者队列未满。

消费者等待: 当队列为空时,消费者线程会调用条件 notEmpty 的 await() 方法。这将导致消费者线程释放锁并等待,直到另一个线程(生产者)在队列中插入一个元素并调用 signal() 或 signalAll() 方法来唤醒等待的消费者线程。

生产者通知: 当生产者向队列中添加一个元素时,它会调用 notEmpty 条件的 signal() 方法来唤醒一个(或所有)等待的消费者线程。

以下是一个简化的例子:

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {

final Lock lock = new ReentrantLock();

final Condition notEmpty = lock.newCondition();

final Condition notFull = lock.newCondition();

final Object[] items = new Object[100]; // 假设缓冲区大小为100

int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {

lock.lock();

try {

while (count == items.length) // 如果队列已满,则等待

notFull.await();

items[putptr] = x; // 在这里插入元素

if (++putptr == items.length) putptr = 0;

++count;

notEmpty.signal(); // 通知消费者队列非空

} finally {

lock.unlock();

}

}

public Object take() throws InterruptedException {

lock.lock();

try {

while (count == 0) // 如果队列为空,则等待

notEmpty.await();

Object x = items[takeptr]; // 在这里取出元素

if (++takeptr == items.length) takeptr = 0;

--count;

notFull.signal(); // 通知生产者队列未满

return x;

} finally {

lock.unlock();

}

}

}

在这个例子中,消费者在 notEmpty 条件上等待,而生产者在 notFull 条件上等待。当队列状态改变时(例如,生产者添加了一个元素或消费者取出一个元素),相应的条件会被信号唤醒,这样等待的线程就可以重新获取锁并继续执行。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。