浅谈Java中的Queue家族

 更新时间:2021年06月01日 14:12:35   作者:flydean  
Java中Collection集合有三大家族List,Set和Queue。当然Map也算是一种集合类,但Map并不继承Collection接口。List,Set在我们的工作中会经常使用,通常用来存储结果数据,而Queue由于它的特殊性,通常用在生产者消费者模式中。今天这篇文章将带大家进入Queue家族。

Queue接口

先看下Queue的继承关系和其中定义的方法:

Queue继承自Collection,Collection继承自Iterable。

Queue有三类主要的方法,我们用个表格来看一下他们的区别:

方法类型 方法名称 方法名称 区别
Insert add offer 两个方法都表示向Queue中添加某个元素,不同之处在于添加失败的情况,add只会返回true,如果添加失败,会抛出异常。offer在添加失败的时候会返回false。所以对那些有固定长度的Queue,优先使用offer方法。
Remove remove poll 如果Queue是空的情况下,remove会抛出异常,而poll会返回null。
Examine element peek 获取Queue头部的元素,但不从Queue中删除。两者的区别还是在于Queue为空的情况下,element会抛出异常,而peek返回null。

注意,因为对poll和peek来说null是有特殊含义的,所以一般来说Queue中禁止插入null,但是在实现中还是有一些类允许插入null比如LinkedList。

尽管如此,我们在使用中还是要避免插入null元素。

Queue的分类

一般来说Queue可以分为BlockingQueue,Deque和TransferQueue三种。

BlockingQueue

BlockingQueue是Queue的一种实现,它提供了两种额外的功能:

当当前Queue是空的时候,从BlockingQueue中获取元素的操作会被阻塞。当当前Queue达到最大容量的时候,插入BlockingQueue的操作会被阻塞。

BlockingQueue的操作可以分为下面四类:

操作类型Throws exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time, unit)Removeremove()poll()take()poll(time, unit)Examineelement()peek()not applicablenot applicable

第一类是会抛出异常的操作,当遇到插入失败,队列为空的时候抛出异常。

第二类是不会抛出异常的操作。

第三类是会Block的操作。当Queue为空或者达到最大容量的时候。

第四类是time out的操作,在给定的时间里会Block,超时会直接返回。

BlockingQueue是线程安全的Queue,可以在生产者消费者模式的多线程中使用,如下所示:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

最后,在一个线程中向BlockQueue中插入元素之前的操作happens-before另外一个线程中从BlockQueue中删除或者获取的操作。

Deque

Deque是Queue的子类,它代表double ended queue,也就是说可以从Queue的头部或者尾部插入和删除元素。

同样的,我们也可以将Deque的方法用下面的表格来表示,Deque的方法可以分为对头部的操作和对尾部的操作:

方法类型 Throws exception Special value Throws exception Special value
Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e)
Remove removeFirst() pollFirst() removeLast() pollLast()
Examine getFirst() peekFirst() getLast() peekLast()

和Queue的方法描述基本一致,这里就不多讲了。

当Deque以 FIFO (First-In-First-Out)的方法处理元素的时候,Deque就相当于一个Queue。

当Deque以LIFO (Last-In-First-Out)的方式处理元素的时候,Deque就相当于一个Stack。

TransferQueue

TransferQueue继承自BlockingQueue,为什么叫Transfer呢?因为TransferQueue提供了一个transfer的方法,生产者可以调用这个transfer方法,从而等待消费者调用take或者poll方法从Queue中拿取数据。

还提供了非阻塞和timeout版本的tryTransfer方法以供使用。

我们举个TransferQueue实现的生产者消费者的问题。

先定义一个生产者:

@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
    private TransferQueue<String> transferQueue;

    private String name;

    private Integer messageCount;

    public static final AtomicInteger messageProduced = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                boolean added = transferQueue.tryTransfer( "第"+i+"个", 2000, TimeUnit.MILLISECONDS);
                log.info("transfered {} 是否成功: {}","第"+i+"个",added);
                if(added){
                    messageProduced.incrementAndGet();
                }
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total transfered {}",messageProduced.get());
    }
}

