您的当前位置:首页正文

LinkedBlockingQueue阻塞队列解析

2024-11-30 来源:个人技术集锦

并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解

1. ArrayBlockingQueue 简介

在多线程编程过程中,为了业务解耦和架构设计,经常会使用并发容器用于存储多线程间的共享数据,这样不仅可以保证线程安全,还可以简化各个线程操作。例如在“生产者-消费者”问题中,会使用阻塞队列(BlockingQueue)作为数据容器,关于 BlockingQueue 可以。为了加深对阻塞队列的理解,唯一的方式是对其实验原理进行理解,这篇文章就主要来看看 ArrayBlockingQueue 和 LinkedBlockingQueue 的实现原理。

2. ArrayBlockingQueue 实现原理

阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素。当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。那么,多线程中消息通知机制最常用的是 lock 的 condition 机制,关于 condition 可以。那么 ArrayBlockingQueue 的实现是不是也会采用 Condition 的通知机制呢?下面来看看。

2.1 ArrayBlockingQueue 的主要属性

ArrayBlockingQueue 的主要属性如下:

/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*

Concurrency control uses the classic two-condition algorithm
found in any textbook.
*/

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;


 

/** Condition for waiting puts */

private final Condition notFull;

从源码中可以看出 ArrayBlockingQueue 内部是采用数组进行数据存储的(属性items),为了保证线程安全,采用的是ReentrantLock lock,为了保证可阻塞式的插入删除数据利用的是 Condition,当获取数据的消费者线程被阻塞时会将该线程放置到 notEmpty 等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到 notFull 等待队列中。而 notEmpty 和 notFull 等中要属性在构造方法中进行创建:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
复制代码

接下来,主要看看可阻塞式的 put 和 take 方法是怎样实现的。

2.2 put 方法详解

put(E e)方法源码如下:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
		//如果当前队列已满,将线程移入到notFull等待队列中
        while (count == items.length)
            notFull.await();
		//满足插入数据的要求,直接进行入队操作
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
复制代码

该方法的逻辑很简单,当队列已满时(count == items.length)将线程移入到 notFull 等待队列中,如果当前满足插入数据的条件,就可以直接调用enqueue(e)插入数据元素。enqueue 方法源码为:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
	//插入数据
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
	//通知消费者线程,当前队列中有数据可供消费
    notEmpty.signal();
}
复制代码

enqueue 方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal())。

2.3 take 方法详解

take 方法源码如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
		//如果队列为空,没有数据,将消费者线程移入等待队列中
        while (count == 0)
            notEmpty.await();
		//获取数据
        return dequeue();
    } finally {
        lock.unlock();
    }
}
复制代码

take 方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队操作dequeue。dequeue 方法源码为:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
	//获取数据
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //通知被阻塞的生产者线程
	notFull.signal();
    return x;
}
复制代码

dequeue 方法也主要做了两件事情:1. 获取队列中的数据,即获取数组中的数据元素((E) items[takeIndex]);2. 通知 notFull 等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得 lock,并执行完成功退出。

从以上分析,可以看出 put 和 take 方法主要是通过 condition 的通知机制来完成可阻塞式的插入数据和获取数据。在理解 ArrayBlockingQueue 后再去理解 LinkedBlockingQueue 就很容易了。

3. LinkedBlockingQueue 实现原理

LinkedBlockingQueue 是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为Integer.MAX_VALUE。从它的构造方法可以看出:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
复制代码

3.1 LinkedBlockingQueue 的主要属性

LinkedBlockingQueue 的主要属性有:

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**

  • Head of linked list.
  • Invariant: head.item == null */ transient Node<E> head;

/**

  • Tail of linked list.
  • Invariant: last.next == null */ private transient Node<E> last;

/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();

复制代码

/** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();

可以看出与 ArrayBlockingQueue 主要的区别是,LinkedBlockingQueue 在插入数据和删除数据时分别是由两个不同的 lock(takeLockputLock)来控制线程安全的,因此,也由这两个 lock 生成了两个对应的 condition(notEmptynotFull)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,Node 结点的定义为:

static class Node<E> {
    E item;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node&lt;E&gt; next; Node(E x) { item = x; } 复制代码

复制代码

}

接下来,我们也同样来看看 put 方法和 take 方法的实现。

3.2 put 方法详解

put 方法源码为:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
		//如果队列已满,则阻塞当前线程,将其移入等待队列
        while (count.get() == capacity) {
            notFull.await();
        }
		//入队操作,插入数据
        enqueue(node);
        c = count.getAndIncrement();
		//若队列满足插入数据的条件,则通知被阻塞的生产者线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
复制代码

put 方法的逻辑也同样很容易理解,可见注释。基本上和 ArrayBlockingQueue 的 put 方法一样。take 方法的源码如下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
		//当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
        while (count.get() == 0) {
            notEmpty.await();
        }
		//移除队头元素,获取数据
        x = dequeue();
        c = count.getAndDecrement();
        //如果当前满足移除元素的条件,则通知被阻塞的消费者线程
		if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
复制代码

take 方法的主要逻辑请见于注释,也很容易理解。

4. ArrayBlockingQueue 与 LinkedBlockingQueue 的比较

相同点:ArrayBlockingQueue 和 LinkedBlockingQueue 都是通过 condition 通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;

不同点:1. ArrayBlockingQueue 底层是采用的数组进行实现,而 LinkedBlockingQueue 则是采用链表数据结构;

LinkedBlockingQueue的概述

LinkedBlockingQueue的继承体系图

我们先来看看LinkedBlockingQueue的继承体系。

LinkedBlockingQueue实现了序列化接口 Serializable,因此它有序列化的特性。 LinkedBlockingQueue实现了BlockingQueue接口,BlockingQueue继承了Queue接口,因此它拥有了队列Queue相关方法的操作。

LinkedBlockingQueue的类图

类图来自Java并发编程之美

LinkedBlockingQueue主要特性:

  1. LinkedBlockingQueue底层数据结构为单向链表。
  2. LinkedBlockingQueue 有两个Node节点,一个head节点,一个tail节点,只能从head取元素,从tail添加元素。
  3. LinkedBlockingQueue 容量是一个原子变量count,它的初始值为0。
  4. LinkedBlockingQueue有两把ReentrantLock的锁,一把控制元素入队,一把控制出队,保证在并发情况下的线程安全。
  5. LinkedBlockingQueue 有两个条件变量,notEmpty 和 notFull。它们内部均有一个条件队列,存放着出入队列被阻塞的线程,这其实是生产者-消费者模型。

LinkedBlockingQueue的重要成员变量

//容量范围,默认值为 Integer.MAX_VALUE
private final int capacity;

//当前队列元素个数
private final AtomicInteger count = new AtomicInteger();

//头结点
transient Node<E> head;

//尾节点
private transient Node<E> last;

//take, poll等方法的可重入锁
private final ReentrantLock takeLock = new ReentrantLock();

//当队列为空时,执行出队操作(比如take )的线程会被放入这个条件队列进行等待
private final Condition notEmpty = takeLock.newCondition();

//put, offer等方法的可重入锁
private final ReentrantLock putLock = new ReentrantLock();

//当队列满时, 执行进队操作( 比如put)的线程会被放入这个条件队列进行等待
private final Condition notFull = putLock.newCondition();
复制代码

LinkedBlockingQueue的构造函数

LinkedBlockingQueue有三个构造函数:

  1. 无参构造函数,容量为Integer.MAX
public LinkedBlockingQueue() {
   this(Integer.MAX_VALUE);
}
复制代码
  1. 设置指定容量的构造器
public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
   //设置队列大小
   this.capacity = capacity;
   //new一个null节点,head、tail节点指向该节点
   last = head = new Node<E>(null);
}
复制代码
  1. 传入集合,如果调用该构造器,容量默认也是Integer.MAX_VALUE
 public LinkedBlockingQueue(Collection<? extends E> c) {
        //调用指定容量的构造器
        this(Integer.MAX_VALUE);
        //获取put, offer的可重入锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); 
        try {
            int n = 0;
            //循环向队列中添加集合中的元素
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                //将队列的last节点指向该节点
                enqueue(new Node<E>(e));
                ++n;
            }
            //更新容量值
            count.set(n);
        } finally {
            //释放锁
            putLock.unlock();
        }
    }
复制代码

LinkedBlockingQueue底层Node类

Node源码

static class Node<E> {
    // 当前节点的元素值
    E item;
    // 下一个节点的索引
    Node<E> next;
    //节点构造器
    Node(E x) { 
     item = x;
   }
 }
复制代码

LinkedBlockingQueue的节点符合单向链表的数据结构要求:

  • 一个成员变量为当前节点的元素值
  • 一个成员变量是下一节点的索引
  • 构造方法的唯一参数节点元素值。

Node节点图

item表示当前节点的元素值,next表示指向下一节点的指针

LinkedBlockingQueue常用操作

offer操作

入队方法,其实就是向队列的尾部插入一个元素。如果元素为空,抛出空指针异常。如果队列已满,则丢弃当前元素,返回false,它是非阻塞的。如果队列空闲则插入成功返回true。

offer源代码

offer方法源码如下:

   public boolean offer(E e) {
        //为空直接抛空指针
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        //如果当前队列满了的话,直接返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        //构造新节点
        Node<E> node = new Node<E>(e);
        获取put独占锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //判断队列是否已满
            if (count.get() < capacity) {
                //进队列
                enqueue(node);
                //递增元素计数
                c = count.getAndIncrement();
                //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //释放锁
            putLock.unlock();
        }
        //如果容量为0,则
        if (c == 0)
            //激活 notEmpty 的条件队列,唤醒被阻塞的线程
            signalNotEmpty();
        return c >= 0;
    }
复制代码

enqueue方法源码如下:

private void enqueue(Node<E> node) {
 //从尾节点加进去
 last = last.next = node;
 }
复制代码

为了形象生动,我们用一张图来看看往队列里依次放入元素A和元素B。图片参考来源

signalNotEmpty方法源码如下

private void signalNotEmpty() {
    //获取take独占锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
       //唤醒notEmpty条件队列里被阻塞的线程
       notEmpty.signal();
     } finally {
       //释放锁
       takeLock.unlock();
        }
    }
复制代码

offer执行流程图

基本流程:

  • 判断元素是否为空,如果是,就抛出空指针异常。
  • 判读队列是否已满,如果是,添加失败,返回false。
  • 如果队列没满,构造Node节点,上锁。
  • 判断队列是否已满,如果队列没满,Node节点在队尾加入队列待。
  • 加入队列后,判断队列是否还有空闲,如果是,唤醒notFull的阻塞线程。
  • 释放完锁后,判断容量是否为空,如果是,唤醒notEmpty的阻塞线程。

put操作

put方法也是向队列尾部插入一个元素。如果元素为null,抛出空指针异常。如果队列己满则阻塞当前线程,直到队列有空闲插入成功为止。如果队列空闲则插入成功,直接返回。如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。

put源代码

  public void put(E e) throws InterruptedException {
        为空直接抛空指针异常
        if (e == null) throw new NullPointerException();
        int c = -1;
        // 构造新节点
        Node<E> node = new Node<E>(e);
        //获取putLock独占锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取独占锁,它跟lock的区别,是可以被中断
        putLock.lockInterruptibly();
        try {
            //队列已满线程挂起等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //进队列
            enqueue(node);
            //递增元素计数
            c = count.getAndIncrement();
            //如果元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
        //如果容量为0,则
        if (c == 0)
            //激活 notEmpty 的条件队列,唤醒被阻塞的线程
            signalNotEmpty();
    }
复制代码

put流程图

基本流程:

  • 判断元素是否为空,如果是就抛出空指针异常。
  • 构造Node节点,上锁(可中断锁)
  • 判断队列是否已满,如果是,阻塞当前线程,一直等待。
  • 如果队列没满,Node节点在队尾加入队列。
  • 加入队列后,判断队列是否还有空闲,如果是,唤醒notFull的阻塞线程。
  • 释放完锁后,判断容量是否为空,如果是,唤醒notEmpty的阻塞线程。

poll操作

从队列头部获取并移除一个元素, 如果队列为空则返回 null, 该方法是不阻塞的。

poll源代码

poll方法源代码

 public E poll() {
        final AtomicInteger count = this.count;
        //如果队列为空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        //获取takeLock独占锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //如果队列不为空,则出队,并递减计数
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                容量大于1,则激活 notEmpty 的条件队列,唤醒被阻塞的线程
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
           //释放锁
            takeLock.unlock();
        }
        if (c == capacity)
            //唤醒notFull条件队列里被阻塞的线程
            signalNotFull();
        return x;
    }
复制代码

dequeue方法源代码

  //出队列
  private E dequeue() {
        //获取head节点
        Node<E> h = head;
        //获取到head节点指向的下一个节点
        Node<E> first = h.next;
        //head节点原来指向的节点的next指向自己,等待下次gc回收
        h.next = h; // help GC
        // head节点指向新的节点
        head = first;
        // 获取到新的head节点的item值
        E x = first.item;
        // 新head节点的item值设置为null
        first.item = null;
        return x;
    }
复制代码

为了形象生动,我们用一张图来描述出队过程。图片参考来源

signalNotFull方法源码

 private void signalNotFull() {
        //获取put独占锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            唤醒notFull条件队列里被阻塞的线程
            notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
    }
复制代码

poll流程图

基本流程:

  • 判断元素是否为空,如果是,就返回null。
  • 加锁
  • 判断队列是否有元素,如果没有,释放锁
  • 如果队列有元素,则出队列,获取数据,容量计数器减一。
  • 判断此时容量是否大于1,如果是,唤醒notEmpty的阻塞线程。
  • 释放完锁后,判断容量是否满,如果是,唤醒notFull的阻塞线程。

peek操作

获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null。 该方法是不 阻塞的。

peek源代码

  public E peek() {
        //队列容量为0,返回null
        if (count.get() == 0)
            return null;
        //获取takeLock独占锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            //判断first是否为null,如果是直接返回
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            //释放锁
            takeLock.unlock();
        }
    }
复制代码

peek流程图

基本流程:

  • 判断队列容量大小是否为0,如果是,就返回null。
  • 加锁
  • 获取队列头部节点first
  • 判断节点first是否为null,是的话,返回null。
  • 如果fist不为null,返回节点first的元素。
  • 释放锁。

take操作

获取当前队列头部元素并从队列里面移除它。 如果队列为空则阻塞当前线程直到队列 不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。

take源代码

 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        //获取takeLock独占锁
        final ReentrantLock takeLock = this.takeLock;
        //获取独占锁,它跟lock的区别,是可以被中断
        takeLock.lockInterruptibly();
        try {
            //当前队列为空,则阻塞挂起
            while (count.get() == 0) {
                notEmpty.await();
            }
            //)出队并递减计数
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
               //激活 notEmpty 的条件队列,唤醒被阻塞的线程
                notEmpty.signal();
        } finally {
            //释放锁
            takeLock.unlock();
        }
        if (c == capacity)
            //激活 notFull 的条件队列,唤醒被阻塞的线程
            signalNotFull();
        return x;
    }
复制代码

take流程图

基本流程:

  • 加锁
  • 判断队列容量大小是否为0,如果是,阻塞当前线程,直到队列不为空。
  • 如果队列容量大小大于0,节点出队列,获取元素x,计数器减一。
  • 判断队列容量大小是否大于1,如果是,唤醒notEmpty的阻塞线程。
  • 释放锁。
  • 判断队列容量是否已满,如果是,唤醒notFull的阻塞线程。
  • 返回出队元素x

remove操作

删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。

remove方法源代码

 public boolean remove(Object o) {
         //为空直接返回false
        if (o == null) return false;
        //双重加锁
        fullyLock();
        try {
            //边历队列,找到元素则删除并返回true
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    //执行unlink操作
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //解锁
            fullyUnlock();
        }
    }
复制代码

双重加锁,fullyLock方法源代码

void fullyLock() {
        //putLock独占锁加锁
        putLock.lock();
        //takeLock独占锁加锁
        takeLock.lock();
    }
复制代码

unlink方法源代码

  void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        //如果当前队列满 ,则删除后,也不忘记唤醒等待的线程 
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
复制代码

fullyUnlock方法源代码

  void fullyUnlock() {
        //与双重加锁顺序相反,先解takeLock独占锁
        takeLock.unlock();
        putLock.unlock();
    }
复制代码

remove流程图

基本流程

  • 判断要删除的元素是否为空,是就返回false。
  • 如果要删除的元素不为空,加双重锁
  • 遍历队列,找到要删除的元素,如果找不到,返回false。
  • 如果找到,删除该节点,返回true。
  • 释放锁

size操作

获取当前队列元素个数。

 public int size() {
        return count.get();
    }
复制代码

由于进行出队、入队操作时的 count是加了锁的,所以结果相比ConcurrentLinkedQueue 的 size 方法比较准确。

总结

  • LinkedBlockingQueue底层通过单向链表实现。
  • 它有头尾两个节点,入队操作是从尾节点添加元素,出队操作是对头节点进行操作。
  • 它的容量是原子变量count,保证szie获取的准确性。
  • 它有两把独占锁,保证了队列操作原子性。
  • 它的两把锁都配备了一个条件队列,用来存放阻塞线程,结合入队、出队操作实现了一个生产消费模型。

Java并发编程之美中,有一张图惟妙惟肖描述了它,如下图:


 

显示全文