生产者消费者Java实现

介绍

upload successful

生产者消费者问题,描述的是共享固定大小缓冲区的两个进程——生产者和消费者,在实际运行时会发生的问题。

即生产者不能一直生产而不消费,这会造成缓冲区数据堆积;而消费者也需要有数据,才能进行消费。

使用 Java 多线程相关语法,解决生产者与消费者问题,有助于在以后的工作学习中,解决类似多个线程协同处理共享资源的问题。

Java 提供了关键字 synchronize、及相关工具类实现多线程,这里将使用 3 种方式实现。

实现思路

  1. 定义共享缓冲区,缓冲区能够添加元素和去除元素。缓冲区满时将不能添加元素,缓冲区为空时将不能去除元素

  2. 定义生产者,负责向缓冲区添加数据;定义消费者,负责向缓冲区取数据

使用 Synchronize 关键字实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class TestA {

// 定义共享资源区
class Resource{
private int currCount = 0;
private int maxCount = 3;
private Object object = new Object();

// 存数据之前先拿到对象锁
       public void put() throws Exception{
synchronized (object){
while (currCount >= maxCount){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currCount++;
System.out.println(Thread.currentThread().getName() + " put:"+currCount);
object.notify();
}
}

public void take() throws Exception{
synchronized (object){
while (currCount == 0){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currCount--;
System.out.println(Thread.currentThread().getName() + " take:"+currCount);
object.notify();
}
}
}

// 生产者,传入资源实例 存数据
class Producer implements Runnable{

private Resource resource;

public Producer(Resource resource){
this.resource = resource;
}

@Override
public void run() {
while (true){
try {
sleep(1000);
resource.put();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

// 消费者,传入资源实例 取数据
class Consumer implements Runnable{

private Resource resource;

public Consumer(Resource resource){
this.resource = resource;
}

@Override
public void run() {
while (true){
try {
sleep(1000);
resource.take();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
TestA testA = new TestA();
Resource resource = testA.new Resource();
// 定义 3 个生产者, 1 个消费者
new Thread(testA.new Producer(resource)).start();
new Thread(testA.new Producer(resource)).start();
new Thread(testA.new Producer(resource)).start();
new Thread(testA.new Consumer(resource)).start();
}
}

使用 Lock、Condition 实现

与前面 synchronize 实现相比,主要区别就是 lock 需要在 finally 代码块中主动释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestB {

// 定义共享资源区
class Resource{
private int currCount = 0;
private int maxCount = 3;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();


public void put() throws Exception{
lock.lock();
try {
while (currCount >= maxCount){
condition.await();
}
currCount++;
System.out.println(Thread.currentThread().getName() + " put:"+currCount);
condition.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}

public void take() throws Exception{
lock.lock();
try {
while (currCount == 0){
condition.await();
}
currCount--;
System.out.println(Thread.currentThread().getName() + " take:"+currCount);
condition.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

...
   定义生产者、消费者、main 方法与上面一样,不再赘述。
}

使用 BlockingQueue 阻塞队列实现

阻塞队列无需额外的关键字或者方法进行控制,它的底层实现逻辑就是 Lock 方法实现,当数据不满足取或者存的条件时它就会阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TestC {

// 定义共享资源区
class Resource{
private BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);

public void put() throws Exception{
queue.put(1);
System.out.println(Thread.currentThread().getName()+" put:"+queue.size());
}

public void take() throws Exception{
queue.take();
System.out.println(Thread.currentThread().getName()+" take:"+queue.size());
}
}
...
   定义生产者、消费者、main 方法与上面一样,不再赘述。
}

TODO

任务同步,Java 提供很多实现方式。后续学习了其它工具类的用法再来更新这个例子。