专栏首页大数据入坑指南Python自学成才之路 分布式计算解决方案actor

Python自学成才之路 分布式计算解决方案actor

以下内容来自于cookbook,个人觉得这篇文章对于设计分布式计算任务有一定的借鉴意义,感兴趣的同学可以阅读原文: https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p10_defining_an_actor_task.html

actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送, 也不会接收到一个消息已被处理的回应或通知。

使用线程加队列可以定义一个actor:

from queue import Queue
from threading import Thread, Event
import time

class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        '''
        发送消息 
        '''
        self._mailbox.put(msg)

    def recv(self):
        '''
        接受消息
        '''
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        '''
        关闭actor
        '''
        self.send(ActorExit)

    def start(self):
        '''
        启动actor
        '''
        self._terminated = Event()
        t = Thread(target=self._bootstrap)

        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        '''
        消费者线程的run方法
        '''
        while True:
            msg = self.recv()

class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)

p = PrintActor()
p.start()
time.sleep(1)
p.send('Hello')
time.sleep(2)
p.send('World')
time.sleep(1)
p.close()
p.join()

输出:
Got: Hello
Got: World

这个案例是这样的,先定义了一个actor,然后通过继承actor定义了PrintActor,并重写了其中的run方法。

这个actor中的每个方法起到什么作用,并如何实现的? Start:创建self._terminated信号(关于event我前面的文章有讲解),定义一个线程,将现场设置为守护线程,并启动线程。 _bootstrap:线程启动后会执行这个方法,并启动里面的run方法。 Run:这里的run方法被子类重写了,子类通过recv不断的获取队列中的消息。 Recv:获取队列中的消息,并返回消息,如果消息类型是ActorExit,则抛出ActorExit异常。 Send:往队列中写入消息 Close:发送一个ActorExit类型的消息,recv接受到这个消息后,抛出ActorExit异常,run方法结束,bootstrap中捕获这个异常后,将self._terminated信号修改为true。 Join:如果self._terminated为false线程阻塞。

思考:在程序最后面加了一个p.join(),为何要加这个呢? 因为close方法只是发送了一个ActorExit类型的消息到队列,而消费队列的线程是一个守护线程,如果close后面没有任何需要执行的代码,则主线程就结束了,子线程也会随着结束,那么很有可能‘world’这条消息也没消费到,子线程就退出了,所以加个p.join()是为了主线程被阻塞,知道子线程消费到ActorExit类型的消息,将self._terminated设置为true才会唤醒主线程,主线程退出,子线程也退出,此时队列中的消息肯定是已经消费完了。 上面这个案例中,actor只是简单的模拟了一个队列传递消息的例子,实际上这些消息还可以是一个函数,比如下面这个案例:

from threading import Event
class Result:
    def __init__(self):
        self._evt = Event()
        self._result = None

    def set_result(self, value):
        self._result = value

        self._evt.set()

    def result(self):
        self._evt.wait()
        return self._result

class Worker(Actor):
    def submit(self, func, *args, **kwargs):
        r = Result()
        self.send((func, args, kwargs, r))
        return r

    def run(self):
        while True:
            func, args, kwargs, r = self.recv()
            r.set_result(func(*args, **kwargs))

def add(a, b):
    return a+b


worker = Worker()
worker.start()
r = worker.submit(add, 2, 3)
worker.close()
worker.join()
print(r.result())
输出:
5

这个案例中,worker继承了actor,在submit方法中,将函数和函数的参数作为一个元祖发送到队列中,并返回一个Result实例,消费者线程拿到函数和参数后,执行这个函数,并将函数的返回值放到Result实例的_result属性上。_result属性值可以通过result()方法获取,这个方法是阻塞的,只有当线程将函数执行完并将函数结果通过set_result方法赋值给_result属性上时,result()方法才能得到返回结果,这里面的异步获取返回结果的过程是通过event()来控制的。

这种通过函数作为消息的方式来传递并由消费线程来执行的思想在一些分布式系统里面会经常用到。后面碰到这种案例再给兄弟们分享出来。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • python自学成才之路 类详细用法

    python是一门面向对象编程的语言,python的类和java中的类思想上有很多一样的地方,比如python类也是通过class修饰,里面也有成员属性,成员方...

    我是李超人
  • Python自学成才之路 魔术方法之一元,二元运算符

    一元运算符有+,-,~等,要想使这些一元运算符对对象起作用,需要在定义类的时候实现对应的魔术方法。

    我是李超人
  • CentOs7搭建rabbitmq集群

    环境:三台centos7.4.1708_x86_64 机器 192.168.1.186、192.168.1.187、192.168.1.188

    我是李超人
  • Python3备份

    py3study
  • Python:线程、进程与协程(3)——

        Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式。该模块提供了三种队列:

    py3study
  • PyQt5 图形界面-实现按钮监听事件

    PyQt5 生成的代码由独有的一套界面组件构成的,和 tkinter 有一定区别呢! 我们绑定点击事件的方法 clicked.connect()

    小蓝枣
  • 720p实时视频插帧 | 旷视科技&北大提出RIFE

    该文提出一种实时中间流估计(Intermediate Flow Estimation)算法RIFE用于视频插帧。现有视频插帧大多先估计双向光流,然后采用线性组...

    AIWalker
  • Python实现带有阻塞和超时放弃功能的队列结构

    本文代码对Python列表进行封装并模拟了队列结构,入队时如果队列已满则阻塞当前线程,超时则放弃;出队时如果队列已空则阻塞当前线程,超时则放弃。 import ...

    Python小屋屋主
  • 进阶的运维开发(三)- 反射

    反射就是通过字符串的形式,导入模块,通过字符串的形式,去模块寻找制定函数并执行。利用字符串的形式去对象(模块)中操作(查找/获取/删除/添加)成员,一种基于字符...

    奔跑的骆驼
  • python ImageDraw类实现几何图形的绘制与文字的绘制

    python PIL图像处理模块中的ImageDraw类支持各种几何图形的绘制和文本的绘制,如直线、椭圆、弧、弦、多边形以及文字等。

    砸漏

扫码关注云+社区

领取腾讯云代金券