京公网安备 11010802034615号
经营许可证编号:京B2-20210330
基于python yield机制的异步操作同步化编程模型
本文总结下如何在编写python代码时对异步操作进行同步化模拟,从而提高代码的可读性和可扩展性。
游戏引擎一般都采用分布式框架,通过一定的策略来均衡服务器集群的资源负载,从而保证服务器运算的高并发性和CPU高利用率,最终提高游戏的性能和负载。由于引擎的逻辑层调用是非抢占式的,服务器之间都是通过异步调用来进行通讯,导致游戏逻辑无法同步执行,所以在代码层不得不人为地添加很多回调函数,使一个原本完整的功能碎片化地分布在各个回调函数中。
异步逻辑
以游戏中的副本评分逻辑为例,在副本结束时副本管理进程需要收集副本中每个玩家的战斗信息,再结合管理进程内部的统计信息最终给出一个副本评分,发放相应奖励。因为每个玩家实体都随机分布在不同进程中,所以管理进程需要通过异步调用来获取玩家身上的战斗信息。
实现代码如下所示:
# -*- coding: gbk -*-
import random
# 玩家实体类
class Player(object):
def __init__(self, entityId):
super(Player, self).__init__()
# 玩家标识
self.entityId = entityId
def onFubenEnd(self, mailBox):
score = random.randint(1, 10)
print "onFubenEnd player %d score %d"%(self.entityId, score)
# 向副本管理进程发送自己的id和战斗信息
mailBox.onEvalFubenScore(self.entityId, score)
# 副本管理类
class FubenStub(object):
def __init__(self, players):
super(FubenStub, self).__init__()
self.players = players
def evalFubenScore(self):
self.playerRelayCnt = 0
self.totalScore = 0
# 通知每个注册的玩家,副本已经结束,索取战斗信息
for player in self.players:
player.onFubenEnd(self)
def onEvalFubenScore(self, entityId, score):
# 收到其中一个玩家的战斗信息
print "onEvalFubenScore player %d score %d"%(entityId, score)
self.playerRelayCnt += 1
self.totalScore += score
# 当收集完所有玩家的信息后,打印评分
if len(self.players) == self.playerRelayCnt:
print 'The fuben totalScore is %d'%self.totalScore
if __name__ == '__main__':
# 模拟创建玩家实体
players = [Player(i) for i in xrange(3)]
# 副本开始时,每个玩家将自己的MailBox注册到副本管理进程
fs = FubenStub(players)
# 副本进行中
# ....
# 副本结束,开始评分
fs.evalFubenScore()
代码简化了副本评分逻辑的实现,其中Player类表示游戏的玩家实体,在游戏运行时无缝地在不同服务器中切换,FubenStub表示副本的管理进程,在副本刚开始的时候该副本内所有玩家会将自己的MailBox注册到管理进程中,其中MailBox表示各个实体的远程调用句柄。在副本结束时,FubenStub首先向各个玩家发送副本结束消息,同时请求玩家的战斗信息,玩家在得到消息后,将自己的战斗信息发送给FubenStub;然后当FubenStub收集完所有玩家的信息后,最终打印副本评分。
同步逻辑
如果Player和FubenStub在同一进程中的话,那所有的操作都可以同步完成,在FubenStub向玩家发送副本结束消息的同时可以马上得到该玩家的战斗信息,实现代码如下所示:
从以上两份代码可以看到由于异步操作,FubenStub中的评分逻辑人为地分成两个功能点:1)向玩家发送副本结束消息;2)接受玩家的战斗信息;并且两个功能点分布在两个不同的函数中。如果游戏逻辑一旦复杂,势必会造成功能点分散,出现过多onXXX异步回调函数,最终导致代码的开发成本和维护成本提高,可读性和可扩展性下降。
如果有一种方法,可以让函数在异步调用时暂时挂起,并且在回调函数得到返回值后恢复执行,那么就可以用同步化的编程模式开发异步逻辑。
yield 关键字
yield 是 Python中的一个关键字,凡是函数体中出现了 yield 关键字, Python将改变整个函数的上下文,调用该函数不再返回值, 而是一个生成器对象。只有调用这个生成器的迭代函数next才能开始执行生成器对象,当生成器对象执行到包含 yield 表达式时, 函数将暂时挂起,等待下一次next调用来恢复执行,具体机制如下:
1)调用生成器对象的next方法,启动函数执行;
2)当生成器对象执行到包含 yield 表达式时, 函数挂起;
3)下一次 next 函数调用又会驱动该生成器对象继续执行此后的语句, 直到遇见下一个 yield 再次挂起;
4)如果某次 next 调用驱动了生成器继续执行, 而此后函数正常结束,生成器会抛出 StopIteration 异常;
如下代码所示:
def f():
print "Before first yield"
yield 1
print "Before second yield"
yield 2
print "After second yield"
g = f()
print "Before first next"
g.next()
print "Before second next"
g.next()
print "Before third yield"
g.next()
执行结果为:
Before first next
Before first yield
Before second next
Before second yield
Before third yield
After second yield
StopIteration
哈,有了让函数暂时挂起的机制,最后就剩下如何传递异步调用的返回值问题了。其实生成器的next函数已经实现了将参数从生成器对象内部向外传递的机制,并且python还提供了一个send函数将参数从外向生成器对象内部传递的机制,具体机制如下:
1) 调用next 函数驱动生成器时, next会同时等待生成器中下一个 yield 挂起,并将该yield后面的参数返回给next;
2)往生成器中传递参数,需要将next函数替换成send,此时send的功能与next相同(驱动生成器执行,等待返回值),同时send将后面的参数传递给生成器内部之前挂起的yield;
如下代码所示:
def f():
msg = yield 'first yield msg'
print "generator inner receive:", msg
msg = yield 'second yield msg'
print "generator inner receive:", msg
g = f()
msg = g.next()
print "generator outer receive:", msg
msg = g.send('first send msg')
print "generator outer receive:", msg
g.send('second send msg')
执行结果为:
generator outer receive: first yield msg
generator inner receive: first send msg
generator outer receive: second yield msg
generator inner receive: second send msg
StopIteration
同步化实现
好了,万事俱备只欠东风,下面就是简单对yield机制进行工程上封装以方便之后开发。下面的代码提供了一个叫IFakeSyncCall的interface,所有包含异步操作的逻辑类都可以继承这个接口:
class IFakeSyncCall(object):
def __init__(self):
super(IFakeSyncCall, self).__init__()
self.generators = {}
@staticmethod
def FAKE_SYNCALL():
def fwrap(method):
def fakeSyncCall(instance, *args, **kwargs):
instance.generators[method.__name__] = method(instance, *args, **kwargs)
func, args = instance.generators[method.__name__].next()
func(*args)
return fakeSyncCall
return fwrap
def onFakeSyncCall(self, identify, result):
try:
func, args = self.generators[identify].send(result)
func(*args)
except StopIteration:
self.generators.pop(identify)
其中interface中属性generators用来保存类中已经开始执行的生成器对象;函数FAKE_SYNCALL是一个decorator,装饰类中包含有yield的函数,改变函数的调用上下文,在fakeSyncCall内部封装了对生成器对象的next调用;函数onFakeSyncCall封装了所有onXXX函数的逻辑,其他实体通过调用这个函数传递异步回调的返回值。
下面就是经过同步化改进后的异步副本评分逻辑代码:
# -*- coding: gbk -*-
import random
class Player(object):
def __init__(self, entityId):
super(Player, self).__init__()
self.entityId = entityId
def onFubenEnd(self, mailBox):
score = random.randint(1, 10)
print "onFubenEnd player %d score %d"%(self.entityId, score)
mailBox.onFakeSyncCall('evalFubenScore', (self.entityId, score))
class FubenStub(IFakeSyncCall):
def __init__(self, players):
super(FubenStub, self).__init__()
self.players = players
@IFakeSyncCall.FAKE_SYNCALL()
def evalFubenScore(self):
totalScore = 0
for player in self.players:
entityId, score = yield (player.onFubenEnd, (self,))
print "onEvalFubenScore player %d score %d"%(entityId, score)
totalScore += score
print 'the totalScore is %d'%totalScore
if __name__ == '__main__':
players = [Player(i) for i in xrange(3)]
fs = FubenStub(players)
fs.evalFubenScore()
比较evalFubenScore函数,基本已经和原本的同步逻辑代码相差无几。
利用yield机制实现同步化编程模型的另外一个优点是可以保证所有异步调用的逻辑串行化,从而保证数据的一致性和有效性,特别是在各种异步初始化流程中可以摒弃传统的timer sleep机制,从源头上扼杀一些隐藏很深的由于数据不一致性所导致的bug。
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
在统计学分析、实验研究、业务数据复盘过程中,单因素方差分析是检验自变量对因变量是否存在显著影响的核心方法。其中,两个水平 ...
2026-05-26【核心关键词】算法、客户、大数据、互联网、调优、建模、模型优化、机器学习、评分卡模型、模型开发、智能风控、业务场景、数 ...
2026-05-26 很多数据分析师写过无数个 SELECT,但当被问到“新建一张表,该如何定义字段类型来保证数据质量”“创建视图和存储物理表有 ...
2026-05-26在数据清洗、统计分析与数据质量检测工作中,箱型图(又称箱线图、Box Plot)是最直观、最高效的可视化分析工具之一。相较于柱状 ...
2026-05-25在大数据分析、数据清洗、质量管控、风险监测等领域,异常数据识别是保障数据质量、确保分析结论精准、规避业务决策失误的核心基 ...
2026-05-25 很多数据分析师精通Excel函数和透视表,但当被问到“数据从哪里来”“表和视图有什么区别”“数据库管理系统和SQL是什么关系 ...
2026-05-25数字化经营时代,企业的市场竞争早已从经验决策转向数据决策。门店营收、用户转化、产品销量、成本损耗、存量资产等所有经营行为 ...
2026-05-22在MySQL数据库日常运维、业务数据校验、数据迁移与数据清洗场景中,自增主键ID的连续性校验是一项基础且关键的工作。MySQL的Auto ...
2026-05-22 很多企业团队并非缺乏指标,而是陷入“指标失控”:仪表盘上堆满实时跳动的数据,却无法回答“当前瓶颈在哪、下一步该做什么 ...
2026-05-22【核心关键词】大数据、可视化、存储、架构、客户、离线、产品、同步、实时、数据仓库、数据分析、数据可视化、存储数据、离线 ...
2026-05-21在电商流量红利消退、公域获客成本持续走高的当下,存量用户深度挖掘已成为店铺增收增效的核心抓手。相较于付费投放获取的陌生新 ...
2026-05-21 很多数据分析师每天盯着几十个指标,但当被问到“这套指标要支撑什么业务目标”“指标之间是什么逻辑关系”“业务变化时如何 ...
2026-05-21在数据驱动决策的时代,数据质量直接决定分析结果的可靠性与准确性,而异常值作为数据清洗中的核心痛点,往往会扭曲分析结论、误 ...
2026-05-20 很多数据分析师每天盯着GMV、DAU、转化率,但当被问到“哪些指标在所有行业都适用”“哪些指标只对电商有意义”“二者如何搭 ...
2026-05-20Agent的能力边界,很大程度上取决于其掌握的Skill质量和数量。传统做法是靠人工编写和维护Skill,但这条路很快会遇到瓶颈。业务 ...
2026-05-20在统计分析中,方差分析(ANOVA)是一种常用的假设检验方法,核心用于分析“一个或多个自变量对单个因变量的影响”,广泛应用于 ...
2026-05-19 很多数据分析师每天盯着GMV、DAU、转化率,但当被问到“什么是指标”“指标和维度有什么区别”“如何定义指标值的计算规则和 ...
2026-05-19想高效备考 CDA 一级,拒绝盲目刷题、冗余学习?《CDA 一级教材知识手册》重磅来袭!以官方教材为核心,浓缩 13 章 103 个核心考 ...
2026-05-19在数据统计分析中,卡方检验是一种常用的非参数检验方法,核心用于判断两个或多个分类变量之间是否存在显著关联,广泛应用于市场 ...
2026-05-18在企业数字化转型的浪潮中,很多企业陷入了“技术堆砌”的误区——上线了ERP、CRM、BI等各类系统,积累了海量数据,却依然面临“ ...
2026-05-18