阻塞队列

starlin 2,596 2021-03-31

BlockingQueue 经常被考察的点,就是是否有界( Bounded 、 Unbounded ),这一点也往往会影响我们在应用开发中的选择,我这里简单总结一下。

LinkedBlockingQueue

LinkedBlockingQueue,容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被 设置为 Integer.MAX_VALUE ,成为了无界队列

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

static class Node<E> {
        E item;

        /**
         * 下列三种情况之一:
         * - 真正的后继节点
         * - 自己,发生在出队时
         * - null, 表示没有后继节点,是最后了
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

入队

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

初始化列表 last = head = new Node(null);Dummy节点用来占位,item为null
![入队图示](https://raw.githubusercontent.com/smartlin/pic/main/_posts/java%E5%B9%B6%E5%8F%91/%E9%98%BB%E5%A1%9E%E9%98%9F%E5%88%97.md/LinkedBlockingQueue%E5%85%A5%E9%98%9F%E5%9B%BE%E7%A4%BA.png =888x)

出队

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

![出队示意图](https://raw.githubusercontent.com/smartlin/pic/main/_posts/java%E5%B9%B6%E5%8F%91/%E9%98%BB%E5%A1%9E%E9%98%9F%E5%88%97.md/%E5%87%BA%E9%98%9F%E7%A4%BA%E6%84%8F%E5%9B%BE.png =1088x)

加锁分析

两把锁,同一时刻,允许两个线程(一个生产者,一个消费者)同时执行

  • 当节点总数大于2时(包括Dummy节点),putLock保证的last节点的线程安全,takeLock保证的时head节点的安全,两把锁同时保证了入队和出队么有竞争
  • 当节点总数等于2时(即一个Dummy节点,一个正常节点),这时候仍然时2把锁,不会竞争
  • 当节点总数等于1时(即只有Dummy节点)这时的take线程会被notEmpty条件阻塞,有竞争,会阻塞
    //用于put(阻塞) offer(非阻塞)
    private final ReentrantLock putLock = new ReentrantLock();

    //用于take(阻塞) poll(非阻塞)
    private final Condition notFull = putLock.newCondition();

put操作

    public void put(E e) throws InterruptedException {
        //不允许有空值
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        //count用于维护元素技术
        final AtomicInteger count = this.count;
        //加锁,可打断
        putLock.lockInterruptibly();
        try {
            //满了等待
            while (count.get() == capacity) {
                //倒过来读,等待notFull
                notFull.await();
            }
            //有空位,入队且计数+1
            enqueue(node);
            c = count.getAndIncrement();
            //除了自己put以外,队列还有空位,由自己唤醒其他put线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //如果队列中只有一个元素,唤醒take线程
        if (c == 0)
            signalNotEmpty();
    }

take操作

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //如果队列中只有一个空位,唤醒put线程
    //如果有多个队列进行出队,第一个线程满足c == capacity,但后续线程c < capacity
    if (c == capacity)
        signalNotFull();
    return x;
}

与ArrayBlockingQueue比较

ArrayBlockingQueue是最典型的的有界队列,其内部以fnal的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建ArrayBlockingQueue时,都要指定容量,如

public ArrayBlockingQueue(int capacity, boolean fair)
  • LinkedBlockingQueue支持有界,ArrayBlockingQueue强制有界
  • LinkedBlockingQueue实现是链表,ArrayBlockingQueue是数组
  • LinkedBlockingQueue是懒加载,ArrayBlockingQueue需要提前初始化node数组
  • LinkedBlockingQueue每次入队会生产新Node,而ArrayBlockingQueue的Node是提前创建好的
  • LinkedBlockingQueue两把锁(put,take),ArrayBlockingQueue一把锁,两者都是用ReentrantLock加锁

SynchronousQueue

SynchronousQueue,这是一个非常奇葩的队列实现,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。那么这个队列的容量是多少呢?是1吗?其实不 是的,其内部容量是0
利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。它是 Executors.newCachedThreadPool() 的默认队列。

PriorityBlockingQueue

PriorityBlockingQueue 是无边界的优先队列,虽然严格意义上来讲,其大小总归是要受系统资源影响。

DelayedQueue

DelayedQueueLinkedTransferQueue同样是无边界的队列。对于无边界的队列,有一个自然的结果,就是put操作永远也不会发生其他BlockingQueue的那种等待情况。
如果我们分析不同队列的底层实现, BlockingQueue 基本都是基于锁实现

ConcurrentLinkedQueue

ConcurrentLinkedQueue与LinkedBlockingQueue非常像,只是在加锁的部分使用了CAS

Deque

有两个特别的Deque实现,ConcurrentLinkedDeque 和 LinkedBlockingDeque。
Deque 的侧重点是支持对队列头尾都进行插入和删除,所以提供了特定的方法,

  • 尾部插入时需要的addLast(e)、offerLast(e)
  • 尾部删除所需要的removeLast()、pollLast()

以 LinkedBlockingQueue 、 ArrayBlockingQueue 和 SynchronousQueue 为例,
我们一起来分析一下,根据需求可以从很多方面考量: 考虑应用场景中对队列边界的要求。
ArrayBlockingQueue是有明确的容量限制的,
而LinkedBlockingQueue则取决于我们是否在创建时指定,
SynchronousQueue则干脆不能缓存任何元素。
从空间利用角度,数组结构的ArrayBlockingQueue要比LinkedBlockingQueue紧凑,因为其不需要创建所谓节点,但是其初始分配阶段就需要一段连续的空间,所以初始内存 需求更大。
通用场景中,LinkedBlockingQueue的吞吐量一般优于ArrayBlockingQueue,因为它实现了更加细粒度的锁操作。
ArrayBlockingQueue实现比较简单,性能更好预测,属于表现稳定的“选手”。
如果我们需要实现的是两个线程之间接力性(handof)的场景,你可能会选择CountDownLatch,但是SynchronousQueue也是完美符合这种场景的,而且线程间协调和数据传输统一起来,代码更加规范。
可能令人意外的是,很多时候SynchronousQueue的性能表现,往往大大超过其他实现,尤其是在队列元素较小的场景。


# java并发