生产者/消费者模式

概念

生产者消费者问题,也称有限缓冲问题(Bounded-buffer problem)是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

  • 生产者在缓冲区满时休眠,等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据
  • 消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者
1
2
3
4
# -*- coding: utf-8 -*
from threading import Condition
import time
import random
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
condition = Condition()
class ConsumerThread(Thread):
""" Consumer """
def run(self):
global queue
while True:
condition.acquire()
# 在消费前检查队列是否为空
if not queue:
print "Nothing in queue, consumer is waiting"
# 如果为空,调用condition实例的wait()方法
condition.wait()
print "Producer added something to queue and notified the consumer"
num = queue.pop(0)
print "Consumed", num
condition.release()
time.sleep(random.random())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class ProducerThread(Thread):
""" Producer """
def run(self):
nums = range(5)
global queue
while True:
condition.acquire()
num = random.choice(nums)
queue.append(num)
print "Produced", num
# 调用notify()方法后,consumer被唤醒,但唤醒不意味着它可以运行
condition.notify()
# notify()并不是释放lock,调用notify()后,lock依然被生产者所持有
condition.release()
time.sleep(random.random())
if __name__ == '__main__':
ProducerThread().start()
ConsumerThread().start()

队列限制

为队列增加大小限制,即生产者不能向一个满队列继续加入数据。

  • 在加入数据前,生产者检查队列是否为满
  • 如果不为满,生产者可以继续正常流程
  • 如果为满,生产者必须等待,调用condition实例的wait()
  • 消费者消耗对列,然后notify生产者
  • 当消费者释放lock,消费可以acquire这个lock然后往队列中加入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# -*- coding: utf-8 -*
from threading import Thread, Condition
import time
import random
queue = []
MAX_NUM = 10
condition = Condition()
class ProducerThread(Thread):
def run(self):
nums = range(5)
global queue
while True:
condition.acquire()
if len(queue) == MAX_NUM:
print "Queue full, producer is waiting"
condition.wait()
print "Space in queue, Consumer notified the producer"
num = random.choice(nums)
queue.append(num)
print "Produced", num
condition.notify()
condition.release()
time.sleep(random.random())
class ConsumerThread(Thread):
def run(self):
global queue
while True:
condition.acquire()
if not queue:
print "Nothing in queue, consumer is waiting"
condition.wait()
print "Producer added something to queue and notified the consumer"
num = queue.pop(0)
print "Consumed", num
condition.notify()
condition.release()
time.sleep(random.random())
if __name__ == '__main__':
ProducerThread().start()
ConsumerThread().start()

特别地,Queue封装了Condition的行为,如wait(),notify(),acquire()。使用Queue可以方便的实现以上功能。注意get()和put()都有适当的notify()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# -*- coding: utf-8 -*
from threading import Thread
import time
import random
from Queue import Queue
queue = Queue(10)
class ProducerThread(Thread):
def run(self):
nums = range(5)
global queue
while True:
num = random.choice(nums)
# put()在插入数据前有一个获取lock的逻辑,同时,put()也会检查队列是否已满。如果已满,它会在内部调用wait(),生产者开始等待
queue.put(num)
print "Produced", num
time.sleep(random.random())
class ConsumerThread(Thread):
def run(self):
global queue
while True:
# get()从队列中移出数据前会获取lock
# get()会检查队列是否为空,如果为空,消费者进入等待状态
num = queue.get()
# 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号,详见Python之多线程
# Queue.task_done()用于统计在队列中未完成的任务,这样Queue.join()就能获知任务的结束
queue.task_done()
print "Consumed", num
time.sleep(random.random())
if __name__ == '__main__':
ProducerThread().start()
ConsumerThread().start()
文章目录
  1. 1. 概念
  2. 2. 队列限制
|