Java多线程并发生产者消费者设计模式实例解析

 更新时间:2020年03月24日 11:30:23   作者:平凡希  
这篇文章主要介绍了Java多线程并发生产者消费者设计模式实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

一、两个线程一个生产者一个消费者

需求情景

两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个。

涉及问题

  • 同步问题:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用标记或加锁机制。
  • wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
  • wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
  • notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

代码实现(共三个类和一个main方法的测试类)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    if (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notify();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    if (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notify();
  }
}

Producer.java

package com.demo.ProducerConsumer;

/**
 * 生产者
 * @author lixiaoxi
 *
 */
public class Producer implements Runnable{

  private Resource resource;

  public Producer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.create();
    }

  }
}

Consumer.java

package com.demo.ProducerConsumer;

/**
 * 消费者
 * @author lixiaoxi
 *
 */
public class Consumer implements Runnable{

  private Resource resource;

  public Consumer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.destroy();
    }

  }
}

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Consumer(resource)).start();//消费者线程

  }
}

打印结果:

以上打印结果可以看出没有任何问题。

二、多个线程,多个生产者和多个消费者的问题

需求情景

四个线程,两个个负责生产,两个个负责消费,生产者生产一个,消费者消费一个。

涉及问题

notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的所有线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

再次测试代码

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Consumer(resource)).start();//消费者线程
    new Thread(new Consumer(resource)).start();//消费者线程

  }
}

运行结果:

通过以上打印结果发现问题

147生产了一次,消费了两次。169生产了,而没有消费。

原因分析

当两个线程同时操作生产者生产或者消费者消费时,如果有生产者或消费者的两个线程都wait()时,再次notify(),由于其中一个线程已经改变了标记而另外一个线程再次往下直接执行的时候没有判断标记而导致的。if判断标记,只有一次,会导致不该运行的线程运行了。出现了数据错误的情况。

解决方案

while判断标记,解决了线程获取执行权后,是否要运行!也就是每次wait()后再notify()时先再次判断标记。

代码改进(Resource中的 if -> while)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notify();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notify();
  }
}

运行结果:

再次发现问题

打印到某个值比如生产完187,程序运行卡死了,好像锁死了一样。

原因分析

notify:只能唤醒一个线程,如果本方唤醒了本方,没有意义。而且while判断标记+notify会导致”死锁”。

解决方案

notifyAll解决了本方线程一定会唤醒对方线程的问题。

最后代码改进(Resource中的 notify() -> notifyAll())

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notifyAll();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notifyAll();
  }
}

运行结果:

以上就大功告成了,没有任何问题。

再来梳理一下整个流程。按照示例,生产者消费者交替运行,每次生产后都有对应的消费者,测试类创建实例,如果是生产者先运行,进入run()方法,进入create()方法,flag默认为false,number+1,生产者生产一个产品,flag置为true,同时调用notifyAll()方法,唤醒所有正在等待的线程,接下来如果还是生产者运行呢?这是flag为true,进入while循环,执行wait()方法,接下来如果是消费者运行的话,调用destroy()方法,这时flag为true,消费者购买了一次产品,随即将flag置为false,并唤醒所有正在等待的线程。这就是一次完整的多生产者对应多消费者的问题。

三、使用Lock和Condition来解决生产者消费者问题

上面的代码有一个问题,就是我们为了避免所有的线程都处于等待的状态,使用了notifyAll方法来唤醒所有的线程,即notifyAll唤醒的是自己方和对方线程。如果我需要只是唤醒对方的线程,比如:生产者只能唤醒消费者的线程,消费者只能唤醒生产者的线程。

在jdk1.5当中为我们提供了多线程的升级解决方案:

1. 将同步synchronized替换成了Lock操作。

2. 将Object中的wait,notify,notifyAll方法替换成了Condition对象。

3. 可以只唤醒对方的线程。

完整代码:

Resource1.java

package com.demo.ProducerConsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource1 {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;
  
  private Lock lock = new ReentrantLock();
  //使用lock建立生产者的condition对象
  private Condition condition_pro = lock.newCondition(); 
  //使用lock建立消费者的condition对象
  private Condition condition_con = lock.newCondition(); 


  /**
   * 生产资源
   */
  public void create() throws InterruptedException {
    
    try{
      lock.lock();
      //先判断标记是否已经生产了,如果已经生产,等待消费
      while(flag){
        //生产者等待
        condition_pro.await();
      }
      //生产一个
      number++;
      System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
      //将资源标记为已经生产
      flag = true;
      //生产者生产完毕后,唤醒消费者的线程(注意这里不是signalAll)
      condition_con.signal();
    }finally{
      lock.unlock();
    }
  }

  /**
   * 消费资源
   */
  public void destroy() throws InterruptedException{

    try{
      lock.lock();
      //先判断标记是否已经消费了,如果已经消费,等待生产
      while(!flag){
        //消费者等待
        condition_con.await();
      }
      
      System.out.println(Thread.currentThread().getName() + "消费者****" + number);
      //将资源标记为已经消费
      flag = false;
      //消费者消费完毕后,唤醒生产者的线程
      condition_pro.signal();
    }finally{
      lock.unlock();
    }
  }
}

Producer1.java

package com.demo.ProducerConsumer;

/**
 * 生产者
 * @author lixiaoxi
 *
 */
public class Producer1 implements Runnable{

  private Resource1 resource;

  public Producer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.create();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  
}

Consumer1.java

