京公网安备 11010802034615号
经营许可证编号:京B2-20210330
用Python多线程实现生产者消费者模式
什么是生产者消费者模式
在软件开发的过程中,经常碰到这样的场景:
某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。
结构图如下:

为了大家容易理解,我们举一个寄信的例子。假设你要寄一封信,大致过程如下:
你把信写好——相当于生产者生产数据
你把信放入邮箱——相当于生产者把数据放入缓冲区
邮递员把信从邮箱取出,做相应处理——相当于消费者把数据取出缓冲区,处理数据
生产者消费者模式的优点
解耦
假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
举个例子,我们去邮局投递信件,如果不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮箱相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
继续上面的例子,如果我们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
支持忙闲不均
当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。
我们再拿寄信的例子,假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,需要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。
通过上面的介绍大家应该已经明白了生产者消费者模式。
Python中的多线程编程
在实现生产者消费者模式之前,我们先学习下Python中的多线程编程。
线程是操作系统直接支持的执行单元,高级语言通常都内置多线程的支持,Python也不例外,并且Python的线程是真正的Posix Thread,而不是模拟出来的线程。
Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。
下面我们先看一段在Python中实现多线程的代码。
import time,threading
#线程代码
class TaskThread(threading.Thread):
def __init__(self,name):
threading.Thread.__init__(self,name=name)
def run(self):
print('thread %s is running...' % self.getName())
for i in range(6):
print('thread %s >>> %s' % (self.getName(), i))
time.sleep(1)
print('thread %s finished.' % self.getName())
taskthread = TaskThread('TaskThread')
taskthread.start()
taskthread.join()
下面是程序的执行结果:
thread TaskThread is running...
thread TaskThread >>> 0
thread TaskThread >>> 1
thread TaskThread >>> 2
thread TaskThread >>> 3
thread TaskThread >>> 4
thread TaskThread >>> 5
thread TaskThread finished.
TaskThread类继承自threading模块中的Thread线程类。构造函数的name参数指定线程的名字,通过重载基类run函数实现具体任务。
在简单熟悉了Python的线程后,下面我们实现一个生产者消费者模式。
from Queue import Queue
import random,threading,time
#生产者类
class Producer(threading.Thread):
def __init__(self, name,queue):
threading.Thread.__init__(self, name=name)
self.data=queue
def run(self):
for i in range(5):
print("%s is producing %d to the queue!" % (self.getName(), i))
self.data.put(i)
time.sleep(random.randrange(10)/5)
print("%s finished!" % self.getName())
#消费者类
class Consumer(threading.Thread):
def __init__(self,name,queue):
threading.Thread.__init__(self,name=name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()
print("%s is consuming. %d in the queue is consumed!" % (self.getName(),val))
time.sleep(random.randrange(10))
print("%s finished!" % self.getName())
def main():
queue = Queue()
producer = Producer('Producer',queue)
consumer = Consumer('Consumer',queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print 'All threads finished!'
if __name__ == '__main__':
main()
执行结果可能如下:
Producer is producing 0 to the queue!
Consumer is consuming. 0 in the queue is consumed!
Producer is producing 1 to the queue!
Producer is producing 2 to the queue!
Consumer is consuming. 1 in the queue is consumed!
Consumer is consuming. 2 in the queue is consumed!
Producer is producing 3 to the queue!
Producer is producing 4 to the queue!
Producer finished!
Consumer is consuming. 3 in the queue is consumed!
Consumer is consuming. 4 in the queue is consumed!
Consumer finished!
All threads finished!
因为多线程是抢占式执行的,所以打印出的运行结果不一定和上面的完全一致。
小结
本例通过Python实现了一个简单的生产者消费者模型。Python中的Queue模块已经提供了对线程同步的支持,所以本文并没有涉及锁、同步、死锁等多线程问题。
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
【核心关键词】大数据、可视化、存储、架构、客户、离线、产品、同步、实时、数据仓库、数据分析、数据可视化、存储数据、离线 ...
2026-05-21在电商流量红利消退、公域获客成本持续走高的当下,存量用户深度挖掘已成为店铺增收增效的核心抓手。相较于付费投放获取的陌生新 ...
2026-05-21 很多数据分析师每天盯着几十个指标,但当被问到“这套指标要支撑什么业务目标”“指标之间是什么逻辑关系”“业务变化时如何 ...
2026-05-21在数据驱动决策的时代,数据质量直接决定分析结果的可靠性与准确性,而异常值作为数据清洗中的核心痛点,往往会扭曲分析结论、误 ...
2026-05-20 很多数据分析师每天盯着GMV、DAU、转化率,但当被问到“哪些指标在所有行业都适用”“哪些指标只对电商有意义”“二者如何搭 ...
2026-05-20Agent的能力边界,很大程度上取决于其掌握的Skill质量和数量。传统做法是靠人工编写和维护Skill,但这条路很快会遇到瓶颈。业务 ...
2026-05-20在统计分析中,方差分析(ANOVA)是一种常用的假设检验方法,核心用于分析“一个或多个自变量对单个因变量的影响”,广泛应用于 ...
2026-05-19 很多数据分析师每天盯着GMV、DAU、转化率,但当被问到“什么是指标”“指标和维度有什么区别”“如何定义指标值的计算规则和 ...
2026-05-19想高效备考 CDA 一级,拒绝盲目刷题、冗余学习?《CDA 一级教材知识手册》重磅来袭!以官方教材为核心,浓缩 13 章 103 个核心考 ...
2026-05-19在数据统计分析中,卡方检验是一种常用的非参数检验方法,核心用于判断两个或多个分类变量之间是否存在显著关联,广泛应用于市场 ...
2026-05-18在企业数字化转型的浪潮中,很多企业陷入了“技术堆砌”的误区——上线了ERP、CRM、BI等各类系统,积累了海量数据,却依然面临“ ...
2026-05-18小陈是某电商平台的数据分析师。老板交给他一个任务:“我们平台的注册用户已经突破1000万了,想了解一下用户的平均月消费金额。 ...
2026-05-18【专访摘要】本次CDA持证专访邀请到拥有丰富物流供应链数据分析经验的赖尧,他结合自身在京东、华莱士、兰格赛等企业的从业经历 ...
2026-05-15在数字化时代,企业的每一次业务优化、每一项技术迭代,都需要回答一个核心问题:这个动作到底能带来多少价值?是提升了用户转化 ...
2026-05-15在数据仓库建设中,事实表与维度表是两大核心组件,二者相互关联、缺一不可,共同构成数据仓库的基础架构。事实表聚焦“发生了什 ...
2026-05-15 很多数据分析师沉迷于复杂的机器学习算法,却忽略了数据分析最基础也最核心的能力——描述性统计。事实上,80%的商业分析问 ...
2026-05-15【核心关键词】互联网、机会、运营、关键词、账户、数字化、后台、客户、成本、网络、数据分析、底层逻辑、市场推广、数据反馈 ...
2026-05-14在Python数据分析中,Pandas作为核心工具库,凭借简洁高效的数据处理能力,成为数据分析从业者的必备技能。其中,基于两列(或多 ...
2026-05-14 很多人把统计学理解为“一堆公式和计算”,却忽略了它的本质——一门让数据“开口说话”的科学。真正的数据分析高手,不是会 ...
2026-05-14在零售行业存量竞争日趋激烈的当下,客户流失已成为侵蚀企业利润的“隐形杀手”——据行业数据显示,零售企业平均客户流失率高达 ...
2026-05-13