Java阻塞队列

BlockingQueue(阻塞队列)是java.util.concurrent包下面的一个接口,在Java并发编程中,阻塞队列占有重要的一席之地。比如,线程池、消息队列等等。总结一下Java中的阻塞队列。

阻塞队列

BlockingQueue

BlockingQueue是所有阻塞队列的父接口,定义了一个队列应该具备的基本功能,即’存’和’取’的接口。阻塞队列常常运用于生产者消费者的编程模型当中,从中存入或取出元素都是线程安全的。

阻塞队列

所谓的”阻塞队列”,即若队列为空,则 [消费者线程] 试图从队列取出元素时阻塞,直到 [生产者线程] 向其中放入一个元素。若队列已满,则 [生产者线程] 试图向队列插入元素时阻塞,直到 [消费者线程] 从中取出一个元素。

满足上述功能的接口即 put(o)take()。除此以外,BlockingQueue还提供了其他类型的存取接口:

抛出异常 返回特定的值 阻塞 带超时的阻塞
插入 add(o) offer(o) put(o) offer(o,timeout,timeunit)
移除 remove(o) poll() take() poll(timeout,timeunit)
检查 element() peek()

插入:即向队列中添加一个元素;

移除:即从队列中取出一个元素,同时从队列中删除该元素;

检查:即从队列中取出一个元素,但不删除该元素

抛出异常:即当调用的方法不能顺利完成操作时(比如从空队列中取出一个元素),抛出异常

返回特定值:即当调用的方法不能顺利完成操作时,返回一个特定的值。比如add(o),如果此时队列已满插入失败,则返回false

阻塞:即当调用的方法不能顺利完成操作时,一直阻塞直到完成该操作

带超时的阻塞:即当调用的方法不能顺利完成操作时,阻塞直到操作完成或到达指定超时时间

ArrayBlockingQueue

ArrayBlockingQueue实现了BlockingQueue接口。从名字上就可以看出,ArrayBlockingQueue是基于数组实现的。

ArrayBlockingQueue是一个有界队列,同时,该队列具有先进先出的特性。ArrayBlockingQueue维护了两个索引,takeIndex和putIndex。takeIndex指向下一个要取出的元素位置,即队列头部;putIndex指出下一个元素要插入的位置,即队列尾部。

ArrayBlockingQueue的实现很简单,是生产者消费者模型的一个典型例子。其内部使用两个条件对象notEmpty、notFull完成生产者消费者线程的等待和唤醒。

LinkedBlockingQueue

LinkedBlockingQueue同样实现了BlockingQueue接口。顾名思义,LinkedBlockingQueue底层是用链表结构存储元素的。与ArrayBlockingQueue一样,LinkedBlockingQueue也是先进先出队列。

LinkedBlockingQueue的默认构造函数会将其设为无界队列(Integer.MAX_VALUE),也可以调用LinkedBlockingQueue(int capacity)将其设为有界队列。

PriorityBlockingQueue

PriorityBlockingQueue即优先级队列。PriorityBlockingQueue是一个无界队列,其底层存储使用了数组,存储空间不足时会发生自动扩容。

既然是优先级队列,表示该队列是按优先级排序的。PriorityBlockingQueue内部使用了最小堆进行排序,不允许插入不可比较的对象(包括NULL)。

有两种方法实现队列元素可比较:

第一种PriorityBlockingQueue接受一个实现了Comparatorj接口的对象作为构造函数参数。

第二种是插入的元素本身实现了Comparable接口。

