京公网安备 11010802034615号
经营许可证编号:京B2-20210330
详解Python实现多进程异步事件驱动引擎
本篇文章主要介绍了详解Python实现多进程异步事件驱动引擎,小编觉得挺不错的,现在分享给大家,也给大家做个参考。
多进程异步事件驱动逻辑
逻辑
code
# -*- coding: utf-8 -*-
'''
author: Jimmy
contact: 234390130@qq.com
file: eventEngine.py
time: 2017/8/25 上午10:06
description: 多进程异步事件驱动引擎
'''
__author__ = 'Jimmy'
from multiprocessing import Process, Queue
class EventEngine(object):
# 初始化事件事件驱动引擎
def __init__(self):
#保存事件列表
self.__eventQueue = Queue()
#引擎开关
self.__active = False
#事件处理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]}
self.__handlers = {}
#保存事件处理进程池
self.__processPool = []
#事件引擎主进程
self.__mainProcess = Process(target=self.__run)
#执行事件循环
def __run(self):
while self.__active:
#事件队列非空
if not self.__eventQueue.empty():
#获取队列中的事件 超时1秒
event = self.__eventQueue.get(block=True ,timeout=1)
#执行事件
self.__process(event)
else:
# print('无任何事件')
pass
#执行事件
def __process(self, event):
if event.type in self.__handlers:
for handler in self.__handlers[event.type]:
#开一个进程去异步处理
p = Process(target=handler, args=(event, ))
#保存到进程池
self.__processPool.append(p)
p.start()
#开启事件引擎
def start(self):
self.__active = True
self.__mainProcess.start()
#暂停事件引擎
def stop(self):
"""停止"""
# 将事件管理器设为停止
self.__active = False
# 等待事件处理进程退出
for p in self.__processPool:
p.join()
self.__mainProcess.join()
#终止事件引擎
def terminate(self):
self.__active = False
#终止所有事件处理进程
for p in self.__processPool:
p.terminate()
self.__mainProcess.join()
#注册事件
def register(self, type, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type]
except KeyError:
handlerList = []
self.__handlers[type] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
def unregister(self, type, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type]
except KeyError:
pass
def sendEvent(self, event):
#发送事件 像队列里存入事件
self.__eventQueue.put(event)
class Event(object):
#事件对象
def __init__(self, type =None):
self.type = type
self.dict = {}
#测试
if __name__ == '__main__':
import time
EVENT_ARTICAL = "Event_Artical"
# 事件源 公众号
class PublicAccounts:
def __init__(self, eventManager):
self.__eventManager = eventManager
def writeNewArtical(self):
# 事件对象,写了新文章
event = Event(EVENT_ARTICAL)
event.dict["artical"] = u'如何写出更优雅的代码\n'
# 发送事件
self.__eventManager.sendEvent(event)
print(u'公众号发送新文章\n')
# 监听器 订阅者
class ListenerTypeOne:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章' % self.__username)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
class ListenerTypeTwo:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章 睡3秒再看' % self.__username)
time.sleep(3)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
def test():
listner1 = ListenerTypeOne("thinkroom") # 订阅者1
listner2 = ListenerTypeTwo("steve") # 订阅者2
ee = EventEngine()
# 绑定事件和监听器响应函数(新文章)
ee.register(EVENT_ARTICAL, listner1.ReadArtical)
ee.register(EVENT_ARTICAL, listner2.ReadArtical)
for i in range(0, 20):
listner3 = ListenerTypeOne("Jimmy") # 订阅者X
ee.register(EVENT_ARTICAL, listner3.ReadArtical)
ee.start()
#发送事件
publicAcc = PublicAccounts(ee)
publicAcc.writeNewArtical()
test()
以上就是本文的全部内容,希望对大家的学习有所帮助.
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在企业数字化转型过程中,“业务模型”与“数据模型”常被同时提及,却也频繁被混淆——业务团队口中的“用户增长模型”聚焦“如 ...
2025-11-20在游戏行业“高获客成本、低留存率”的痛点下,“提前预测用户流失并精准召回”成为运营核心命题。而用户流失并非突发行为——从 ...
2025-11-20在商业数据分析领域,“懂理论、会工具”只是入门门槛,真正的核心竞争力在于“实践落地能力”——很多分析师能写出规范的SQL、 ...
2025-11-20在数据可视化领域,树状图(Tree Diagram)是呈现层级结构数据的核心工具——无论是电商商品分类、企业组织架构,还是数据挖掘中 ...
2025-11-17核心结论:“分析前一天浏览与第二天下单的概率提升”属于数据挖掘中的关联规则挖掘(含序列模式挖掘) 技术——它聚焦“时间序 ...
2025-11-17在数据驱动成为企业核心竞争力的今天,很多企业陷入“数据多但用不好”的困境:营销部门要做用户转化分析却拿不到精准数据,运营 ...
2025-11-17在使用Excel透视表进行数据汇总分析时,我们常遇到“需通过两个字段相乘得到关键指标”的场景——比如“单价×数量=金额”“销量 ...
2025-11-14在测试环境搭建、数据验证等场景中,经常需要将UAT(用户验收测试)环境的表数据同步到SIT(系统集成测试)环境,且两者表结构完 ...
2025-11-14在数据驱动的企业中,常有这样的困境:分析师提交的“万字数据报告”被束之高阁,而一张简洁的“复购率趋势图+核心策略标注”却 ...
2025-11-14在实证研究中,层次回归分析是探究“不同变量组对因变量的增量解释力”的核心方法——通过分步骤引入自变量(如先引入人口统计学 ...
2025-11-13在实时数据分析、实时业务监控等场景中,“数据新鲜度”直接决定业务价值——当电商平台需要实时统计秒杀订单量、金融系统需要实 ...
2025-11-13在数据量爆炸式增长的今天,企业对数据分析的需求已从“有没有”升级为“好不好”——不少团队陷入“数据堆砌却无洞察”“分析结 ...
2025-11-13在主成分分析(PCA)、因子分析等降维方法中,“成分得分系数矩阵” 与 “载荷矩阵” 是两个高频出现但极易混淆的核心矩阵 —— ...
2025-11-12大数据早已不是单纯的技术概念,而是渗透各行业的核心生产力。但同样是拥抱大数据,零售企业的推荐系统、制造企业的设备维护、金 ...
2025-11-12在数据驱动的时代,“数据分析” 已成为企业决策的核心支撑,但很多人对其认知仍停留在 “用 Excel 做报表”“写 SQL 查数据” ...
2025-11-12金融统计不是单纯的 “数据计算”,而是贯穿金融业务全流程的 “风险量化工具”—— 从信贷审批中的客户风险评估,到投资组合的 ...
2025-11-11这个问题很有实战价值,mtcars 数据集是多元线性回归的经典案例,通过它能清晰展现 “多变量影响分析” 的核心逻辑。核心结论是 ...
2025-11-11在数据驱动成为企业核心竞争力的今天,“不知道要什么数据”“分析结果用不上” 是企业的普遍困境 —— 业务部门说 “要提升销量 ...
2025-11-11在大模型(如 Transformer、CNN、多层感知机)的结构设计中,“每层神经元个数” 是决定模型性能与效率的关键参数 —— 个数过少 ...
2025-11-10形成购买决策的四个核心推动力的是:内在需求驱动、产品价值感知、社会环境影响、场景便捷性—— 它们从 “为什么买”“值得买吗 ...
2025-11-10