Java多线程并发生产者消费者设计模式实例解析

一、两个线程一个生产者一个消费者

需求情景

两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个。

涉及问题

  • 同步问题:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用标记或加机制。
  • wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
  • wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
  • notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

代码实现(共三个类和一个main方法的测试类)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    if (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notify();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    if (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notify();
  }
}

Producer.java

package com.demo.ProducerConsumer;

/**
 * 生产者
 * @author lixiaoxi
 *
 */
public class Producer implements Runnable{

  private Resource resource;

  public Producer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.create();
    }

  }
}

Consumer.java

package com.demo.ProducerConsumer;

/**
 * 消费者
 * @author lixiaoxi
 *
 */
public class Consumer implements Runnable{

  private Resource resource;

  public Consumer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.destroy();
    }

  }
}

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Consumer(resource)).start();//消费者线程

  }
}

打印结果:

以上打印结果可以看出没有任何问题。

二、多个线程,多个生产者和多个消费者的问题

需求情景

四个线程,两个个负责生产,两个个负责消费,生产者生产一个,消费者消费一个。

涉及问题

notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的所有线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

再次测试代码

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Consumer(resource)).start();//消费者线程
    new Thread(new Consumer(resource)).start();//消费者线程

  }
}

运行结果:

通过以上打印结果发现问题

147生产了一次,消费了两次。169生产了,而没有消费。

原因分析

当两个线程同时操作生产者生产或者消费者消费时,如果有生产者或消费者的两个线程都wait()时,再次notify(),由于其中一个线程已经改变了标记而另外一个线程再次往下直接执行的时候没有判断标记而导致的。if判断标记,只有一次,会导致不该运行的线程运行了。出现了数据错误的情况。

解决方案

while判断标记,解决了线程获取执行权后,是否要运行!也就是每次wait()后再notify()时先再次判断标记。

代码改进(Resource中的 if -> while)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notify();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notify();
  }
}

运行结果:

再次发现问题

打印到某个值比如生产完187,程序运行卡死了,好像锁死了一样。

原因分析

notify:只能唤醒一个线程,如果本方唤醒了本方,没有意义。而且while判断标记+notify会导致”死锁”。

解决方案

notifyAll解决了本方线程一定会唤醒对方线程的问题。

最后代码改进(Resource中的 notify() -> notifyAll())

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notifyAll();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notifyAll();
  }
}

运行结果:

以上就大功告成了,没有任何问题。

再来梳理一下整个流程。按照示例,生产者消费者交替运行,每次生产后都有对应的消费者,测试类创建实例,如果是生产者先运行,进入run()方法,进入create()方法,flag默认为false,number+1,生产者生产一个产品,flag置为true,同时调用notifyAll()方法,唤醒所有正在等待的线程,接下来如果还是生产者运行呢?这是flag为true,进入while循环,执行wait()方法,接下来如果是消费者运行的话,调用destroy()方法,这时flag为true,消费者购买了一次产品,随即将flag置为false,并唤醒所有正在等待的线程。这就是一次完整的多生产者对应多消费者的问题。

三、使用Lock和Condition来解决生产者消费者问题

上面的代码有一个问题,就是我们为了避免所有的线程都处于等待的状态,使用了notifyAll方法来唤醒所有的线程,即notifyAll唤醒的是自己方和对方线程。如果我需要只是唤醒对方的线程,比如:生产者只能唤醒消费者的线程,消费者只能唤醒生产者的线程。

在jdk1.5当中为我们提供了多线程的升级解决方案:

1. 将同步synchronized替换成了Lock操作。

2. 将Object中的wait,notify,notifyAll方法替换成了Condition对象。

3. 可以只唤醒对方的线程。

完整代码:

Resource1.java

