京公网安备 11010802034615号
经营许可证编号:京B2-20210330
详解Python实现多进程异步事件驱动引擎
本篇文章主要介绍了详解Python实现多进程异步事件驱动引擎,小编觉得挺不错的,现在分享给大家,也给大家做个参考。
多进程异步事件驱动逻辑
逻辑
code
# -*- coding: utf-8 -*-
'''
author: Jimmy
contact: 234390130@qq.com
file: eventEngine.py
time: 2017/8/25 上午10:06
description: 多进程异步事件驱动引擎
'''
__author__ = 'Jimmy'
from multiprocessing import Process, Queue
class EventEngine(object):
# 初始化事件事件驱动引擎
def __init__(self):
#保存事件列表
self.__eventQueue = Queue()
#引擎开关
self.__active = False
#事件处理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]}
self.__handlers = {}
#保存事件处理进程池
self.__processPool = []
#事件引擎主进程
self.__mainProcess = Process(target=self.__run)
#执行事件循环
def __run(self):
while self.__active:
#事件队列非空
if not self.__eventQueue.empty():
#获取队列中的事件 超时1秒
event = self.__eventQueue.get(block=True ,timeout=1)
#执行事件
self.__process(event)
else:
# print('无任何事件')
pass
#执行事件
def __process(self, event):
if event.type in self.__handlers:
for handler in self.__handlers[event.type]:
#开一个进程去异步处理
p = Process(target=handler, args=(event, ))
#保存到进程池
self.__processPool.append(p)
p.start()
#开启事件引擎
def start(self):
self.__active = True
self.__mainProcess.start()
#暂停事件引擎
def stop(self):
"""停止"""
# 将事件管理器设为停止
self.__active = False
# 等待事件处理进程退出
for p in self.__processPool:
p.join()
self.__mainProcess.join()
#终止事件引擎
def terminate(self):
self.__active = False
#终止所有事件处理进程
for p in self.__processPool:
p.terminate()
self.__mainProcess.join()
#注册事件
def register(self, type, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type]
except KeyError:
handlerList = []
self.__handlers[type] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
def unregister(self, type, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type]
except KeyError:
pass
def sendEvent(self, event):
#发送事件 像队列里存入事件
self.__eventQueue.put(event)
class Event(object):
#事件对象
def __init__(self, type =None):
self.type = type
self.dict = {}
#测试
if __name__ == '__main__':
import time
EVENT_ARTICAL = "Event_Artical"
# 事件源 公众号
class PublicAccounts:
def __init__(self, eventManager):
self.__eventManager = eventManager
def writeNewArtical(self):
# 事件对象,写了新文章
event = Event(EVENT_ARTICAL)
event.dict["artical"] = u'如何写出更优雅的代码\n'
# 发送事件
self.__eventManager.sendEvent(event)
print(u'公众号发送新文章\n')
# 监听器 订阅者
class ListenerTypeOne:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章' % self.__username)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
class ListenerTypeTwo:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章 睡3秒再看' % self.__username)
time.sleep(3)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
def test():
listner1 = ListenerTypeOne("thinkroom") # 订阅者1
listner2 = ListenerTypeTwo("steve") # 订阅者2
ee = EventEngine()
# 绑定事件和监听器响应函数(新文章)
ee.register(EVENT_ARTICAL, listner1.ReadArtical)
ee.register(EVENT_ARTICAL, listner2.ReadArtical)
for i in range(0, 20):
listner3 = ListenerTypeOne("Jimmy") # 订阅者X
ee.register(EVENT_ARTICAL, listner3.ReadArtical)
ee.start()
#发送事件
publicAcc = PublicAccounts(ee)
publicAcc.writeNewArtical()
test()
以上就是本文的全部内容,希望对大家的学习有所帮助.
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在数据分析、机器学习的实操场景中,聚类分析与主成分分析(PCA)是两种高频使用的统计与数据处理方法。二者常被用于数据预处理 ...
2026-02-24在聚类分析的实操场景中,K-Means算法因其简单高效、易落地的特点,成为处理无监督分类问题的首选工具——无论是用户画像分层、 ...
2026-02-24数字化浪潮下,数据已成为企业核心竞争力,“用数据说话、用数据决策”成为企业发展的核心逻辑。CDA(Certified Data Analyst) ...
2026-02-24CDA一级知识点汇总手册 第五章 业务数据的特征、处理与透视分析考点52:业务数据分析基础考点53:输入和资源需求考点54:业务数 ...
2026-02-23CDA一级知识点汇总手册 第四章 战略与业务数据分析考点43:战略数据分析基础考点44:表格结构数据的使用考点45:输入数据和资源 ...
2026-02-22CDA一级知识点汇总手册 第三章 商业数据分析框架考点27:商业数据分析体系的核心逻辑——BSC五视角框架考点28:战略视角考点29: ...
2026-02-20CDA一级知识点汇总手册 第二章 数据分析方法考点7:基础范式的核心逻辑(本体论与流程化)考点8:分类分析(本体论核心应用)考 ...
2026-02-18第一章:数据分析思维考点1:UVCA时代的特点考点2:数据分析背后的逻辑思维方法论考点3:流程化企业的数据分析需求考点4:企业数 ...
2026-02-16在数据分析、业务决策、科学研究等领域,统计模型是连接原始数据与业务价值的核心工具——它通过对数据的规律提炼、变量关联分析 ...
2026-02-14在SQL查询实操中,SELECT * 与 SELECT 字段1, 字段2,...(指定个别字段)是最常用的两种查询方式。很多开发者在日常开发中,为了 ...
2026-02-14对CDA(Certified Data Analyst)数据分析师而言,数据分析的核心不是孤立解读单个指标数值,而是构建一套科学、完整、贴合业务 ...
2026-02-14在Power BI实操中,函数是实现数据清洗、建模计算、可视化呈现的核心工具——无论是简单的数据筛选、异常值处理,还是复杂的度量 ...
2026-02-13在互联网运营、产品迭代、用户增长等工作中,“留存率”是衡量产品核心价值、用户粘性的核心指标——而次日留存率,作为留存率体 ...
2026-02-13对CDA(Certified Data Analyst)数据分析师而言,指标是贯穿工作全流程的核心载体,更是连接原始数据与业务洞察的关键桥梁。CDA ...
2026-02-13在机器学习建模实操中,“特征选择”是提升模型性能、简化模型复杂度、解读数据逻辑的核心步骤——而随机森林(Random Forest) ...
2026-02-12在MySQL数据查询实操中,按日期分组统计是高频需求——比如统计每日用户登录量、每日订单量、每日销售额,需要按日期分组展示, ...
2026-02-12对CDA(Certified Data Analyst)数据分析师而言,描述性统计是贯穿实操全流程的核心基础,更是从“原始数据”到“初步洞察”的 ...
2026-02-12备考CDA的小伙伴,专属宠粉福利来啦! 不用拼运气抽奖,不用复杂操作,只要转发CDA真题海报到朋友圈集赞,就能免费抱走实用好礼 ...
2026-02-11在数据科学、机器学习实操中,Anaconda是必备工具——它集成了Python解释器、conda包管理器,能快速搭建独立的虚拟环境,便捷安 ...
2026-02-11在Tableau数据可视化实操中,多表连接是高频操作——无论是将“产品表”与“销量表”连接分析产品销量,还是将“用户表”与“消 ...
2026-02-11