京公网安备 11010802034615号
经营许可证编号:京B2-20210330
详解Python实现多进程异步事件驱动引擎
本篇文章主要介绍了详解Python实现多进程异步事件驱动引擎,小编觉得挺不错的,现在分享给大家,也给大家做个参考。
多进程异步事件驱动逻辑
逻辑
code
# -*- coding: utf-8 -*-
'''
author: Jimmy
contact: 234390130@qq.com
file: eventEngine.py
time: 2017/8/25 上午10:06
description: 多进程异步事件驱动引擎
'''
__author__ = 'Jimmy'
from multiprocessing import Process, Queue
class EventEngine(object):
# 初始化事件事件驱动引擎
def __init__(self):
#保存事件列表
self.__eventQueue = Queue()
#引擎开关
self.__active = False
#事件处理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]}
self.__handlers = {}
#保存事件处理进程池
self.__processPool = []
#事件引擎主进程
self.__mainProcess = Process(target=self.__run)
#执行事件循环
def __run(self):
while self.__active:
#事件队列非空
if not self.__eventQueue.empty():
#获取队列中的事件 超时1秒
event = self.__eventQueue.get(block=True ,timeout=1)
#执行事件
self.__process(event)
else:
# print('无任何事件')
pass
#执行事件
def __process(self, event):
if event.type in self.__handlers:
for handler in self.__handlers[event.type]:
#开一个进程去异步处理
p = Process(target=handler, args=(event, ))
#保存到进程池
self.__processPool.append(p)
p.start()
#开启事件引擎
def start(self):
self.__active = True
self.__mainProcess.start()
#暂停事件引擎
def stop(self):
"""停止"""
# 将事件管理器设为停止
self.__active = False
# 等待事件处理进程退出
for p in self.__processPool:
p.join()
self.__mainProcess.join()
#终止事件引擎
def terminate(self):
self.__active = False
#终止所有事件处理进程
for p in self.__processPool:
p.terminate()
self.__mainProcess.join()
#注册事件
def register(self, type, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type]
except KeyError:
handlerList = []
self.__handlers[type] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
def unregister(self, type, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type]
except KeyError:
pass
def sendEvent(self, event):
#发送事件 像队列里存入事件
self.__eventQueue.put(event)
class Event(object):
#事件对象
def __init__(self, type =None):
self.type = type
self.dict = {}
#测试
if __name__ == '__main__':
import time
EVENT_ARTICAL = "Event_Artical"
# 事件源 公众号
class PublicAccounts:
def __init__(self, eventManager):
self.__eventManager = eventManager
def writeNewArtical(self):
# 事件对象,写了新文章
event = Event(EVENT_ARTICAL)
event.dict["artical"] = u'如何写出更优雅的代码\n'
# 发送事件
self.__eventManager.sendEvent(event)
print(u'公众号发送新文章\n')
# 监听器 订阅者
class ListenerTypeOne:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章' % self.__username)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
class ListenerTypeTwo:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章 睡3秒再看' % self.__username)
time.sleep(3)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
def test():
listner1 = ListenerTypeOne("thinkroom") # 订阅者1
listner2 = ListenerTypeTwo("steve") # 订阅者2
ee = EventEngine()
# 绑定事件和监听器响应函数(新文章)
ee.register(EVENT_ARTICAL, listner1.ReadArtical)
ee.register(EVENT_ARTICAL, listner2.ReadArtical)
for i in range(0, 20):
listner3 = ListenerTypeOne("Jimmy") # 订阅者X
ee.register(EVENT_ARTICAL, listner3.ReadArtical)
ee.start()
#发送事件
publicAcc = PublicAccounts(ee)
publicAcc.writeNewArtical()
test()
以上就是本文的全部内容,希望对大家的学习有所帮助.
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在Python文件操作场景中,批量处理文件、遍历目录树是高频需求——无论是统计某文件夹下的文件数量、筛选特定类型文件,还是批量 ...
2026-01-05在神经网络模型训练过程中,开发者最担心的问题之一,莫过于“训练误差突然增大”——前几轮还平稳下降的损失值(Loss),突然在 ...
2026-01-05在数据驱动的业务场景中,“垃圾数据进,垃圾结果出”是永恒的警示。企业收集的数据往往存在缺失、异常、重复、格式混乱等问题, ...
2026-01-05在数字化时代,用户行为数据已成为企业的核心资产之一。从用户打开APP的首次点击,到浏览页面的停留时长,再到最终的购买决策、 ...
2026-01-04在数据分析领域,数据稳定性是衡量数据质量的核心维度之一,直接决定了分析结果的可靠性与决策价值。稳定的数据能反映事物的固有 ...
2026-01-04在CDA(Certified Data Analyst)数据分析师的工作链路中,数据读取是连接原始数据与后续分析的关键桥梁。如果说数据采集是“获 ...
2026-01-04尊敬的考生: 您好! 我们诚挚通知您,CDA Level III 考试大纲将于 2025 年 12 月 31 日实施重大更新,并正式启用,2026年3月考 ...
2025-12-31“字如其人”的传统认知,让不少“手残党”在需要签名的场景中倍感尴尬——商务签约时的签名歪歪扭扭,朋友聚会的签名墙不敢落笔 ...
2025-12-31在多元统计分析的因子分析中,“得分系数”是连接原始观测指标与潜在因子的关键纽带,其核心作用是将多个相关性较高的原始指标, ...
2025-12-31对CDA(Certified Data Analyst)数据分析师而言,高质量的数据是开展后续分析、挖掘业务价值的基础,而数据采集作为数据链路的 ...
2025-12-31在中介效应分析(或路径分析)中,间接效应是衡量“自变量通过中介变量影响因变量”这一间接路径强度与方向的核心指标。不同于直 ...
2025-12-30数据透视表是数据分析中高效汇总、多维度分析数据的核心工具,能快速将杂乱数据转化为结构化的汇总报表。在实际分析场景中,我们 ...
2025-12-30在金融投资、商业运营、用户增长等数据密集型领域,量化策略凭借“数据驱动、逻辑可验证、执行标准化”的优势,成为企业提升决策 ...
2025-12-30CDA(Certified Data Analyst),是在数字经济大背景和人工智能时代趋势下,源自中国,走向世界,面向全行业的专业技能认证,旨 ...
2025-12-29在数据分析领域,周期性是时间序列数据的重要特征之一——它指数据在一定时间间隔内重复出现的规律,广泛存在于经济、金融、气象 ...
2025-12-29数据分析师的核心价值在于将海量数据转化为可落地的商业洞察,而高效的工具则是实现这一价值的关键载体。从数据采集、清洗整理, ...
2025-12-29在金融、零售、互联网等数据密集型行业,量化策略已成为企业提升决策效率、挖掘商业价值的核心工具。CDA(Certified Data Analys ...
2025-12-29CDA中国官网是全国统一的数据分析师认证报名网站,由认证考试委员会与持证人会员、企业会员以及行业知名第三方机构共同合作,致 ...
2025-12-26在数字化转型浪潮下,审计行业正经历从“传统手工审计”向“大数据智能审计”的深刻变革。教育部发布的《大数据与审计专业教学标 ...
2025-12-26统计学作为数学的重要分支,是连接数据与决策的桥梁。随着数据规模的爆炸式增长和复杂问题的涌现,传统统计方法已难以应对高维、 ...
2025-12-26