登录后台

页面导航

本文编写于 950 天前,最后修改于 164 天前,其中某些信息可能已经过时。

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。

  1. ArrayDeque, (数组双端队列)
  2. PriorityQueue, (优先级队列)
  3. ConcurrentLinkedQueue, (基于链表的并发队列)
  4. DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
  5. ArrayBlockingQueue, (基于数组的并发阻塞队列)
  6. LinkedBlockingQueue, (基于链表的FIFO阻塞队列)
  7. LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
  8. PriorityBlockingQueue, (带优先级的无界阻塞队列)
  9. SynchronousQueue (并发同步阻塞队列)

详细内容可参考:Java 并发工具包 java.util.concurrent 用户指南

阻塞队列和生产者-消费者模式

阻塞队列(Blocking queue)提供了可阻塞的put和take方法,它们与可定时的offer和poll是等价的。如果Queue已经满了,put方法会被阻塞直到有空间可用;如果Queue是空的,那么take方法会被阻塞,直到有元素可用。Queue的长度可以有限,也可以无限;无限的Queue永远不会充满,所以它的put方法永远不会阻塞。

阻塞队列支持生产者-消费者设计模式。一个生产者-消费者设计分离了“生产产品”和“消费产品”。该模式不会发现一个工作便立即处理,而是把工作置于一个任务(“to do”)清单中,以备后期处理。生产者-消费者模式简化了开发,因为它解除了生产者和消费者之间相互依赖的代码。生产者和消费者以不同的或者变化的速度生产和消费数据,生产者-消费者模式将这些活动解耦,因而简化了工作负荷的管理。

生产者-消费者设计是围绕阻塞队列展开的,生产者把数据放入队列,并使数据可用,当消费者为适当的行为做准备时会从队列中获取数据。生产者不需要知道消费者的省份或者数量,甚至根本没有消费者—它们只负责把数据放入队列。类似地,消费者也不需要知道生产者是谁,以及是谁给它们安排的工作。BlockingQueue可以使用任意数量的生产者和消费者,从而简化了生产者-消费者设计的实现。最常见的生产者-消费者设计是将线程池与工作队列相结合。

阻塞队列简化了消费者的编码,因为take会保持阻塞直到可用数据出现。如果生产者不能足够快地产生工作,让消费者忙碌起来,那么消费者只能一直等待,直到有工作可做。同时,put方法的阻塞特性也大大地简化了生产者的编码;如果使用一个有界队列,那么当队列充满的时候,生产者就会阻塞,暂不能生成更多的工作,从而给消费者时间来赶进进度。

有界队列是强大的资源管理工具,用来建立可靠的应用程序:它们遏制那些可以产生过多工作量、具有威胁的活动,从而让你的程序在面对超负荷工作时更加健壮。

虽然生产者-消费者模式可以把生产者和消费者的代码相互解耦合,但是它们的行为还是间接地通过共享队列耦合在一起了

类库中包含一些BlockingQueue的实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,与 LinkedList和ArrayList相似,但是却拥有比同步List更好的并发性能。PriorityBlockingQueue是一个按优先级顺序排序的队列,当你不希望按照FIFO的属性处理元素时,这个PriorityBolckingQueue是非常有用的。正如其他排序的容器一样,PriorityBlockingQueue可以比较元素本身的自然顺序(如果它们实现了Comparable),也可以使用一个 Comparator进行排序。

最后一个BlockingQueue的实现是SynchronousQueue,它根本上不是一个真正的队列,因为它不会为队列元素维护任何存储空间。不过,它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。因为SynchronousQueue没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则put和take会一直阻止。SynchronousQueue这类队列只有在消费者充足的时候比较合适,它们总能为下一个任务作好准备。

非阻塞算法

基于锁的算法会带来一些活跃度失败的风险。如果线程在持有锁的时候因为阻塞I/O,页面错误,或其他原因发生延迟,很可能所有的线程都不能前进了。

一个线程的失败或挂起不应该影响其他线程的失败或挂起,这样的算法成为非阻塞(nonblocking)算法;如果算法的每一个步骤中都有一些线程能够继续执行,那么这样的算法称为锁自由(lock-free)算法。在线程间使用CAS进行协调,这样的算法如果能构建正确的话,它既是非阻塞的,又是锁自由的。非竞争的CAS总是能够成功,如果多个线程以一个CAS竞争,总会有一个胜出并前进。非阻塞算法堆死锁和优先级倒置有“免疫性”(但它们可能会出现饥饿和活锁,因为它们允许重进入)。

非阻塞算法通过使用低层次的并发原语,比如比较交换,取代了锁。原子变量类向用户提供了这些底层级原语,也能够当做“更佳的 volatile 变量”使用,同时提供了整数类和对象引用的原子化更新操作。

阻塞队列的使用

ArrayBlockingQueue<E> 实现了 BlockingQueue<E>,另外还有 LinkedBlockingQueue<E>PriorityBlockingQueue<E>ArrayBlockingQueue<E> 主要是基于数组实现的, LinkedBlockingQueue<E> 是基于链表实现的,它们都是先进先出;而 PriorityBlockingQueue<E> 是基于优先级队列的。这篇博文主要用 ArrayBlockingQueue<E> 举例,另外两个使用方式和 ArrayBlockingQueue<E> 差不多,具体的可以参考官方文档。

