BlockingQueue 经常被考察的点,就是是否有界( Bounded 、 Unbounded ),这一点也往往会影响我们在应用开发中的选择,我这里简单总结一下。
LinkedBlockingQueue
LinkedBlockingQueue,容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被 设置为 Integer.MAX_VALUE ,成为了无界队列。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
static class Node<E> {
E item;
/**
* 下列三种情况之一:
* - 真正的后继节点
* - 自己,发生在出队时
* - null, 表示没有后继节点,是最后了
*/
Node<E> next;
Node(E x) { item = x; }
}
入队
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
初始化列表 last = head = new Node
![入队图示](https://raw.githubusercontent.com/smartlin/pic/main/_posts/java%E5%B9%B6%E5%8F%91/%E9%98%BB%E5%A1%9E%E9%98%9F%E5%88%97.md/LinkedBlockingQueue%E5%85%A5%E9%98%9F%E5%9B%BE%E7%A4%BA.png =888x)
出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
加锁分析
两把锁,同一时刻,允许两个线程(一个生产者,一个消费者)同时执行
- 当节点总数大于2时(包括Dummy节点),putLock保证的last节点的线程安全,takeLock保证的时head节点的安全,两把锁同时保证了入队和出队么有竞争
- 当节点总数等于2时(即一个Dummy节点,一个正常节点),这时候仍然时2把锁,不会竞争
- 当节点总数等于1时(即只有Dummy节点)这时的take线程会被notEmpty条件阻塞,有竞争,会阻塞
//用于put(阻塞) offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();
//用于take(阻塞) poll(非阻塞)
private final Condition notFull = putLock.newCondition();
put操作
public void put(E e) throws InterruptedException {
//不允许有空值
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//count用于维护元素技术
final AtomicInteger count = this.count;
//加锁,可打断
putLock.lockInterruptibly();
try {
//满了等待
while (count.get() == capacity) {
//倒过来读,等待notFull
notFull.await();
}
//有空位,入队且计数+1
enqueue(node);
c = count.getAndIncrement();
//除了自己put以外,队列还有空位,由自己唤醒其他put线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果队列中只有一个元素,唤醒take线程
if (c == 0)
signalNotEmpty();
}
take操作
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果队列中只有一个空位,唤醒put线程
//如果有多个队列进行出队,第一个线程满足c == capacity,但后续线程c < capacity
if (c == capacity)
signalNotFull();
return x;
}
与ArrayBlockingQueue比较
ArrayBlockingQueue是最典型的的有界队列,其内部以fnal的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建ArrayBlockingQueue时,都要指定容量,如
public ArrayBlockingQueue(int capacity, boolean fair)
- LinkedBlockingQueue支持有界,ArrayBlockingQueue强制有界
- LinkedBlockingQueue实现是链表,ArrayBlockingQueue是数组
- LinkedBlockingQueue是懒加载,ArrayBlockingQueue需要提前初始化node数组
- LinkedBlockingQueue每次入队会生产新Node,而ArrayBlockingQueue的Node是提前创建好的
- LinkedBlockingQueue两把锁(put,take),ArrayBlockingQueue一把锁,两者都是用ReentrantLock加锁
SynchronousQueue
SynchronousQueue,这是一个非常奇葩的队列实现,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。那么这个队列的容量是多少呢?是1吗?其实不 是的,其内部容量是0
利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。它是 Executors.newCachedThreadPool() 的默认队列。
PriorityBlockingQueue
PriorityBlockingQueue 是无边界的优先队列,虽然严格意义上来讲,其大小总归是要受系统资源影响。
DelayedQueue
DelayedQueue和LinkedTransferQueue同样是无边界的队列。对于无边界的队列,有一个自然的结果,就是put操作永远也不会发生其他BlockingQueue的那种等待情况。
如果我们分析不同队列的底层实现, BlockingQueue 基本都是基于锁实现
ConcurrentLinkedQueue
ConcurrentLinkedQueue与LinkedBlockingQueue非常像,只是在加锁的部分使用了CAS
Deque
有两个特别的Deque实现,ConcurrentLinkedDeque 和 LinkedBlockingDeque。
Deque 的侧重点是支持对队列头尾都进行插入和删除,所以提供了特定的方法,
- 尾部插入时需要的addLast(e)、offerLast(e)
- 尾部删除所需要的removeLast()、pollLast()
以 LinkedBlockingQueue 、 ArrayBlockingQueue 和 SynchronousQueue 为例,
我们一起来分析一下,根据需求可以从很多方面考量: 考虑应用场景中对队列边界的要求。
ArrayBlockingQueue是有明确的容量限制的,
而LinkedBlockingQueue则取决于我们是否在创建时指定,
SynchronousQueue则干脆不能缓存任何元素。
从空间利用角度,数组结构的ArrayBlockingQueue要比LinkedBlockingQueue紧凑,因为其不需要创建所谓节点,但是其初始分配阶段就需要一段连续的空间,所以初始内存 需求更大。
通用场景中,LinkedBlockingQueue的吞吐量一般优于ArrayBlockingQueue,因为它实现了更加细粒度的锁操作。
ArrayBlockingQueue实现比较简单,性能更好预测,属于表现稳定的“选手”。
如果我们需要实现的是两个线程之间接力性(handof)的场景,你可能会选择CountDownLatch,但是SynchronousQueue也是完美符合这种场景的,而且线程间协调和数据传输统一起来,代码更加规范。
可能令人意外的是,很多时候SynchronousQueue的性能表现,往往大大超过其他实现,尤其是在队列元素较小的场景。