A Look at Flink's ActorGateway

code4it
Published 2019-03-16 11:42:02
Included in the column: 码匠的流水账

This article takes a look at Flink's ActorGateway.

ActorGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java

public interface ActorGateway extends Serializable {

    /**
     * Sends a message asynchronously and returns its response. The response to the message is
     * returned as a future.
     *
     * @param message Message to be sent
     * @param timeout Timeout until the Future is completed with an AskTimeoutException
     * @return Future which contains the response to the sent message
     */
    Future<Object> ask(Object message, FiniteDuration timeout);

    /**
     * Sends a message asynchronously without a result.
     *
     * @param message Message to be sent
     */
    void tell(Object message);

    /**
     * Sends a message asynchronously without a result with sender being the sender.
     *
     * @param message Message to be sent
     * @param sender Sender of the message
     */
    void tell(Object message, ActorGateway sender);

    /**
     * Forwards a message. For the receiver of this message it looks as if sender has sent the
     * message.
     *
     * @param message Message to be sent
     * @param sender Sender of the forwarded message
     */
    void forward(Object message, ActorGateway sender);

    /**
     * Retries to send asynchronously a message up to numberRetries times. The response to this
     * message is returned as a future. The message is re-sent if the number of retries is not yet
     * exceeded and if an exception occurred while sending it.
     *
     * @param message Message to be sent
     * @param numberRetries Number of times to retry sending the message
     * @param timeout Timeout for each sending attempt
     * @param executionContext ExecutionContext which is used to send the message multiple times
     * @return Future of the response to the sent message
     */
    Future<Object> retry(
            Object message,
            int numberRetries,
            FiniteDuration timeout,
            ExecutionContext executionContext);

    /**
     * Returns the path of the remote instance.
     *
     * @return Path of the remote instance.
     */
    String path();

    /**
     * Returns the underlying actor with which is communicated
     *
     * @return ActorRef of the target actor
     */
    ActorRef actor();

    /**
     * Returns the leaderSessionID associated with the remote actor or null.
     *
     * @return Leader session ID if its associated with this gateway, otherwise null
     */
    UUID leaderSessionID();
}
  • The ActorGateway interface defines the methods ask, tell, forward, retry, path, actor, and leaderSessionID; its implementation class is AkkaActorGateway.
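The retry contract described in the Javadoc (re-send while an attempt fails and the retry budget is not used up) can be tried out without Akka. What follows is only a minimal sketch under stated assumptions, not Flink code: RetrySketch, retry, attempt, and flakyEcho are hypothetical names, java.util.concurrent.CompletableFuture stands in for the Scala Future, a plain Function stands in for the actor, and the timeout and ExecutionContext parameters are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for the retry contract; this is not Flink's AkkaUtils.retry.
final class RetrySketch {

    // Sends `message` via `ask`; if an attempt fails, re-sends it up to `numberRetries` more times.
    static CompletableFuture<Object> retry(
            Function<Object, CompletableFuture<Object>> ask,
            Object message,
            int numberRetries) {
        CompletableFuture<Object> result = new CompletableFuture<>();
        attempt(ask, message, numberRetries, result);
        return result;
    }

    private static void attempt(
            Function<Object, CompletableFuture<Object>> ask,
            Object message,
            int retriesLeft,
            CompletableFuture<Object> result) {
        ask.apply(message).whenComplete((response, error) -> {
            if (error == null) {
                result.complete(response);               // success: hand the response to the caller
            } else if (retriesLeft > 0) {
                attempt(ask, message, retriesLeft - 1, result); // failure with budget left: re-send
            } else {
                result.completeExceptionally(error);     // budget exhausted: surface the last error
            }
        });
    }

    // A flaky "actor" for the demo: fails the first `failures` asks, then echoes the message.
    static Function<Object, CompletableFuture<Object>> flakyEcho(int failures) {
        AtomicInteger calls = new AtomicInteger();
        return message -> {
            CompletableFuture<Object> reply = new CompletableFuture<>();
            if (calls.getAndIncrement() < failures) {
                reply.completeExceptionally(new IllegalStateException("send failed"));
            } else {
                reply.complete("echo:" + message);
            }
            return reply;
        };
    }

    public static void main(String[] args) {
        // Two failed attempts, then success on the third; prints "echo:ping".
        System.out.println(retry(flakyEcho(2), "ping", 3).join());
    }
}
```

With a budget smaller than the number of failures, for example retry(flakyEcho(5), "ping", 1), the returned future completes exceptionally instead.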

AkkaActorGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java

public class AkkaActorGateway implements ActorGateway, Serializable {

    private static final long serialVersionUID = 42L;

    // ActorRef of the remote instance
    private final ActorRef actor;

    // Associated leader session ID, which is used for RequiresLeaderSessionID messages
    private final UUID leaderSessionID;

    // Decorator for messages
    private final MessageDecorator decorator;

    public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {
        this.actor = Preconditions.checkNotNull(actor);
        this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID);
        // we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage
        this.decorator = new LeaderSessionMessageDecorator(leaderSessionID);
    }

    /**
     * Sends a message asynchronously and returns its response. The response to the message is
     * returned as a future.
     *
     * @param message Message to be sent
     * @param timeout Timeout until the Future is completed with an AskTimeoutException
     * @return Future which contains the response to the sent message
     */
    @Override
    public Future<Object> ask(Object message, FiniteDuration timeout) {
        Object newMessage = decorator.decorate(message);
        return Patterns.ask(actor, newMessage, new Timeout(timeout));
    }

    /**
     * Sends a message asynchronously without a result.
     *
     * @param message Message to be sent
     */
    @Override
    public void tell(Object message) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, ActorRef.noSender());
    }

    /**
     * Sends a message asynchronously without a result with sender being the sender.
     *
     * @param message Message to be sent
     * @param sender Sender of the message
     */
    @Override
    public void tell(Object message, ActorGateway sender) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, sender.actor());
    }

    /**
     * Forwards a message. For the receiver of this message it looks as if sender has sent the
     * message.
     *
     * @param message Message to be sent
     * @param sender Sender of the forwarded message
     */
    @Override
    public void forward(Object message, ActorGateway sender) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, sender.actor());
    }

    /**
     * Retries to send asynchronously a message up to numberRetries times. The response to this
     * message is returned as a future. The message is re-sent if the number of retries is not yet
     * exceeded and if an exception occurred while sending it.
     *
     * @param message Message to be sent
     * @param numberRetries Number of times to retry sending the message
     * @param timeout Timeout for each sending attempt
     * @param executionContext ExecutionContext which is used to send the message multiple times
     * @return Future of the response to the sent message
     */
    @Override
    public Future<Object> retry(
            Object message,
            int numberRetries,
            FiniteDuration timeout,
            ExecutionContext executionContext) {

        Object newMessage = decorator.decorate(message);

        return AkkaUtils.retry(
            actor,
            newMessage,
            numberRetries,
            executionContext,
            timeout);
    }

    /**
     * Returns the ActorPath of the remote instance.
     *
     * @return ActorPath of the remote instance.
     */
    @Override
    public String path() {
        return actor.path().toString();
    }

    /**
     * Returns {@link ActorRef} of the target actor
     *
     * @return ActorRef of the target actor
     */
    @Override
    public ActorRef actor() {
        return actor;
    }

    @Override
    public UUID leaderSessionID() {
        return leaderSessionID;
    }

    @Override
    public String toString() {
        return String.format("AkkaActorGateway(%s, %s)", actor.path(), leaderSessionID);
    }
}
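  • AkkaActorGateway implements the ActorGateway interface. Its constructor takes an ActorRef and a leaderSessionID (both must be non-null) and creates a LeaderSessionMessageDecorator from the leaderSessionID. The ask, tell, forward, and retry methods all first call LeaderSessionMessageDecorator.decorate to wrap the message argument and then hand the result on: ask delegates to Patterns.ask, both tell overloads and forward call ActorRef.tell (forward passes sender.actor() as the sender), and retry delegates to AkkaUtils.retry.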
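The "decorate first, then hand over to the actor" flow, and the difference between sending with and without a sender, can be modelled in a few lines. This is a rough Akka-free sketch, not the real implementation: DecoratingGatewaySketch and RecordingActor are hypothetical names, the decorator is a plain UnaryOperator, and a null sender plays the role of ActorRef.noSender().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, Akka-free model of "decorate first, then hand over to the actor".
final class DecoratingGatewaySketch {

    // Stand-in for ActorRef: records every delivered message together with its sender name.
    static final class RecordingActor {
        final String name;
        final List<String> inbox = new ArrayList<>();

        RecordingActor(String name) {
            this.name = name;
        }

        void tell(Object message, RecordingActor sender) {
            String from = (sender == null) ? "noSender" : sender.name;
            inbox.add(from + " -> " + message);
        }
    }

    private final RecordingActor actor;
    private final UnaryOperator<Object> decorator;

    DecoratingGatewaySketch(RecordingActor actor, UnaryOperator<Object> decorator) {
        this.actor = actor;
        this.decorator = decorator;
    }

    RecordingActor actor() {
        return actor;
    }

    // Mirrors tell(Object): decorate, then send without a sender.
    void tell(Object message) {
        actor.tell(decorator.apply(message), null);
    }

    // Mirrors tell(Object, ActorGateway) and forward(...): decorate, then send on behalf of `sender`.
    void tell(Object message, DecoratingGatewaySketch sender) {
        actor.tell(decorator.apply(message), sender.actor());
    }

    public static void main(String[] args) {
        UnaryOperator<Object> tagging = m -> "[session-1] " + m;
        DecoratingGatewaySketch jobManager =
                new DecoratingGatewaySketch(new RecordingActor("jobManager"), tagging);
        DecoratingGatewaySketch taskManager =
                new DecoratingGatewaySketch(new RecordingActor("taskManager"), tagging);

        jobManager.tell("heartbeat");             // recorded as "noSender -> [session-1] heartbeat"
        jobManager.tell("register", taskManager); // recorded as "taskManager -> [session-1] register"
        jobManager.actor().inbox.forEach(System.out::println);
    }
}
```

The receiving side only ever sees the decorated message, so callers of the gateway never have to attach the session information themselves.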

MessageDecorator

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java

public interface MessageDecorator extends java.io.Serializable {

    /**
     * Decorates a message
     *
     * @param message Message to decorate
     * @return Decorated message
     */
    Object decorate(Object message);
}
  • The MessageDecorator interface defines a decorate method for wrapping a message; its implementation class is LeaderSessionMessageDecorator.

LeaderSessionMessageDecorator

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java

public class LeaderSessionMessageDecorator implements MessageDecorator {

    private static final long serialVersionUID = 5359618147408392706L;

    /** Leader session ID with which the RequiresLeaderSessionID messages will be decorated */
    private final UUID leaderSessionID;

    /**
     * Sets the leader session ID with which the messages will be decorated.
     *
     * @param leaderSessionID Leader session ID to be used for decoration
     */
    public LeaderSessionMessageDecorator(UUID leaderSessionID) {
        this.leaderSessionID = leaderSessionID;
    }

    @Override
    public Object decorate(Object message) {
        if (message instanceof RequiresLeaderSessionID) {
            return new JobManagerMessages.LeaderSessionMessage(leaderSessionID, message);
        } else {
            return message;
        }
    }
}
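  • LeaderSessionMessageDecorator implements the MessageDecorator interface. Its decorate method returns a JobManagerMessages.LeaderSessionMessage wrapping the message if the message is an instance of RequiresLeaderSessionID; otherwise it returns the original message unchanged.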
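The instanceof check is the whole trick: a marker interface decides whether a message gets wrapped. Here is a small self-contained sketch of that pattern. The type names mirror Flink's for readability, but everything below is re-declared locally and is not Flink code; CancelJob and Ping are message types invented for the demo.

```java
import java.util.UUID;

// Hypothetical, self-contained re-creation of the marker-interface decoration pattern.
final class DecoratorSketch {

    // Marker interface: messages that must carry the leader session ID.
    interface RequiresLeaderSessionID {}

    // Plain Java stand-in for the wrapper message (ID plus the original message).
    static final class LeaderSessionMessage {
        final UUID leaderSessionID;
        final Object message;

        LeaderSessionMessage(UUID leaderSessionID, Object message) {
            this.leaderSessionID = leaderSessionID;
            this.message = message;
        }
    }

    // Example message types, invented for this sketch.
    static final class CancelJob implements RequiresLeaderSessionID {}

    static final class Ping {}

    // Wraps only messages that carry the marker; everything else passes through untouched.
    static Object decorate(UUID leaderSessionID, Object message) {
        if (message instanceof RequiresLeaderSessionID) {
            return new LeaderSessionMessage(leaderSessionID, message);
        }
        return message;
    }

    public static void main(String[] args) {
        UUID session = UUID.randomUUID();
        System.out.println(decorate(session, new CancelJob()) instanceof LeaderSessionMessage); // true
        System.out.println(decorate(session, new Ping()) instanceof LeaderSessionMessage);      // false
    }
}
```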

JobManagerMessages.LeaderSessionMessage

flink-1.7.2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala

object JobManagerMessages {

  /** Wrapper class for leader session messages. Leader session messages implement the
    * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]],
    * which also contains the current leader session ID.
    *
    * @param leaderSessionID Current leader session ID
    * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]]
    */
  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)

  //......
}
  • JobManagerMessages.LeaderSessionMessage is a case class with two fields: leaderSessionID and message.
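For readers coming from Java: a Scala case class gets accessors, value-based equals/hashCode, and a readable toString generated by the compiler. A hand-written Java approximation might look like the sketch below. LeaderSessionMessageSketch is a hypothetical name, and the toString format is only meant to resemble what Scala generates.

```java
import java.util.Objects;
import java.util.UUID;

// Hand-written approximation of:
//   case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)
final class LeaderSessionMessageSketch {
    private final UUID leaderSessionID;
    private final Object message;

    LeaderSessionMessageSketch(UUID leaderSessionID, Object message) {
        this.leaderSessionID = leaderSessionID;
        this.message = message;
    }

    // Accessors named like the case class fields.
    UUID leaderSessionID() {
        return leaderSessionID;
    }

    Object message() {
        return message;
    }

    // Value equality: two wrappers are equal when both fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LeaderSessionMessageSketch)) {
            return false;
        }
        LeaderSessionMessageSketch that = (LeaderSessionMessageSketch) o;
        return Objects.equals(leaderSessionID, that.leaderSessionID)
                && Objects.equals(message, that.message);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderSessionID, message);
    }

    @Override
    public String toString() {
        return "LeaderSessionMessage(" + leaderSessionID + "," + message + ")";
    }
}
```

Two instances built from the same session ID and the same message compare equal, which is what makes such wrapper messages convenient to match on and to assert against in tests.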

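Summary

  • The ActorGateway interface defines the methods ask, tell, forward, retry, path, actor, and leaderSessionID; its implementation class is AkkaActorGateway.
  • AkkaActorGateway implements the ActorGateway interface. Its constructor takes an ActorRef and a leaderSessionID and creates a LeaderSessionMessageDecorator from the leaderSessionID. The ask, tell, forward, and retry methods all first call LeaderSessionMessageDecorator.decorate to wrap the message argument and then hand the result on: ask delegates to Patterns.ask, tell and forward call ActorRef.tell, and retry delegates to AkkaUtils.retry.
  • The MessageDecorator interface defines a decorate method for wrapping a message; its implementation class is LeaderSessionMessageDecorator, whose decorate method returns a JobManagerMessages.LeaderSessionMessage if the message is an instance of RequiresLeaderSessionID and the original message otherwise. JobManagerMessages.LeaderSessionMessage is a case class with two fields: leaderSessionID and message.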

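Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com to have it removed.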