使用 ArrayBlockingQueue<E> 需要先指明缓存区的大小,指明之后就无法修改,试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。下面使用 ArrayBlockingQueue<E> 来实现类似于前面那篇博客中提到的一个功能:

public class BlockingQueueTest {

    public static void main(String[] args) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3); //缓冲区允许放3个数据

        for(int i = 0; i < 2; i ++) {
            new Thread() { //开启两个线程不停的往缓冲区存数据

                @Override
                public void run() {
                    while(true) {
                        try {
                            Thread.sleep((long) (Math.random()*1000));
                            System.out.println(Thread.currentThread().getName() + "准备放数据"
                                    + (queue.size() == 3?"..队列已满,正在等待":"..."));
                            queue.put(1);
                            System.out.println(Thread.currentThread().getName() + "存入数据," 
                                    + "队列目前有" + queue.size() + "个数据");
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }

            }.start();
        }

        new Thread() { //开启一个线程不停的从缓冲区取数据

            @Override
            public void run() {
                while(true) {
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + "准备取数据"
                                + (queue.size() == 0?"..队列已空,正在等待":"..."));
                        queue.take();
                        System.out.println(Thread.currentThread().getName() + "取出数据," 
                                + "队列目前有" + queue.size() + "个数据");
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }

}

程序的逻辑很简单,不难理解,下面看一下运行的结果:

Thread-0准备放数据…
Thread-0存入数据,队列目前有1个数据
Thread-1准备放数据…
Thread-1存入数据,队列目前有2个数据
Thread-2准备取数据…
Thread-2取出数据,队列目前有1个数据
Thread-0准备放数据…
Thread-0存入数据,队列目前有2个数据
Thread-1准备放数据…
Thread-1存入数据,队列目前有3个数据
Thread-2准备取数据…
Thread-2取出数据,队列目前有2个数据
Thread-0准备放数据…
Thread-0存入数据,队列目前有3个数据
Thread-1准备放数据..队列已满,正在等待
Thread-0准备放数据..队列已满,正在等待
Thread-2准备取数据…
Thread-2取出数据,队列目前有2个数据
Thread-1存入数据,队列目前有3个数据

所以使用阻塞队列就非常方便了,不用我们人为的去做判断了。

线程间通信可以使用 synchronized 和 wait、notify 组合,可以使用 Condition,其实我们也可以使用阻塞队列来实现线程间的通信,下面举个示例:

//阻塞队列实现线程间通信
public class BlockingQueueCommunication {

    public static void main(String[] args) {
        Business bussiness = new Business();

        new Thread(new Runnable() {// 开启一个子线程

                    @Override
                    public void run() {
                        for (int i = 1; i <= 3; i++) {

                            bussiness.sub();
                        }

                    }
                }).start();

        // main方法主线程
        for (int i = 1; i <= 3; i++) {

            bussiness.main();
        }
    }
}

class Business {

    private int i = 0;

    BlockingQueue queue1 = new ArrayBlockingQueue(1);
    BlockingQueue queue2 = new ArrayBlockingQueue(1);

    {
        try {
            queue2.put(1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void sub() {
        try {
            queue1.put(1); //如果主线程没执行完,则子线程缓冲区一直有数,子线程在这里被阻塞
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("子线程执行前i=" + i++);
        System.out.println("子线程执行后i=" + i);

        try {
            queue2.take(); //取出主线程中缓冲区的数,让主线程执行
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void main() {
        try {
            queue2.put(1); //如果子线程没执行完,则主线程缓冲区一直有数,主线程在这里被阻塞
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("主线程执行前i=" + i++);
        System.out.println("主线程执行后i=" + i);

        try {
            queue1.take(); //取出子线程中缓冲区的数,让子线程执行
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

代码看起来有点长,但是逻辑很简单,就是主线程和子线程你一下我一下,轮流执行,执行的任务就是将公共数据i自增1,使用阻塞队列实现的,而且线程安全,因为一个线程执行的时候,另一个线程是被阻塞的。

设计思想是这样的,定义两个阻塞队列,分别只能放1个数,分别对应两个线程。那么我在静态代码块中先将主线程的队列塞满,然后我让两个线程在执行任务之前,先往各自的队列中塞一个数,那么肯定主线程肯定被阻塞,因为我之前已经在静态代码块中给主线程的队列塞过一个数了。然后子线程在执行完任务后,把主线程队列中的数取出,那么主线程就开始执行了,子线程此时被阻塞(因为刚刚它自己执行任务前塞了个数),主线程执行完拿出子线程队列中的数,这时候子线程又开始执行了。所以利用了阻塞队列会阻塞一个线程的办法来实现两个线程之间交替执行。

相关阅读:http://blog.csdn.net/column/details/bingfa.html

本文内容来自:

  1. https://blog.csdn.net/u012881904/article/details/51491736
  2. https://blog.csdn.net/eson_15/article/details/51586127