对于优先级相同的元素(compare结果为0)PriorityBlockingQueue没有确定他们之间被取出的顺序,可以自定义元素的包装类实现相同元素FIFO的顺序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> {
static final AtomicLong seq = new AtomicLong(0);
final long seqNum;
final E entry;
public FIFOEntry(E entry) {
seqNum = seq.getAndIncrement();
this.entry = entry;
}
public E getEntry() { return entry; }
public int compareTo(FIFOEntry<E> other) {
if (res == 0 && other.entry != this.entry)
res = (seqNum < other.seqNum ? -1 : 1);
return res;
}

还有一点需要注意的是,使用PriorityBlockingQueue返回的迭代器Iterator对队列进行迭代时,所迭代的顺序并不一定是具有优先级的。如果需要按照优先级顺序进行迭代,可以使用Arrays.sort(pq.toArray())先将PriorityBlockingQueue.toArray()得到的数组进行排序,然后对该数组进行遍历。PriorityBlockingQueue.toArray()返回的数组,其中的元素并不映射到PriorityBlockingQueue存储元素的数组,而是其拷贝。

DelayQueue

DelayQueue在如何执行一个延迟任务?中已经提到过了。

DelayQueue内部元素的存储委托给了PriorityQueue。PriorityQueue与PriorityBlockingQueue相似,是具有优先级排序的队列,只不过没有实现BlockingQueue接口,同时也不是线程安全的。因为使用了PriorityQueue存储元素,所以DelayQueue也是无界队列。

DelayQueue的思想是:插入的元素按指定的优先级顺序排列,而这个优先级顺序是到期时间。即距离到期时间最短的元素具有最高优先级。同时,DelayQueue的元素也只有到了到期时间后才可被消费者线程取出。

这样的话,只要看队列优先级最高元素即可,所有的消费者线程阻塞在take方法上,直到优先级最高元素到期,被某个消费者线程取出,队列下一个元素成为优先级最高元素,其余消费者线程继续阻塞。

如何获取元素的到期时间?DelayQueue要求插入队列的元素必须实现Delayed接口。

1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

getDelay方法到期时间,返回值小于等于0时,认为到达过期时间。

可以看到,Delayed接口继承了Comparable接口,意味着两个Delayed对象之间可以进行比较。因此DelayQueue(PriorityQueue)可以对插入的元素进行排序。

ScheduledThreadPoolExecutor.ScheduledFutureTask实现了Delayed接口,ScheduledThreadPoolExecutor使用ScheduledFutureTask为任务进行排序。

ScheduledFutureTask:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public long getDelay(TimeUnit unit) {
// 返回到期时间与当前时间相距的纳秒数
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if sameobject
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

SynchronousQueue

SynchronousQueue严格意义上来说,并不能称之为队列。SynchronousQueue内部并没有数据缓存空间。当一个生产者线程试图插入一个元素到SynchronousQueue时会被阻塞,直到有消费者线程取走这个元素。同样的,当一个消费者线程试图从SynchronousQueue取出一个元素时会被阻塞,直到有生产者线程插入了一个元素。

有点类似于”一手交钱一手交货”的情景,数据是在配对的生产者和消费者线程之间直接传递的。SynchronousQueue提供了线程之间安全的交换元素的方法。

Executors.newCachedThreadPool()使用了SynchronousQueue,保证了如果有空闲线程,则使用空闲线程执行任务,若没有空闲线程,则创建新的线程来执行任务。适合任务执行时间短,生产者速度小于消费者速度的场景。

When should I use SynchronousQueue讨论了SynchronousQueue的应用场景。

BlockingDeque

与BlockingQueue类似,对BlockingDeque的存取也是线程安全的。实际上,BlockingDeque接口继承自BlockingQueue和Deque。Deque,意味着”Double Ended Queue”,即可以从两端分别进行存取的队列。BlockingQueue只可以从队列头部取出,从对列尾部插入,与BlockingQueue相比,BlockingDeque既可以从头部插入、取出,也可以从尾部插入、取出。

阻塞双端队列

Java-ForkJoin框架这篇文章中提到过的”工作窃取”技术,就是基于双端队列实现的。

BlockingDeque提供了4组从队列中获取、插入、查看的方法:

抛出异常 返回特定的值 阻塞 带超时的阻塞
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o,timeout,timeunit)
移除 removeFirst(o) pollFirst() takeFirst() pollFirst(timeout,timeunit)
检查 elementFirst() peekFirst()
插入 addLast(o) offerLast(o) putLast(o) offerLast(o,timeout,timeunit)
移除 removeLast(o) pollLast() takeLast() pollLast(timeout,timeunit)
检查 elementLast() peekLast()

BlockingDeque的实现类为LinkedBlockingDeque,是一个基于链表结构的双端队列。

参考

java.util.concurrent - Java Concurrency Utilities

推荐文章