定义

某个模块负责产生数据,这些数据由另一个模块来负责处理。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。

该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

1
2
3
4
1、生产者仅仅在仓储未满时候生产,仓满则停止生产。
2、消费者仅仅在仓储有产品时候才能消费,仓空则等待。
3、当消费者发现仓储没产品可消费时候会通知生产者生产。
4、生产者在生产出可消费产品时候,应该通知等待的消费者去消费。

缓冲区作用

  • 解耦,生产者和消费者只依赖缓冲区,而不互相依赖。
  • 支持并发和异步。

网上看到一个代码示例,觉得很不错。
代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {

public class Product {
private int id;

public Product(int id) {
this.id = id;
}

public String toString() {
return "产品:" + this.id;
}
}

public class Storage {
BlockingQueue<Product> queues = new LinkedBlockingQueue<Product>(10);

public void push(Product p) throws InterruptedException {
queues.put(p);
}

public Product pop() throws InterruptedException {
return queues.take();
}
}

class Producer implements Runnable {
private String name;
private Storage s = null;

public Producer(String name, Storage s) {
this.name = name;
this.s = s;
}

public void run() {
try {
while (true) {
Product product = new Product((int) (Math.random() * 10000)); // 产生0~9999随机整数
System.out.println(name + "准备生产(" + product.toString() + ").");
s.push(product);
System.out.println(name + "已生产(" + product.toString() + ").");
System.out.println("===============");
Thread.sleep(500);
}
} catch (InterruptedException e1) {
e1.printStackTrace();
}

}
}

class Consumer implements Runnable {
private String name;
private Storage s = null;

public Consumer(String name, Storage s) {
this.name = name;
this.s = s;
}

public void run() {
try {
while (true) {
System.out.println(name + "准备消费产品.");
Product product = s.pop();
System.out.println(name + "已消费(" + product.toString() + ").");
System.out.println("===============");
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();

Storage s = pc.new Storage();

Producer p = pc.new Producer("张三", s);
Producer p2 = pc.new Producer("李四", s);

Consumer c = pc.new Consumer("王五", s);
Consumer c2 = pc.new Consumer("老刘", s);
Consumer c3 = pc.new Consumer("老林", s);

ExecutorService service = Executors.newCachedThreadPool();

service.submit(p);
//service.submit(p2);
service.submit(c);
service.submit(c2);
service.submit(c3);
}
}