阻塞队列BlockingQueue是如何唤醒等待的线程的呢?
Reset‘ 2024-08-02 14:31:18 阅读 76
在 Java 中,<code>ConcurrentLinkedQueue、ArrayBlockingQueue
、LinkedBlockingQueue
等并发队列通常用于生产者-消费者模式。这些队列使用锁和条件(Condition
)来实现线程间的通信。
以下是基于 ArrayBlockingQueue
或 LinkedBlockingQueue
的生产者-消费者模式中,消费者如何知道队列中有元素可用的基本原理:
锁(Lock): 队列内部维护了一个锁,通常是 ReentrantLock
。这个锁用于同步对队列的访问,确保在任何时刻只有一个线程可以修改队列。
条件(Condition): 锁关联了两个条件,通常称为 notEmpty
和 notFull
。notEmpty
条件用于通知消费者队列非空,而 notFull
条件用于通知生产者队列未满。
消费者等待: 当队列为空时,消费者线程会调用条件 notEmpty
的 await()
方法。这将导致消费者线程释放锁并等待,直到另一个线程(生产者)在队列中插入一个元素并调用 signal()
或 signalAll()
方法来唤醒等待的消费者线程。
生产者通知: 当生产者向队列中添加一个元素时,它会调用 notEmpty
条件的 signal()
方法来唤醒一个(或所有)等待的消费者线程。
以下是一个简化的例子:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notEmpty = lock.newCondition();
final Condition notFull = lock.newCondition();
final Object[] items = new Object[100]; // 假设缓冲区大小为100
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) // 如果队列已满,则等待
notFull.await();
items[putptr] = x; // 在这里插入元素
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 通知消费者队列非空
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) // 如果队列为空,则等待
notEmpty.await();
Object x = items[takeptr]; // 在这里取出元素
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 通知生产者队列未满
return x;
} finally {
lock.unlock();
}
}
}
在这个例子中,消费者在 notEmpty
条件上等待,而生产者在 notFull
条件上等待。当队列状态改变时(例如,生产者添加了一个元素或消费者取出一个元素),相应的条件会被信号唤醒,这样等待的线程就可以重新获取锁并继续执行。
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。