package com.demo.ProducerConsumer;

/**
 * 消费者
 * @author lixiaoxi
 *
 */
public class Consumer1 implements Runnable{

  private Resource1 resource;

  public Consumer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.destroy();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  
}

ProducerConsumerTest1.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest1 {

  public static void main(String args[]) {
    Resource1 resource = new Resource1();
    new Thread(new Producer1(resource)).start();//生产者线程
    new Thread(new Producer1(resource)).start();//生产者线程
    new Thread(new Consumer1(resource)).start();//消费者线程
    new Thread(new Consumer1(resource)).start();//消费者线程

  }
}

运行结果:

四、总结

1、如果生产者、消费者都是1个,那么flag标记可以用if判断。这里有多个,必须用while判断。

2、在while判断的同时,notify函数可能唤醒本类线程(如一个消费者唤醒另一个消费者),这会导致所有消费者忙等待,程序无法继续往下执行。使用notifyAll函数代替notify可以解决这个问题,notifyAll可以保证非本类线程被唤醒(消费者线程能唤醒生产者线程,反之也可以),解决了忙等待问题。

小心假死

生产者/消费者模型最终达到的目的是平衡生产者和消费者的处理能力,达到这个目的的过程中,并不要求只有一个生产者和一个消费者。可以多个生产者对应多个消费者,可以一个生产者对应一个消费者,可以多个生产者对应一个消费者。

假死就发生在上面三种场景下。假死指的是全部线程都进入了WAITING状态,那么程序就不再执行任何业务功能了,整个项目呈现停滞状态。

比方说有生产者A和生产者B,缓冲区由于空了,消费者处于WAITING。生产者B处于WAITING,生产者A被消费者通知生产,生产者A生产出来的产品本应该通知消费者,结果通知了生产者B,生产者B被唤醒,发现缓冲区满了,于是继续WAITING。至此,两个生产者线程处于WAITING,消费者处于WAITING,系统假死。

上面的分析可以看出,假死出现的原因是因为notify的是同类,所以非单生产者/单消费者的场景,可以采取两种方法解决这个问题:

(1)synchronized用notifyAll()唤醒所有线程、ReentrantLock用signalAll()唤醒所有线程。

(2)用ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用相应的Condition的signal()方法就可以了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • Java中使用MongoDB数据库实例Demo

    Java中使用MongoDB数据库实例Demo

    MongoDB是由C++语言编写的,基于分布式文件存储的数据库,是一个介于关系数据库和非关系数据库之间的产品,是最接近于关系型数据库的NoSQL数据库,下面这篇文章主要给大家介绍了关于Java中使用MongoDB数据库的相关资料,需要的朋友可以参考下
    2023-12-12
  • mybatis-plus用insertBatchSomeColumn方法批量新增指定字段

    mybatis-plus用insertBatchSomeColumn方法批量新增指定字段

    mybatisPlus底层的新增方法是一条一条的新增的,下面这篇文章主要给大家介绍了关于mybatis-plus用insertBatchSomeColumn方法批量新增指定字段的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-05-05
  • Java中的StringBuilder()常见方法详解

    Java中的StringBuilder()常见方法详解

    StringBuilder是一个可变的字符序列,此类提供一个与 StringBuffer 兼容的 API,但不保证同步,这篇文章主要介绍了StringBuilder()常见方法,需要的朋友可以参考下
    2023-09-09
  • JDK14性能管理工具之Jconsole的使用详解

    JDK14性能管理工具之Jconsole的使用详解

    JConsole是JDK自带的管理工具,在JAVA_HOME/bin下面,直接命令JConsole即可开启JConsole。接下来通过本文给大家分享JDK14性能管理工具之Jconsole的使用,感兴趣的朋友一起看看吧
    2020-05-05
  • idea如何快速查找一个类或类中方法名和变量

    idea如何快速查找一个类或类中方法名和变量

    这篇文章主要介绍了idea如何快速查找一个类或类中方法名和变量问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-11-11
  • Spring中的Aware接口及应用场景详解

    Spring中的Aware接口及应用场景详解

    这篇文章主要介绍了Spring中的Aware接口及应用场景,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • java解析php函数json_encode unicode 编码问题

    java解析php函数json_encode unicode 编码问题

    这篇文章主要介绍了java解析php函数json_encode unicode 编码问题,需要的朋友可以参考下
    2016-04-04
  • Java Hibernate中的持久化类和实体类关系

    Java Hibernate中的持久化类和实体类关系

    Hibernate是一种Java对象关系映射框架,通过持久化类将Java对象映射到数据库表中。持久化类需要实现无参构造器、具有标识属性和使用注解或XML进行映射。Hibernate通过Session来管理对象的状态,包括临时状态、持久化状态和游离状态
    2023-04-04
  • Java实现文件上传的两种方法(uploadify和Spring)

    Java实现文件上传的两种方法(uploadify和Spring)

    这篇文章主要为大家详细介绍了Java实现文件上传的两种方法,uploadify和Spring实现文件上传,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-11-11
  • Java实现的二叉树常用操作【前序建树,前中后递归非递归遍历及层序遍历】

    Java实现的二叉树常用操作【前序建树,前中后递归非递归遍历及层序遍历】

    这篇文章主要介绍了Java实现的二叉树常用操作,包括二叉树的前序建树,前中后递归非递归遍历及层序遍历等相关操作技巧,需要的朋友可以参考下
    2018-01-01

最新评论