package com.demo.ProducerConsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource1 {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  private Lock lock = new ReentrantLock();
  //使用lock建立生产者的condition对象
  private Condition condition_pro = lock.newCondition();
  //使用lock建立消费者的condition对象
  private Condition condition_con = lock.newCondition(); 

  /**
   * 生产资源
   */
  public void create() throws InterruptedException {

    try{
      lock.lock();
      //先判断标记是否已经生产了,如果已经生产,等待消费
      while(flag){
        //生产者等待
        condition_pro.await();
      }
      //生产一个
      number++;
      System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
      //将资源标记为已经生产
      flag = true;
      //生产者生产完毕后,唤醒消费者的线程(注意这里不是signalAll)
      condition_con.signal();
    }finally{
      lock.unlock();
    }
  }

  /**
   * 消费资源
   */
  public void destroy() throws InterruptedException{

    try{
      lock.lock();
      //先判断标记是否已经消费了,如果已经消费,等待生产
      while(!flag){
        //消费者等待
        condition_con.await();
      }

      System.out.println(Thread.currentThread().getName() + "消费者****" + number);
      //将资源标记为已经消费
      flag = false;
      //消费者消费完毕后,唤醒生产者的线程
      condition_pro.signal();
    }finally{
      lock.unlock();
    }
  }
}

Producer1.java

package com.demo.ProducerConsumer;

/**
 * 生产者
 * @author lixiaoxi
 *
 */
public class Producer1 implements Runnable{

  private Resource1 resource;

  public Producer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.create();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

}

Consumer1.java

package com.demo.ProducerConsumer;

/**
 * 消费者
 * @author lixiaoxi
 *
 */
public class Consumer1 implements Runnable{

  private Resource1 resource;

  public Consumer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.destroy();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

}

ProducerConsumerTest1.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest1 {

  public static void main(String args[]) {
    Resource1 resource = new Resource1();
    new Thread(new Producer1(resource)).start();//生产者线程
    new Thread(new Producer1(resource)).start();//生产者线程
    new Thread(new Consumer1(resource)).start();//消费者线程
    new Thread(new Consumer1(resource)).start();//消费者线程

  }
}

运行结果:

四、总结

1、如果生产者、消费者都是1个,那么flag标记可以用if判断。这里有多个,必须用while判断。

2、在while判断的同时,notify函数可能唤醒本类线程(如一个消费者唤醒另一个消费者),这会导致所有消费者忙等待,程序无法继续往下执行。使用notifyAll函数代替notify可以解决这个问题,notifyAll可以保证非本类线程被唤醒(消费者线程能唤醒生产者线程,反之也可以),解决了忙等待问题。

小心假死

生产者/消费者模型最终达到的目的是平衡生产者和消费者的处理能力,达到这个目的的过程中,并不要求只有一个生产者和一个消费者。可以多个生产者对应多个消费者,可以一个生产者对应一个消费者,可以多个生产者对应一个消费者。

假死就发生在上面三种场景下。假死指的是全部线程都进入了WAITING状态,那么程序就不再执行任何业务功能了,整个项目呈现停滞状态。

比方说有生产者A和生产者B,缓冲区由于空了,消费者处于WAITING。生产者B处于WAITING,生产者A被消费者通知生产,生产者A生产出来的产品本应该通知消费者,结果通知了生产者B,生产者B被唤醒,发现缓冲区满了,于是继续WAITING。至此,两个生产者线程处于WAITING,消费者处于WAITING,系统假死。

上面的分析可以看出,假死出现的原因是因为notify的是同类,所以非单生产者/单消费者的场景,可以采取两种方法解决这个问题:

(1)synchronized用notifyAll()唤醒所有线程、ReentrantLock用signalAll()唤醒所有线程。

(2)用ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用相应的Condition的signal()方法就可以了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

时间:2020-03-24

Java多线程 线程状态原理详解

这篇文章主要介绍了Java多线程 线程状态原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 java.lang.Thread.State枚举定义了6种线程状态. NEW: 尚未启动(start)的线程的线程状态 RUNNABLE: 运行状态,但线程可能正在JVM中执行,也可能在等待CPU调度 BLOCKED: 线程阻塞,等待监视器锁以进入同步代码块/方法 WAITING: 等待状态.使用以下不带超时的方式时会进入:Object.wait.

