在多线程编程过程中,为了业务解耦和架构设计,经常会使用并发容器用于存储多线程间的共享数据,这样不仅可以保证线程安全,还可以简化各个线程操作。例如在“生产者-消费者”问题中,会使用阻塞队列(BlockingQueue)作为数据容器,关于 BlockingQueue 可以。为了加深对阻塞队列的理解,唯一的方式是对其实验原理进行理解,这篇文章就主要来看看 ArrayBlockingQueue 和 LinkedBlockingQueue 的实现原理。
阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素。当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。那么,多线程中消息通知机制最常用的是 lock 的 condition 机制,关于 condition 可以。那么 ArrayBlockingQueue 的实现是不是也会采用 Condition 的通知机制呢?下面来看看。
ArrayBlockingQueue 的主要属性如下:
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
Concurrency control uses the classic two-condition algorithm
found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
从源码中可以看出 ArrayBlockingQueue 内部是采用数组进行数据存储的(属性items
),为了保证线程安全,采用的是ReentrantLock lock
,为了保证可阻塞式的插入删除数据利用的是 Condition,当获取数据的消费者线程被阻塞时会将该线程放置到 notEmpty 等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到 notFull 等待队列中。而 notEmpty 和 notFull 等中要属性在构造方法中进行创建:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
复制代码
接下来,主要看看可阻塞式的 put 和 take 方法是怎样实现的。
put(E e)
方法源码如下:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果当前队列已满,将线程移入到notFull等待队列中
while (count == items.length)
notFull.await();
//满足插入数据的要求,直接进行入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
复制代码
该方法的逻辑很简单,当队列已满时(count == items.length
)将线程移入到 notFull 等待队列中,如果当前满足插入数据的条件,就可以直接调用enqueue(e)
插入数据元素。enqueue 方法源码为:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//插入数据
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知消费者线程,当前队列中有数据可供消费
notEmpty.signal();
}
复制代码
enqueue 方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x
),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal()
)。
take 方法源码如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空,没有数据,将消费者线程移入等待队列中
while (count == 0)
notEmpty.await();
//获取数据
return dequeue();
} finally {
lock.unlock();
}
}
复制代码
take 方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队操作dequeue
。dequeue 方法源码为:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取数据
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//通知被阻塞的生产者线程
notFull.signal();
return x;
}
复制代码
dequeue 方法也主要做了两件事情:1. 获取队列中的数据,即获取数组中的数据元素((E) items[takeIndex]
);2. 通知 notFull 等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得 lock,并执行完成功退出。
从以上分析,可以看出 put 和 take 方法主要是通过 condition 的通知机制来完成可阻塞式的插入数据和获取数据。在理解 ArrayBlockingQueue 后再去理解 LinkedBlockingQueue 就很容易了。
LinkedBlockingQueue 是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为Integer.MAX_VALUE
。从它的构造方法可以看出:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
复制代码
LinkedBlockingQueue 的主要属性有:
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
Head of linked list.
Invariant: head.item == null */ transient Node<E> head;
/**
Tail of linked list.
Invariant: last.next == null */ private transient Node<E> last;
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();
复制代码
/** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
可以看出与 ArrayBlockingQueue 主要的区别是,LinkedBlockingQueue 在插入数据和删除数据时分别是由两个不同的 lock(takeLock
和putLock
)来控制线程安全的,因此,也由这两个 lock 生成了两个对应的 condition(notEmpty
和notFull
)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,Node 结点的定义为:
static class Node<E> {
E item;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } 复制代码
复制代码
}
接下来,我们也同样来看看 put 方法和 take 方法的实现。
put 方法源码为:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//如果队列已满,则阻塞当前线程,将其移入等待队列
while (count.get() == capacity) {
notFull.await();
}
//入队操作,插入数据
enqueue(node);
c = count.getAndIncrement();
//若队列满足插入数据的条件,则通知被阻塞的生产者线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
复制代码
put 方法的逻辑也同样很容易理解,可见注释。基本上和 ArrayBlockingQueue 的 put 方法一样。take 方法的源码如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
while (count.get() == 0) {
notEmpty.await();
}
//移除队头元素,获取数据
x = dequeue();
c = count.getAndDecrement();
//如果当前满足移除元素的条件,则通知被阻塞的消费者线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
复制代码
take 方法的主要逻辑请见于注释,也很容易理解。
相同点:ArrayBlockingQueue 和 LinkedBlockingQueue 都是通过 condition 通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;
不同点:1. ArrayBlockingQueue 底层是采用的数组进行实现,而 LinkedBlockingQueue 则是采用链表数据结构;
我们先来看看LinkedBlockingQueue的继承体系。
LinkedBlockingQueue实现了序列化接口 Serializable,因此它有序列化的特性。 LinkedBlockingQueue实现了BlockingQueue接口,BlockingQueue继承了Queue接口,因此它拥有了队列Queue相关方法的操作。
类图来自Java并发编程之美
LinkedBlockingQueue主要特性:
//容量范围,默认值为 Integer.MAX_VALUE
private final int capacity;
//当前队列元素个数
private final AtomicInteger count = new AtomicInteger();
//头结点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//take, poll等方法的可重入锁
private final ReentrantLock takeLock = new ReentrantLock();
//当队列为空时,执行出队操作(比如take )的线程会被放入这个条件队列进行等待
private final Condition notEmpty = takeLock.newCondition();
//put, offer等方法的可重入锁
private final ReentrantLock putLock = new ReentrantLock();
//当队列满时, 执行进队操作( 比如put)的线程会被放入这个条件队列进行等待
private final Condition notFull = putLock.newCondition();
复制代码
LinkedBlockingQueue有三个构造函数:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
复制代码
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
//设置队列大小
this.capacity = capacity;
//new一个null节点,head、tail节点指向该节点
last = head = new Node<E>(null);
}
复制代码
public LinkedBlockingQueue(Collection<? extends E> c) {
//调用指定容量的构造器
this(Integer.MAX_VALUE);
//获取put, offer的可重入锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
//循环向队列中添加集合中的元素
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//将队列的last节点指向该节点
enqueue(new Node<E>(e));
++n;
}
//更新容量值
count.set(n);
} finally {
//释放锁
putLock.unlock();
}
}
复制代码
static class Node<E> {
// 当前节点的元素值
E item;
// 下一个节点的索引
Node<E> next;
//节点构造器
Node(E x) {
item = x;
}
}
复制代码
LinkedBlockingQueue的节点符合单向链表的数据结构要求:
item表示当前节点的元素值,next表示指向下一节点的指针
入队方法,其实就是向队列的尾部插入一个元素。如果元素为空,抛出空指针异常。如果队列已满,则丢弃当前元素,返回false,它是非阻塞的。如果队列空闲则插入成功返回true。
offer源代码
offer方法源码如下:
public boolean offer(E e) {
//为空直接抛空指针
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果当前队列满了的话,直接返回false
if (count.get() == capacity)
return false;
int c = -1;
//构造新节点
Node<E> node = new Node<E>(e);
获取put独占锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//判断队列是否已满
if (count.get() < capacity) {
//进队列
enqueue(node);
//递增元素计数
c = count.getAndIncrement();
//如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//释放锁
putLock.unlock();
}
//如果容量为0,则
if (c == 0)
//激活 notEmpty 的条件队列,唤醒被阻塞的线程
signalNotEmpty();
return c >= 0;
}
复制代码
enqueue方法源码如下:
private void enqueue(Node<E> node) {
//从尾节点加进去
last = last.next = node;
}
复制代码
为了形象生动,我们用一张图来看看往队列里依次放入元素A和元素B。图片参考来源
signalNotEmpty方法源码如下
private void signalNotEmpty() {
//获取take独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒notEmpty条件队列里被阻塞的线程
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
}
复制代码
offer执行流程图
基本流程:
put方法也是向队列尾部插入一个元素。如果元素为null,抛出空指针异常。如果队列己满则阻塞当前线程,直到队列有空闲插入成功为止。如果队列空闲则插入成功,直接返回。如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。
put源代码
public void put(E e) throws InterruptedException {
为空直接抛空指针异常
if (e == null) throw new NullPointerException();
int c = -1;
// 构造新节点
Node<E> node = new Node<E>(e);
//获取putLock独占锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取独占锁,它跟lock的区别,是可以被中断
putLock.lockInterruptibly();
try {
//队列已满线程挂起等待
while (count.get() == capacity) {
notFull.await();
}
//进队列
enqueue(node);
//递增元素计数
c = count.getAndIncrement();
//如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//如果容量为0,则
if (c == 0)
//激活 notEmpty 的条件队列,唤醒被阻塞的线程
signalNotEmpty();
}
复制代码
put流程图
基本流程:
从队列头部获取并移除一个元素, 如果队列为空则返回 null, 该方法是不阻塞的。
poll源代码
poll方法源代码
public E poll() {
final AtomicInteger count = this.count;
//如果队列为空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//获取takeLock独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//如果队列不为空,则出队,并递减计数
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
容量大于1,则激活 notEmpty 的条件队列,唤醒被阻塞的线程
if (c > 1)
notEmpty.signal();
}
} finally {
//释放锁
takeLock.unlock();
}
if (c == capacity)
//唤醒notFull条件队列里被阻塞的线程
signalNotFull();
return x;
}
复制代码
dequeue方法源代码
//出队列
private E dequeue() {
//获取head节点
Node<E> h = head;
//获取到head节点指向的下一个节点
Node<E> first = h.next;
//head节点原来指向的节点的next指向自己,等待下次gc回收
h.next = h; // help GC
// head节点指向新的节点
head = first;
// 获取到新的head节点的item值
E x = first.item;
// 新head节点的item值设置为null
first.item = null;
return x;
}
复制代码
为了形象生动,我们用一张图来描述出队过程。图片参考来源
signalNotFull方法源码
private void signalNotFull() {
//获取put独占锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
唤醒notFull条件队列里被阻塞的线程
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
}
复制代码
poll流程图
基本流程:
获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null。 该方法是不 阻塞的。
peek源代码
public E peek() {
//队列容量为0,返回null
if (count.get() == 0)
return null;
//获取takeLock独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
//判断first是否为null,如果是直接返回
if (first == null)
return null;
else
return first.item;
} finally {
//释放锁
takeLock.unlock();
}
}
复制代码
peek流程图
基本流程:
获取当前队列头部元素并从队列里面移除它。 如果队列为空则阻塞当前线程直到队列 不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。
take源代码
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取takeLock独占锁
final ReentrantLock takeLock = this.takeLock;
//获取独占锁,它跟lock的区别,是可以被中断
takeLock.lockInterruptibly();
try {
//当前队列为空,则阻塞挂起
while (count.get() == 0) {
notEmpty.await();
}
//)出队并递减计数
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//激活 notEmpty 的条件队列,唤醒被阻塞的线程
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
if (c == capacity)
//激活 notFull 的条件队列,唤醒被阻塞的线程
signalNotFull();
return x;
}
复制代码
take流程图
基本流程:
删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。
remove方法源代码
public boolean remove(Object o) {
//为空直接返回false
if (o == null) return false;
//双重加锁
fullyLock();
try {
//边历队列,找到元素则删除并返回true
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//执行unlink操作
unlink(p, trail);
return true;
}
}
return false;
} finally {
//解锁
fullyUnlock();
}
}
复制代码
双重加锁,fullyLock方法源代码
void fullyLock() {
//putLock独占锁加锁
putLock.lock();
//takeLock独占锁加锁
takeLock.lock();
}
复制代码
unlink方法源代码
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//如果当前队列满 ,则删除后,也不忘记唤醒等待的线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
复制代码
fullyUnlock方法源代码
void fullyUnlock() {
//与双重加锁顺序相反,先解takeLock独占锁
takeLock.unlock();
putLock.unlock();
}
复制代码
remove流程图
基本流程
获取当前队列元素个数。
public int size() {
return count.get();
}
复制代码
由于进行出队、入队操作时的 count是加了锁的,所以结果相比ConcurrentLinkedQueue 的 size 方法比较准确。
Java并发编程之美中,有一张图惟妙惟肖描述了它,如下图: