目录

JAVA生产者消费者模式

JAVA生产者消费者模式

一:什么是生产者消费者问题

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

示意图:
https://oss.lumingbao.cn/images/20211027/f46635c66f51474ca2b31df9889e8b25.png

便于更好的理解举个例子:

以开小卖部为例,老板要去进货,客户要从小卖部买东西,这个时候,老板就是生产者,客户就是消费者, 如果老板进货的速度小于客户买东西的速度,那么当仓库的货物卖完以后,就会出现消费者没有东西可买的情况; 如果老板进货的速度大于消费者消费的速度,那么仓库里的货物会越堆越多,把仓库撑爆。 所以这个时候就需要一种同步机制,当仓库的货物不足的时候去进货,当货物达到一定的量以后,暂停进货,消费者开始消费,仓库的货物不足时暂停消费,开始进货,循环往复。

二:下面介绍JAVA实现生产者消费者模型的四种方法:

1、wait()和notify()方法的实现

这是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;
当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。
当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

仓库Storage.java

package cn.lumingbao.demo;

import java.util.LinkedList;

/**
 * ============================
 * 仓库
 * @ClassName Storage
 * @Description
 * @Author lumingbao
 * @Date 2018/03/12 15:54
 * ============================
 */
public class Storage {

  /**
   * 仓库容量
   */
  private final int MAX_SIZE = 10;

  /**
   * 仓库存储的载体
   */
  private LinkedList<Object> list = new LinkedList<>();

  /**
   * 生产
   */
  public void produce() {
    synchronized (list) {
      while (list.size() + 1 > MAX_SIZE) {
        System.out.println("【生产者" + Thread.currentThread().getName() + "】仓库已满");
        try {
          list.wait();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      list.add(new Object());
      System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());
      list.notifyAll();
    }
  }

  /**
   * 消费
   */
  public void consume() {
    synchronized (list) {
      while (list.size() == 0) {
        System.out.println("【消费者" + Thread.currentThread().getName() + "】仓库为空");
        try {
          list.wait();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      list.remove();
      System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size());
      list.notifyAll();
    }
  }
}

生产者Producer.java

package cn.lumingbao.demo;

/**
 * ============================
 * 生产者
 * @ClassName Producer
 * @Description
 * @Author lumingbao
 * @Date 2018/03/12 15:58
 * ============================
 */
public class Producer implements Runnable{

  private Storage storage;

  public Producer(){}

  public Producer(Storage storage){
    this.storage = storage;
  }

  @Override
  public void run() {
    while(true){
      try{
        Thread.sleep(1000);
        storage.produce();
      }catch (InterruptedException e){
        e.printStackTrace();
      }
    }
  }
}

消费者Consumer.java

package cn.lumingbao.demo;

/**
 * ============================
 * 消费者
 * @ClassName Consumer
 * @Description
 * @Author lumingbao
 * @Date 2018/03/12 16:02
 * ============================
 */
public class Consumer implements Runnable{

  private Storage storage;

  public Consumer(){}

  public Consumer(Storage storage){
    this.storage = storage;
  }

  @Override
  public void run() {
    while(true){
      try{
        Thread.sleep(3000);
        storage.consume();
      }catch (InterruptedException e){
        e.printStackTrace();
      }
    }
  }
}

主函数Main.java

package cn.lumingbao.demo;

/**
 * ============================
 * 主函数
 * @ClassName Main
 * @Description
 * @Author lumingbao
 * @Date 2018/03/12 16:40
 * ============================
 */
public class Main {

    public static void main(String[] args) {
        Storage storage = new Storage();

        Thread p1 = new Thread(new Producer(storage));
        Thread p2 = new Thread(new Producer(storage));
        Thread p3 = new Thread(new Producer(storage));

        Thread c1 = new Thread(new Consumer(storage));
        Thread c2 = new Thread(new Consumer(storage));
        Thread c3 = new Thread(new Consumer(storage));

        p1.start();
        p2.start();
        p3.start();

        c1.start();
        c2.start();
        c3.start();
    }
}

运行结果: https://oss.lumingbao.cn/images/20211027/a994dd3e6ed54c24b86a8cd882686b0e.png

一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

注意:

notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。

2、await() / signal()方法的实现

在JDK5中,用ReentrantLock和Condition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

在这里只需改动Storage类

package cn.lumingbao.demo;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ============================
 * 仓库
 * @ClassName Storage
 * @Description
 * @Author lumingbao
 * @Date 2018/03/12 15:54
 * ============================
 */
public class Storage {

    /**
     * 仓库容量
     */
    private final int MAX_SIZE = 10;

    /**
     * 仓库存储的载体
     */
    private LinkedList<Object> list = new LinkedList<>();

    /**
     * 锁
     */
    private final Lock lock = new ReentrantLock();

    /**
     * 仓库满的条件变量
     */
    private final Condition full = lock.newCondition();

    /**
     * 仓库空的条件变量
     */
    private final Condition empty = lock.newCondition();

    /**
     * 生产
     */
    public void produce() {
        // 获得锁
        lock.lock();
        while (list.size() + 1 > MAX_SIZE) {
            System.out.println("【生产者" + Thread.currentThread().getName() + "】仓库已满");
            try {
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(new Object());
        System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());
        empty.signalAll();
        lock.unlock();
    }

    /**
     * 消费
     */
    public void consume() {
        // 获得锁
        lock.lock();
        while (list.size() == 0) {
            System.out.println("【消费者" + Thread.currentThread().getName() + "】仓库为空");
            try {
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.remove();
        System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size());
        full.signalAll();
        lock.unlock();
    }
}

运行结果与wait()/notify()类似

3、BlockingQueue阻塞队列方法

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

同样改动Storage类

package cn.lumingbao.demo;

import java.util.concurrent.LinkedBlockingQueue;

/**
 * ============================
 * 仓库
 * @ClassName Storage
 * @Description
 * @Author lumingbao
 * @Date 2018/03/12 15:54
 * ============================
 */
public class Storage {

    /**
     * 仓库存储的载体
     */
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);

    /**
     * 生产
     */
    public void produce() {
        try{
            list.put(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    /**
     * 消费
     */
    public void consume() {
        try{
            list.take();
            System.out.println("【消费者" + Thread.currentThread().getName() + "】消费了一个产品,现库存" + list.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

可能会出现put()或take()和System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

4、信号量

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行。)

同样改动Storage类

package cn.lumingbao.demo;

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

/**
 * ============================
 * 仓库
 * @ClassName Storage
 * @Description
 * @Author lumingbao
 * @Date 2018/03/12 15:54
 * ============================
 */
public class Storage {

    /**
     * 仓库存储的载体
     */
    private LinkedList<Object> list = new LinkedList<Object>();

    /**
     * 仓库的最大容量
     */
    final Semaphore notFull = new Semaphore(10);

    /**
     * 将线程挂起,等待其他来触发
     */
    final Semaphore notEmpty = new Semaphore(0);

    /**
     * 互斥锁
     */
    final Semaphore mutex = new Semaphore(1);

    /**
     * 生产
     */
    public void produce() {
        try {
            notFull.acquire();
            mutex.acquire();
            list.add(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size());
        }
        catch (Exception e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            notEmpty.release();
        }
    }

    /**
     * 消费
     */
    public void consume() {
        try {
            notEmpty.acquire();
            mutex.acquire();
            list.remove();
            System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            notFull.release();
        }
    }
}

评论