事件驱动的Python实现

Java编程精选

点击右侧关注,免费入门到精通!

作者丨码匠信龙

https://www.jianshu.com/p/a605fab0ab11

EventManager事件管理类实现,大概就百来行代码左右。如果有不了事件驱动的工作原理的可以看前一篇《事件驱动的简明讲解》https://www.jianshu.com/p/2290bfbd75dd

# encoding: UTF-8# 系统模块fromQueueimportQueue, Emptyfromthreadingimport*########################################################################classEventManager:#----------------------------------------------------------------------def__init__(self):"""初始化事件管理器"""# 事件对象列表self.__eventQueue = Queue()# 事件管理器开关self.__active =False# 事件处理线程self.__thread = Thread(target = self.__Run)# 这里的__handlers是一个字典,用来保存对应的事件的响应函数# 其中每个键对应的值是一个列表,列表中保存了对该事件监听的响应函数,一对多self.__handlers = {}#----------------------------------------------------------------------def__Run(self):"""引擎运行"""whileself.__active ==True:try:# 获取事件的阻塞时间设为1秒event = self.__eventQueue.get(block =True, timeout =1)self.__EventProcess(event)exceptEmpty:pass#----------------------------------------------------------------------def__EventProcess(self, event):"""处理事件"""# 检查是否存在对该事件进行监听的处理函数ifevent.type_inself.__handlers:# 若存在,则按顺序将事件传递给处理函数执行forhandlerinself.__handlers[event.type_]:handler(event)#----------------------------------------------------------------------defStart(self):"""启动"""# 将事件管理器设为启动self.__active =True# 启动事件处理线程self.__thread.start()#----------------------------------------------------------------------defStop(self):"""停止"""# 将事件管理器设为停止self.__active =False# 等待事件处理线程退出self.__thread.join()#----------------------------------------------------------------------defAddEventListener(self, type_, handler):"""绑定事件和监听器处理函数"""# 尝试获取该事件类型对应的处理函数列表,若无则创建try:handlerList = self.__handlers[type_]exceptKeyError:handlerList = []self.__handlers[type_] = handlerList# 若要注册的处理器不在该事件的处理器列表中,则注册该事件ifhandlernotinhandlerList:handlerList.append(handler)#----------------------------------------------------------------------defRemoveEventListener(self, type_, handler):"""移除监听器的处理函数"""#读者自己试着实现#----------------------------------------------------------------------defSendEvent(self, event):"""发送事件,向事件队列中存入事件"""self.__eventQueue.put(event)########################################################################"""事件对象"""classEvent:def__init__(self, type_=None):self.type_ = type_# 事件类型self.dict = {}# 字典用于保存具体的事件数据

测试代码

测试代码#-------------------------------------------------------------------# encoding: UTF-8importsysfromdatetimeimportdatetimefromthreadingimport*fromEventManagerimport*#事件名称 新文章EVENT_ARTICAL ="Event_Artical"#事件源 公众号classPublicAccounts:def__init__(self,eventManager):self.__eventManager = eventManagerdefWriteNewArtical(self):#事件对象,写了新文章event = Event(type_=EVENT_ARTICAL)event.dict["artical"] =u'如何写出更优雅的代码\n'#发送事件self.__eventManager.SendEvent(event)printu'公众号发送新文章\n'#监听器 订阅者classListener:def__init__(self,username):self.__username = username#监听器的处理函数 读文章defReadArtical(self,event):print(u'%s 收到新文章'% self.__username)print(u'正在阅读新文章内容:%s'% event.dict["artical"])"""测试函数"""#--------------------------------------------------------------------deftest():listner1 = Listener("thinkroom")#订阅者1listner2 = Listener("steve")#订阅者2eventManager = EventManager()#绑定事件和监听器响应函数(新文章)eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)eventManager.Start()publicAcc = PublicAccounts(eventManager)timer = Timer(2, publicAcc.WriteNewArtical)timer.start()if__name__ =='__main__':test()

  • 发表于:
  • 原文链接:https://kuaibao.qq.com/s/20181012B0L6YH00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券