在生产者的run方法中,我们调用了tryTransfer方法,等待2秒钟,如果没成功则直接返回。

再定义一个消费者:

@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {

    private TransferQueue<String> transferQueue;

    private String name;

    private int messageCount;

    public static final AtomicInteger messageConsumed = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                String element = transferQueue.take();
                log.info("take {}",element );
                messageConsumed.incrementAndGet();
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total consumed {}",messageConsumed.get());
    }

}

在run方法中,调用了transferQueue.take方法来取消息。

下面先看一下一个生产者,零个消费者的情况:

@Test
public void testOneProduceZeroConsumer() throws InterruptedException {

    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(10);
    Producer producer = new Producer(transferQueue, "ProducerOne", 5);

    exService.execute(producer);

    exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
    exService.shutdown();
}

输出结果:

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0

可以看到,因为没有消费者,所以消息并没有发送成功。

再看下一个有消费者的情况:

@Test
public void testOneProduceOneConsumer() throws InterruptedException {

    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(10);
    Producer producer = new Producer(transferQueue, "ProducerOne", 2);
    Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);

    exService.execute(producer);
    exService.execute(consumer);

    exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
    exService.shutdown();
}

输出结果:

[pool-1-thread-2] INFO com.flydean.Consumer - take 第0个

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: true

[pool-1-thread-2] INFO com.flydean.Consumer - take 第1个

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: true

[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2

[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2

可以看到Producer和Consumer是一个一个来生产和消费的。

以上就是浅谈Java中的Queue家族的详细内容,更多关于Java中的Queue家族的资料请关注脚本之家其它相关文章!

相关文章

  • spring boot 使用profile来分区配置的操作

    spring boot 使用profile来分区配置的操作

    这篇文章主要介绍了spring boot使用profile来分区配置的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Java this 关键字的使用方法详解

    Java this 关键字的使用方法详解

    这篇文章主要介绍了Java this 关键字的使用方法详解的相关资料,希望通过本文能帮助到大家,让大家彻底理解掌握这部分内容,需要的朋友可以参考下
    2017-10-10
  • springboot实现配置两个parent的方法

    springboot实现配置两个parent的方法

    这篇文章主要介绍了springboot实现配置两个parent的方法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • 讲解ssm框架整合(最通俗易懂)

    讲解ssm框架整合(最通俗易懂)

    这篇文章主要介绍了讲解ssm框架整合(最通俗易懂),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • 详解Java中使用泛型实现快速排序算法的方法

    详解Java中使用泛型实现快速排序算法的方法

    这篇文章主要介绍了Java中使用泛型实现快速排序算法的方法,快速排序的平均时间复杂度为(n\log n),文中的方法立足于基础而并没有考虑优化处理,需要的朋友可以参考下
    2016-05-05
  • Java基本语法之内部类示例详解

    Java基本语法之内部类示例详解

    本文带大家认识Java基本语法——内部类,将一个类定义放在另一类的定义的内部,这个就是内部类,内部类允许将一些逻辑相关的类组织在一起,并能够控制位于内部的类的可视性,感兴趣的可以了解一下
    2022-03-03
  • 高级数据结构及应用之使用bitmap进行字符串去重的方法实例

    高级数据结构及应用之使用bitmap进行字符串去重的方法实例

    今天小编就为大家分享一篇关于高级数据结构及应用之使用bitmap进行字符串去重的方法实例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-02-02
  • Java线程并发工具类CountDownLatch原理及用法

    Java线程并发工具类CountDownLatch原理及用法

    这篇文章主要介绍了Java线程并发工具类CountDownLatch原理及用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • Java利用递归实现树形结构的工具类

    Java利用递归实现树形结构的工具类

    有时候,我们的数据是带有层级的,比如常见的省市区三级联动,就是一层套着一层。而我们在数据库存放数据的时候,往往是列表形式的,这个时候可能就需要递归处理为树形结构了。本文就为大家介绍了Java利用递归实现树形结构的工具类,希望对大家有所帮助
    2023-03-03
  • java实现猜拳游戏试题

    java实现猜拳游戏试题

    这篇文章主要为大家详细介绍了java实现猜拳游戏试题,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-03-03

最新评论