转载

多线程学习笔记五-------------多生产者多消费者问题

线程通信--多生产者多消费者问题多生产者,多消费者会导致线程死锁的情况。

public class RoastDuck {  public static void main(String[] args) {   Duck d = new Duck();   ProductProcess pp = new ProductProcess(d);   ProductProcess pp0 = new ProductProcess(d);   ConsumeProcess cp = new ConsumeProcess(d);   ConsumeProcess cp0 = new ConsumeProcess(d);   Thread t0 = new Thread(pp0);   Thread t1 = new Thread(pp);   Thread t2 = new Thread(cp);   Thread t3 = new Thread(cp0);   t0.start();   t1.start();   t2.start();   t3.start();  } } class ProductProcess implements Runnable {  Duck d ;  public ProductProcess(Duck d) {   this.d = d;  }  @Override  public void run() {   while(true){    synchronized(d){     d.product("好吃的烤鸭");    }   }  } } class ConsumeProcess implements Runnable{  Duck d ;  public ConsumeProcess(Duck d) {   this.d = d;  }  @Override  public void run() {   while(true){    synchronized(d){     d.consume();    }   }  } } class Duck{  private String name;  private int count = 1;  private boolean flag = false;  public void product(String name){   //第一种写法:   if(flag == false){     this.name = name+count;    count++;    System.out.println(Thread.currentThread().getName()+" : product  "+this.name +"  duck");    flag = true;    this.notify();   }else{    try {     this.wait();    } catch (InterruptedException e) {     e.printStackTrace();    }   }   //第二种写法:   /*if(flag == true){    try {     this.wait();//如果这样写,程序会在这里进入线程池,下次被唤醒的时候就不会进行判断,直接进入下面的操作,会出现线程安全问题。    } catch (InterruptedException e) {     e.printStackTrace();    }   }   this.name = name+count;   count++;   System.out.println(Thread.currentThread().getName()+" : product  "+this.name +"  duck");   flag = true;   this.notify();*/   //第二种方法改正:   /*while(flag){    try {     this.wait();//如果这样写,程序会在这里进入线程池,下次被唤醒的时候就不会进行判断,直接进入下面的操作,会出现线程安全问题。    } catch (InterruptedException e) {     e.printStackTrace();    }   }   this.name = name+count;   count++;   System.out.println(Thread.currentThread().getName()+" : product  "+this.name +"  duck");   flag = true;   this.notify();*/  }  public void consume(){   if(flag == true){    System.out.println(Thread.currentThread().getName()+" : consume  "+this.name +"  duck");    flag = false;    this.notify();   }else{    try {     this.wait();    } catch (InterruptedException e) {     e.printStackTrace();    }   }  } } 

产生原因:

因为每次唤醒的线程都是任意的,如果唤醒线程的时候总是唤醒本方的线程(例如都是生产者或者都是消费者),就会导致线程死锁。

解决思路:

每次唤醒都要保证有对方的线程被唤醒。

解决方法:

每次都唤醒所有的线程。使用notifyAll();方法。

public class RoastDuck {  public static void main(String[] args) {   Duck d = new Duck();   ProductProcess pp = new ProductProcess(d);   ProductProcess pp0 = new ProductProcess(d);   ConsumeProcess cp = new ConsumeProcess(d);   ConsumeProcess cp0 = new ConsumeProcess(d);   Thread t0 = new Thread(pp0);   Thread t1 = new Thread(pp);   Thread t2 = new Thread(cp);   Thread t3 = new Thread(cp0);   t0.start();   t1.start();   t2.start();   t3.start();  } } class ProductProcess implements Runnable {  Duck d ;  public ProductProcess(Duck d) {   this.d = d;  }  @Override  public void run() {   while(true){    synchronized(d){     d.product("好吃的烤鸭");    }   }  } } class ConsumeProcess implements Runnable{  Duck d ;  public ConsumeProcess(Duck d) {   this.d = d;  }  @Override  public void run() {   while(true){    synchronized(d){     d.consume();    }   }  } } class Duck{  private String name;  private int count = 1;  private boolean flag = false;  public void product(String name){   //第一种写法:   if(flag == false){     this.name = name+count;    count++;    System.out.println(Thread.currentThread().getName()+" : product  "+this.name +"  duck");    flag = true;    this.notifyAll();   }else{    try {     this.wait();    } catch (InterruptedException e) {     e.printStackTrace();    }   }   //第二种写法:   /*if(flag == true){    try {     this.wait();//如果这样写,程序会在这里进入线程池,下次被唤醒的时候就不会进行判断,直接进入下面的操作,会出现线程安全问题。    } catch (InterruptedException e) {     e.printStackTrace();    }   }   this.name = name+count;   count++;   System.out.println(Thread.currentThread().getName()+" : product  "+this.name +"  duck");   flag = true;   this.notifyAll();*/   //第二种方法改正:   /*while(flag){    try {     this.wait();//如果这样写,程序会在这里进入线程池,下次被唤醒的时候就不会进行判断,直接进入下面的操作,会出现线程安全问题。    } catch (InterruptedException e) {     e.printStackTrace();    }   }   this.name = name+count;   count++;   System.out.println(Thread.currentThread().getName()+" : product  "+this.name +"  duck");   flag = true;   this.notifyAll();*/  }  public void consume(){   if(flag == true){    System.out.println(Thread.currentThread().getName()+" : consume  "+this.name +"  duck");    flag = false;    this.notifyAll();   }else{    try {     this.wait();    } catch (InterruptedException e) {     e.printStackTrace();    }   }  } } 

notifyAll()方法解决了一定能唤醒对方线程的问题。

以前解决多生产者多消费者的情况就用while循环判断+notifyAll解决。

但是一唤醒就全醒了,本方也醒了,本方还要重新判断标记。

这些问题在JDK1.5给出了解决方案:

1.5以前,我们用的synchronized同步代码块是隐式的,操作起来不灵活。

1.5以后,新增了一个接口---->lock

在java.util.concurrent.locks包中。

public interface Lock

Lock 实现提供了比使用 synchronized 方法(同步函数)和语句(同步块)可获得的【更广泛】的锁定操作。

此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition 对象。

语法形式:

lock.lock();

code...

lock.unlock();

将同步和锁封装成了对象,并将操作锁的隐式方式定义到了该对象中,将隐式动作变成了显示动作。

public interface Condition

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。

其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

1.5之前解决线程安全问题:

class Object{  public final void wait() throws InterruptedException {   wait(0);  }  public final native void notify();  public final native void notifyAll(); } class Demo extends Object{ } class MyThread implements Thread{  Demo d = new Demo();  public void run(){   synchronized(d){    d.wait();   }  } } 

1.5之后解决线程安全问题:

class Condition{  void await() throws InterruptedException;  void signal();  void signalAll(); } Lock lock = new ReentrantLock(); Condition c1 = lock.newCondition(); Condition c2 = lock.newCondition(); lock.lock(); try{     code... }finally{     lock.unlock(); } 

1.5以前,一个锁上只能由一组监视器,这组监视器既监视生产者又监视消费者。

1.5以后,一个锁上面弄两组监视器,一组监视器监视生产者,一组监视器监视消费者。

用Lock解决线程死锁问题:

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class RoastDuckLock {  public static void main(String[] args) {   Duck2 d = new Duck2();   ProductProcess2 pp = new ProductProcess2(d);   ProductProcess2 pp0 = new ProductProcess2(d);   ConsumeProcess2 cp = new ConsumeProcess2(d);   ConsumeProcess2 cp0 = new ConsumeProcess2(d);   Thread t0 = new Thread(pp0);   Thread t1 = new Thread(pp);   Thread t2 = new Thread(cp);   Thread t3 = new Thread(cp0);   t0.start();   t1.start();   t2.start();   t3.start();  } } class ProductProcess2 implements Runnable {  Duck2 d ;  public ProductProcess2(Duck2 d) {   this.d = d;  }  @Override  public void run() {   while(true){    d.product("好吃的烤鸭");   }  } } class ConsumeProcess2 implements Runnable{  Duck2 d ;  public ConsumeProcess2(Duck2 d) {   this.d = d;  }  @Override  public void run() {   while(true){    d.consume();   }  } } class Duck2{  private String name;  private int count = 1;  private boolean flag = false;  Lock lock = new ReentrantLock();  Condition product_con = lock.newCondition();  Condition consume_con = lock.newCondition();  public void product(String name){   lock.lock();   try{    if(flag == false){      this.name = name+count;     count++;     System.out.println(Thread.currentThread().getName()+" : product  "+this.name +"  duck");     flag = true;     consume_con.signal();    }else{     try {      product_con.await();     } catch (InterruptedException e) {      e.printStackTrace();     }    }   }finally{    lock.unlock();   }  }  public void consume(){   lock.lock();   try{    if(flag == true){     System.out.println(Thread.currentThread().getName()+" : consume    "+this.name +"  duck");     flag = false;     product_con.signal();    }else{     try {      consume_con.await();     } catch (InterruptedException e) {      e.printStackTrace();     }    }   }finally{    lock.unlock();   }  } } 

Lock接口:代替了同步块和同步函数,将隐式锁操作变成了显示锁操作。同时更加灵活,可以一个锁上面加多组监视器。

lock():获取锁

unlock():释放锁,需要放在finally当中。

Condition接口:出现了代替wait() notify() notifyAll(),并将这些监视器方法进行了封装,变成了Condition监视器对象。可以与任意锁进行组合。

await();

signal();

signalAll();

JDK文档内的示例:相对来说复杂,但是很经典,真正开发用的也是这种。

class BoundedBuffer {   final Lock lock = new ReentrantLock();   final Condition notFull  = lock.newCondition();    final Condition notEmpty = lock.newCondition();         final Object[] items = new Object[100];   int putptr, takeptr, count;        public void put(Object x) throws InterruptedException {     lock.lock();     try {       while (count == items.length)          notFull.await();       items[putptr] = x;        if (++putptr == items.length) putptr = 0;       ++count;       notEmpty.signal();     } finally {       lock.unlock();     }   }        public Object take() throws InterruptedException {     lock.lock();     try {       while (count == 0)          notEmpty.await();       Object x = items[takeptr];        if (++takeptr == items.length) takeptr = 0;       --count;       notFull.signal();       return x;     } finally {       lock.unlock();     }   }  } 
正文到此结束
Loading...