转载

生产者消费者问题(2) – Java手动实现semaphore和PV系统?

既然要解决生产者消费者问题,那第一步自然就是想到PV操作,但是发现Java并没有semaphore这一套系统,虽然麻烦了点,但是还是试图自己去实现看看。

于是我首先像模像样定义了一个semaphore。

public class Semaphore {    private int s;    public Semaphore(int s) {  this.s = s;  }    public int getS() {  return s;  }    public void setS(int s) {  this.s = s;  } } 

接下来实现一个PV系统。

importjava.util.Queue;   public class PVSystem {  private Launcherlauncher;    public PVSystem(Launcherlauncher) {  this.launcher = launcher;  }    public synchronizedvoid p(Semaphoresem) {  sem.setS(sem.getS()-1);    if (sem.getS() < 0) {  try {  launcher.threadQueue.offer(Thread.currentThread());  Thread.currentThread().wait();  } catch (InterruptedException e) {  e.printStackTrace();  }  }  }    public synchronizedvoid v(Semaphoresem) {  System.out.println(Thread.currentThread().getName());  sem.setS(sem.getS() + 1);  if (sem.getS() <= 0) {  launcher.threadQueue.poll().notify();  }  } } 

生产者和消费者线程直接调用PV操作

public class Producer extends Thread{  private Launcherlauncher;    public Producer(Launcherlauncher) {  this.launcher = launcher;  }    @Override  public void run() {  PVSystempv = new PVSystem(launcher);  while (true) {  pv.p(Launcher.empty);  pv.p(Launcher.mutex);  System.out.println(this.getName() + ":" + launcher.size + " -> " + (++launcher.size));    try {  Thread.sleep(500);  } catch (InterruptedException e) {  e.printStackTrace();  }    pv.v(Launcher.mutex);  pv.v(Launcher.full);  }  }   } 

消费者线程

public class Consumer extends Thread {  private Launcherlauncher;    public Consumer(Launcherlauncher) {  this.launcher = launcher;  }    @Override  public void run() {  PVSystempv = new PVSystem(launcher);  while (true) {  pv.p(Launcher.full);  pv.p(Launcher.mutex);  System.out.println(this.getName() + ":" + launcher.size + "->" + (--launcher.size));    try {  Thread.sleep(500);  } catch (InterruptedException e) {  e.printStackTrace();  }    pv.v(Launcher.mutex);  pv.v(Launcher.empty);  }  }   } 

然后在main函数里面启动,并且设置一个线程队列。

importjava.util.LinkedList; importjava.util.Queue;   public class Launcher {  public int size = 0;    public static Semaphorefull = new Semaphore(0);  public static Semaphoreempty = new Semaphore(10);    public static Semaphoremutex = new Semaphore(1);    public Queue<Thread> threadQueue = new LinkedList<>();    public static void main(String[] args) {  Launcherlauncher = new Launcher();    for (int i = 0; i < 1; i++) {  Producerproducer = new Producer(launcher);  producer.setName("Producer " + i);  producer.start();    Consumerconsumer = new Consumer(launcher);  consumer.setName("Consumer " + i);  consumer.start();  }  } } 

本以为大功告成,没想到马上就gg了。那么问题出在哪呢?再次查文档发现,问题出来我对synchronized关键字以及wait和notify函数的理解上。

  • 误以为object.wait()跟object.notify()是使调用该object的线程阻塞跟唤醒
  • p跟v函数的synchronized是没有发挥作用的,因为new了两个PVSystem对象。但是本意也不是同步,只是想实现PV操作,但是wait跟notify必须在synchronized修饰的方法里执行。
  • 报错illeageMonitorState。调用object.notify()的线程必须已经获得了该object的monitor,但是我调用thread.notify()显然是没有获得thread的monitor的。因为在synchronized方法里肯定已经获得了this的monitor,所以直接调用notify()不会报错,相当于调用this.notify()。

至此明白了,作为面向对象的Java语言,所有的一切都是面向对象的,也就是说,是封装了底层的互斥实现过程,直接对对象进行操作,而不让你自己操作线程来实现互斥。下面的这种方式才是正确的用法,用同步关键字修饰的obj自带互斥锁。如果是修饰方法,相当于synchronized(this)。

 synchronized (obj) {  while (condition) {  obj.wait();  }  // do something  obj.notify();  } 
  • wait是阻塞已经获得synchronized的monitor(相当于lock)的线程,并且释放monitor。
  • notify是唤醒**某一个**申请monitor的线程,但是不释放monitor。

也就是说,调用obj.notify()之后,唤醒线程,但是不释放monitor,被唤醒的线程可能因为拿不到锁又阻塞了。不过当synchronized修饰的代码块执行完毕后,monitor会自动释放。

搞明白以后,于是有了下面的修改。

原文  https://www.zhouchao.me/pc-java-semaphore-and-pv-implement/
正文到此结束
Loading...