Java模拟多线程实现抢票代码实例

这篇文章主要介绍了Java模拟多线程实现抢票,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 实现100张票抢购的demo 这里需要一个变量,来保存100张 局部变量: 定义在方法内,方法运行存在,方法运行结束销毁,无法保存一个持久化数据!!! 成员变量: 保存在类对象内,创建对象之后存在,对象不销毁成员变量也不会被内存收回.因为 在每一个类对象中,都存在一个对应的成员变量,这些成员变量不是同一个数据.不是 共享资源,不合适!!! 静态成员变量:

Java多线程状态及方法实例解析

这篇文章主要介绍了Java多线程状态及方法实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 首先介绍线程的五种状态: 新生态:New Thread() 就绪态:准备抢CPU时间片 运行态:抢到了CPU时间片 阻塞态:放弃已经抢到的CPU时间片,且暂时不参与争抢 死亡态:Run运行完了之后 接下来介绍三种方法:线程的阻塞,线程的优先级设置,线程的礼让 public class MutliThreadDemo4 { public static

Java多线程模拟电影售票过程

用多线程模拟电影售票过程(Java实训) 实训目的: 多线程的实现.线程同步 实训要求: 总票数和售票窗口数由键盘输入,用每个线程处理一个窗口的售票. Test.java package program5; import java.util.Scanner; public class Test { public static void main(String[] args) { // TODO Auto-generated method stub Scanner sc=new Scanner(S

Java多线程生产者消费者模式实现过程解析

单生产者与单消费者 示例: public class ProduceConsume { public static void main(String[] args) { String lock = new String(""); Produce produce = new Produce(lock); Consume consume = new Consume(lock); new Thread(() -> { while (true) { produce.setValue();

java多线程关键字final和static详解

这篇文章主要介绍了java多线程关键字final和static详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 final关键字 1.final关键字在单线程中的特点: 1)final修饰的静态成员:必须在进行显示初始化或静态代码块赋值,并且仅能赋值一次. 2)final修饰的类成员变量,可以在三个地方进行赋值:显示初始化.构造代码块和构造方法,并且仅能赋值一次. 3)final修饰的局部变量,必须在使用之前进行显示初始化(并不一定要在定义是

基于Java实现多线程下载并允许断点续传

完整代码:https://github.com/iyuanyb/Downloader 多线程下载及断点续传的实现是使用 HTTP/1.1 引入的 Range 请求参数,可以访问Web资源的指定区间的内容.虽然实现了多线程及断点续传,但还有很多不完善的地方. 包含四个类: Downloader: 主类,负责分配任务给各个子线程,及检测进度DownloadFile: 表示要下载的哪个文件,为了能写输入到文件的指定位置,使用 RandomAccessFile 类操作文件,多个线程写同一个文件需要保证线

Java多线程文件分片下载实现的示例代码

多线程下载介绍 多线程下载技术是很常见的一种下载方案,这种方式充分利用了多线程的优势,在同一时间段内通过多个线程发起下载请求,将需要下载的数据分割成多个部分,每一个线程只负责下载其中一个部分,然后将下载后的数据组装成完整的数据文件,这样便大大加快了下载效率.常见的下载器,迅雷,QQ旋风等都采用了这种技术. 分片下载 所谓分片下载就是要利用多线程的优势,将要下载的文件一块一块的分配到各个线程中去下载,这样就极大的提高了下载速度. 技术难点 并不能说是什么难点,只能说没接触过不知道罢了. 1.如何请

java压缩文件和下载图片示例

本文实例为大家分享了java压缩文件和下载图片示例,供大家参考,具体内容如下 主页面index.xml <%@ page language="java" import="java.util.*" pageEncoding="utf-8"%> <html> <head> <title>项目的主页</title> </head> <body> <h2>主页

webuploader在springMVC+jquery+Java开发环境下的大文件分片上传的实例代码

注意: 1,webuploader上传组件会和jQuery自带的上传组件冲突,所以不要使用<form>标签中添加上传文件的属性; enctype="multipart/form-data" 2.并且屏蔽ApplicationContext-mvc.xml里面的拦截配置! <!– 上传拦截,如最大上传值及最小上传值 –> <!–新增加的webuploader上传组件,必须要屏蔽这里的拦截机制 <bean id="multipartRes

Java FTP上传下载删除功能实例代码

在没给大家上完整代码之前先给大家说下注意点: FTP上传下载,容易出现乱码,记得转换 package com.yinhai.team.action; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; impo

Java利用Redis实现消息队列的示例代码

本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

java多线程编程技术详解和实例代码

java多线程编程技术详解和实例代码 1.   Java和他的API都可以使用并发. 可以指定程序包含不同的执行线程,每个线程都具有自己的方法调用堆栈和程序计数器,使得线程在与其他线程并发地执行能够共享程序范围内的资源,比如共享内存,这种能力被称为多线程编程(multithreading),在核心的C和C++语言中并不具备这种能力,尽管他们影响了JAVA的设计. 2.   线程的生命周期 新线程的生命周期从"新生"状态开始.程序启动线程前,线程一直是"新生"状态:

Java多线程并发开发之DelayQueue使用示例

在学习Java 多线程并发开发过程中,了解到DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对象的延迟到期时间最长.注意:不能将null元素放置到这种队列中. Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象.此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序. 在网上看到了一些

Java实现文件的加密解密功能示例

本文实例讲述了Java实现文件的加密解密功能分享给大家供大家参考,具体如下: package com.copy.encrypt; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; impor

利用java生成二维码工具类示例代码

Java多线程并发生产者消费者设计模式实例解析

二维码介绍 二维条形码最早发明于日本,它是用某种特定的几何图形按一定规律在平面(二维方向上)分布的黑白相间的图形记录数据符号信息的,在代码编制上巧妙地利用构成计算机内部逻辑基础的"0"."1"比特流的概念,使用若干个与二进制相对应的几何形体来表示文字数值信息,通过图象输入设备或光电扫描设备自动识读以实现信息自动处理. 如下为java生成二维码工具类,可以选择生成文件,或者直接在页面生成,话不多说了,来一起看看详细的示例代码吧. 示例代码 import java.aw

servlet+jquery实现文件上传进度条示例代码

Java多线程并发生产者消费者设计模式实例解析

现在文件的上传,特别是大文件上传,都需要进度条,让客户知道上传进度. 本文简单记录下如何弄进度条,以及一些上传信息,比如文件的大小,上传速度,预计剩余时间等一些相关信息.代码是匆忙下简单写的,一些验证没做,或代码存在一些隐患,不严谨的地方.本文代码只供参考. 进度条的样式多种多样,有些网站弄得非常绚烂漂亮.本文UI端不太懂,只会一些简单的基本的css而已,所以进度条弄得不好看.本文侧重的给读者提供一个参考,一个实现思路而已. 注:由于jQuery版本用的是2.1.1,所以如果跑本例子源码,请用I

Java 中的注解详解及示例代码

在Java中,注解(Annotation)引入始于Java5,用来描述Java代码的元信息,通常情况下注解不会直接影响代码的执行,尽管有些注解可以用来做到影响代码执行. 注解可以做什么 Java中的注解通常扮演以下角色 编译器指令 构建时指令 运行时指令 其中 Java内置了三种编译器指令,本文后面部分会重点介绍 Java注解可以应用在构建时,即当你构建你的项目时.构建过程包括生成源码,编译源码,生成xml文件,打包编译的源码和文件到JAR包等.软件的构建通常使用诸如Apache Ant和Mav

原文 

https://www.zhangshengrong.com/p/x7XRMDExNz/

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Java多线程并发生产者消费者设计模式实例解析

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址