介绍
生产者消费者问题,描述的是共享固定大小缓冲区的两个进程——生产者和消费者,在实际运行时会发生的问题。
即生产者不能一直生产而不消费,这会造成缓冲区数据堆积;而消费者也需要有数据,才能进行消费。
使用 Java 多线程相关语法,解决生产者与消费者问题,有助于在以后的工作学习中,解决类似多个线程协同处理共享资源的问题。
Java 提供了关键字 synchronize、及相关工具类实现多线程,这里将使用 3 种方式实现。
实现思路
定义共享缓冲区,缓冲区能够添加元素和去除元素。缓冲区满时将不能添加元素,缓冲区为空时将不能去除元素
定义生产者,负责向缓冲区添加数据;定义消费者,负责向缓冲区取数据
使用 Synchronize 关键字实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| public class TestA {
// 定义共享资源区 class Resource{ private int currCount = 0; private int maxCount = 3; private Object object = new Object();
// 存数据之前先拿到对象锁 public void put() throws Exception{ synchronized (object){ while (currCount >= maxCount){ try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } currCount++; System.out.println(Thread.currentThread().getName() + " put:"+currCount); object.notify(); } }
public void take() throws Exception{ synchronized (object){ while (currCount == 0){ try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } currCount--; System.out.println(Thread.currentThread().getName() + " take:"+currCount); object.notify(); } } }
// 生产者,传入资源实例 存数据 class Producer implements Runnable{
private Resource resource;
public Producer(Resource resource){ this.resource = resource; }
@Override public void run() { while (true){ try { sleep(1000); resource.put(); } catch (Exception e) { e.printStackTrace(); } } } }
// 消费者,传入资源实例 取数据 class Consumer implements Runnable{
private Resource resource;
public Consumer(Resource resource){ this.resource = resource; } @Override public void run() { while (true){ try { sleep(1000); resource.take(); } catch (Exception e) { e.printStackTrace(); } } } }
public static void main(String[] args) { TestA testA = new TestA(); Resource resource = testA.new Resource(); // 定义 3 个生产者, 1 个消费者 new Thread(testA.new Producer(resource)).start(); new Thread(testA.new Producer(resource)).start(); new Thread(testA.new Producer(resource)).start(); new Thread(testA.new Consumer(resource)).start(); } }
|
使用 Lock、Condition 实现
与前面 synchronize 实现相比,主要区别就是 lock 需要在 finally 代码块中主动释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class TestB {
// 定义共享资源区 class Resource{ private int currCount = 0; private int maxCount = 3; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition();
public void put() throws Exception{ lock.lock(); try { while (currCount >= maxCount){ condition.await(); } currCount++; System.out.println(Thread.currentThread().getName() + " put:"+currCount); condition.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } }
public void take() throws Exception{ lock.lock(); try { while (currCount == 0){ condition.await(); } currCount--; System.out.println(Thread.currentThread().getName() + " take:"+currCount); condition.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
... 定义生产者、消费者、main 方法与上面一样,不再赘述。 }
|
使用 BlockingQueue 阻塞队列实现
阻塞队列无需额外的关键字或者方法进行控制,它的底层实现逻辑就是 Lock 方法实现,当数据不满足取或者存的条件时它就会阻塞等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class TestC {
// 定义共享资源区 class Resource{ private BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
public void put() throws Exception{ queue.put(1); System.out.println(Thread.currentThread().getName()+" put:"+queue.size()); }
public void take() throws Exception{ queue.take(); System.out.println(Thread.currentThread().getName()+" take:"+queue.size()); } } ... 定义生产者、消费者、main 方法与上面一样,不再赘述。 }
|
TODO
任务同步,Java 提供很多实现方式。后续学习了其它工具类的用法再来更新这个例子。