概念
生产者消费者问题,也称有限缓冲问题(Bounded-buffer problem)是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
- 生产者在缓冲区满时休眠,等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据
- 消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者
1 2 3 4
| from threading import Condition import time import random
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| condition = Condition() class ConsumerThread(Thread): """ Consumer """ def run(self): global queue while True: condition.acquire() if not queue: print "Nothing in queue, consumer is waiting" condition.wait() print "Producer added something to queue and notified the consumer" num = queue.pop(0) print "Consumed", num condition.release() time.sleep(random.random())
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class ProducerThread(Thread): """ Producer """ def run(self): nums = range(5) global queue while True: condition.acquire() num = random.choice(nums) queue.append(num) print "Produced", num condition.notify() condition.release() time.sleep(random.random()) if __name__ == '__main__': ProducerThread().start() ConsumerThread().start()
|
队列限制
为队列增加大小限制,即生产者不能向一个满队列继续加入数据。
- 在加入数据前,生产者检查队列是否为满
- 如果不为满,生产者可以继续正常流程
- 如果为满,生产者必须等待,调用condition实例的wait()
- 消费者消耗对列,然后notify生产者
- 当消费者释放lock,消费可以acquire这个lock然后往队列中加入数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| from threading import Thread, Condition import time import random queue = [] MAX_NUM = 10 condition = Condition() class ProducerThread(Thread): def run(self): nums = range(5) global queue while True: condition.acquire() if len(queue) == MAX_NUM: print "Queue full, producer is waiting" condition.wait() print "Space in queue, Consumer notified the producer" num = random.choice(nums) queue.append(num) print "Produced", num condition.notify() condition.release() time.sleep(random.random()) class ConsumerThread(Thread): def run(self): global queue while True: condition.acquire() if not queue: print "Nothing in queue, consumer is waiting" condition.wait() print "Producer added something to queue and notified the consumer" num = queue.pop(0) print "Consumed", num condition.notify() condition.release() time.sleep(random.random()) if __name__ == '__main__': ProducerThread().start() ConsumerThread().start()
|
特别地,Queue封装了Condition的行为,如wait(),notify(),acquire()。使用Queue可以方便的实现以上功能。注意get()和put()都有适当的notify()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| from threading import Thread import time import random from Queue import Queue queue = Queue(10) class ProducerThread(Thread): def run(self): nums = range(5) global queue while True: num = random.choice(nums) queue.put(num) print "Produced", num time.sleep(random.random()) class ConsumerThread(Thread): def run(self): global queue while True: num = queue.get() queue.task_done() print "Consumed", num time.sleep(random.random()) if __name__ == '__main__': ProducerThread().start() ConsumerThread().start()
|