前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink异步io 转

flink异步io 转

作者头像
stys35
发布2019-03-20 09:57:33
1.2K0
发布2019-03-20 09:57:33
举报
文章被收录于专栏:工作笔记精华工作笔记精华

状态

现状:已发布

讨论主题http:  //apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-IO-in-FLINK-tt13497.html

JIRA:  FLINK-4391-为已解决的流提供异步操作支持

发布:  Flink 1.2

Google文档https:  //docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/edit

请将讨论保留在邮件列表上,而不是评论维基(维基讨论快速笨拙)。

动机

在大多数情况下,I / O访问是一个耗时的过程,使得单个操作员的TPS远低于内存计算,特别是对于流式作业,低延迟是用户最关心的问题。启动多个线程可能是处理此问题的一个选项,但缺点是显而易见的:最终用户的编程模型可能会变得更加复杂,因为他们必须在运算符中实现线程模型。此外,他们必须注意与检查点协调。

条款

AsyncFunction:异步I / O将在AsyncFunction中触发。

AsyncWaitOperator:一个将调用AsyncFunction的StreamOperator。

AsyncCollector:对于每个输入流记录,将创建AsyncCollector并将其传递到用户的回调以获取异步i / o结果。

AsyncCollectorBuffer:保留所有AsyncCollector的缓冲区。

发送器线程:AsyncCollectorBuffer中的一个工作线程,当一些AsyncCollectors完成异步i / o并将结果发送到以下操作符时发出信号。

公共接口

添加了一个名为AsyncDataStream的辅助类,以提供将AsyncFunction(将执行异步i / o操作)添加到FLINK流作业的方法。

AsyncDataStream.java

1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

public class AsyncDataStream {  /**   * Add an AsyncWaitOperator. The order of output stream records may be reordered.   *   * @param in Input data stream   * @param func AsyncFunction   * @bufSize The max number of async i/o operation that can be triggered   * @return A new DataStream.   */  public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);  public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);     /**   * Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as input ones.   *   * @param func AsyncWaitFunction   * @param func AsyncFunction   * @bufSize The max number of async i/o operation that can be triggered   * @return A new DataStream.   */  public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);  public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func); }

提议的变更

概观

下图说明了如何处理流式传输记录

  • 到达AsyncWaitOperator
  • 从任务故障转移中恢复
  • 快照状态
  • 由Emitter Thread发出

序列图

AsyncFunction

AsyncFunction 在AsyncWaitOperator中用作函数,它看起来像StreamFlatMap运算符,具有open()/ processElement(StreamRecord <IN> record)/ processWatermark(Watermark mark)。

对于用户的混凝土AsyncFunction,所述asyncInvoke(IN输入,AsyncCollector <OUT>集电极)必须重写以供应代码开始异步操作。

AsyncFunction.java

1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16

public interface AsyncFunction<IN, OUT> extends Function, Serializable {   /**    * Trigger async operation for each stream input.    * The AsyncCollector should be registered into async client.    *    * @param input Stream Input    * @param collector AsyncCollector    */   void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception; }    public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction     implements AsyncFunction<IN, OUT> {   @Override   public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception; }

对于AsyncWaitOperator的每个输入流的记录,它们将被处理通过AsyncFunction.asyncInvoke(IN输入,AsyncCollector <OUT> CB)。然后AsyncCollector将附加到AsyncCollectorBuffer中。稍后我们将介绍AsyncCollector和AsyncCollectorBuffer。

AsyncCollector

AsyncCollector由AsyncWaitOperator创建,并传递到AsyncFunction,它应该被添加到用户的回调中。它充当从用户代码获取结果或错误的角色,并通知AsyncCollectorBuffer发出结果。

特定于用户的函数是collect,并且应该在异步操作完成或抛出错误时调用它们。

AsyncCollector.java

1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 三十

public class AsyncCollector<OUT> {   private List<OUT> result;   private Throwable error;   private AsyncCollectorBuffer<OUT> buffer;     /**    * Set result    * @param result A list of results.    */   public void collect(List<OUT> result) {     this.result = result;     buffer.mark(this);   }     /**    * Set error    * @param error A Throwable object.    */   public void collect(Throwable error) {     this.error = error;     buffer.mark(this);   }     /**    * Get result. Throw RuntimeException while encountering an error.    * @return A List of result.    * @throws RuntimeException RuntimeException wrapping errors from user codes.    */   public List<OUT> getResult() throws RuntimeException { ... } }

它是如何使用的

在调用AsyncFunction.asyncInvoke(IN输入,AsyncCollector <OUT>收集器)之前,AsyncWaitOperator将尝试从AsyncCollectorBuffer 获取AsyncCollector 的实例。然后它将被带入用户的回调函数。如果缓冲区已满,它将等待一些正在进行的回调完成。

异步操作完成后,AsyncCollector.collect()将获取结果或错误,并将通知AsyncCollectorBuffer。

AsyncCollector由FLINK实现。

AsyncCollectorBuffer

AsyncCollectorBuffer保留所有AsyncCollectors,并将结果发送到下一个节点。

调用AsyncCollector.collect()时,标记将放在AsyncCollectorBuffer中,表示已完成的AsyncCollectors。一个名为Emitter的工作线程也将在AsyncCollector获取结果后发出信号,然后根据有序或无序设置尝试发出结果。

为简单起见,我们将在以下文本中将任务引用到AsyncCollectorBuffer中的AsycnCollector。

有序和无序

根据用户配置,将保证或不保证输出元素的顺序。如果不能保证,稍后发布的完成的AsyncCollectors将会更早发出。

线程

线程将等待完成的AsyncCollectors。在发出信号时,它将处理缓冲区中的任务,如下所示:

  • 有序模式

如果缓冲区中的第一个任务完成,则Emitter将收集其结果,然后继续执行第二个任务。如果第一项任务尚未完成,再次等待

  • 无序模式

检查缓冲区中的所有已完成任务,并从缓冲区中最早的水印之前的那些任务中收集结果。

该线程和任务线程将访问完全 通过获取/释放锁。

信号 任务线程在所有任务完成后通知它已经处理完所有数据,并且可以关闭操作员。

从缓冲区中删除一些任务后的Signal Task Thread。

传播任务线程的异常。

任务线程

针对发射qi线程访问AsyncCollectorBuffer 。

获取并向缓冲区添加新的AsyncCollector,等待缓冲区已满。

水印

所有水印也将保存在AsyncCollectorBuffer中。当且仅当在发出当前水印之前的所有AsyncCollector之后才会发出水印。

状态,故障转移和检查点

州和检查站

所有输入StreamRecords都将保持状态。而不是在处理时逐个将每个输入流记录存储到状态,AsyncWaitOperator将在快照操作符状态时将AsyncCollectorBuffer中的所有输入流记录置于状态。在持久保存这些记录之前,将清除状态中的旧数据。

当所有障碍,在操作员已经抵达,安检点可以进行立即。

故障转移

在恢复操作员状态时,操作员将扫描状态中的所有元素,获取AsyncCollectors,调用AsyncFunction.asyncInvoke()并将它们插回AsyncCollectorBuffer。

笔记

异步资源共享

对于在同一个TaskManager(也就是相同的JVM)中的不同插槽(任务工作者)之间共享异步资源(如连接到hbase,netty连接)的情况,我们可以使连接静态,以便同一进程中的所有线程都可以共享相同的实例。

当然,请在使用这些资源时注意线程安全

用于回调

Example.java

1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20

/***  ***/ public class HBaseAsyncFunction implements AsyncFunction<String, String> {   // initialize it while reading object   transient Connection connection;     @Override   public void asyncInvoke(String val, AsyncCollector<String> c) {     Get get = new Get(Bytes.toBytes(val));     Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));     // UserCallback is from user’s async client.     ((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));   } }   // create data stream public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {   DataStream<String> source = getDataStream(env);   DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());   stream.print(); }

对于ListenableFuture

Example2.java

1 2 3 4 五 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 三十 31 32 33 34 35 36 37 38

import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ListenableFuture;   public class HBaseAsyncFunction implements AsyncFunction<String, String> {   // initialize it while reading object   transient Connection connection;     @Override   public void asyncInvoke(String val, AsyncCollector<String> c) {     Get get = new Get(Bytes.toBytes(val));     Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));       ListenableFuture<Result> future = ht.asyncGet(get);     Futures.addCallback(future,       new FutureCallback<Result>() {         @Override public void onSuccess(Result result) {           List ret = new ArrayList<String>();           ret.add(result.get(...));           c.collect(ret);         }           @Override public void onFailure(Throwable t) {           c.collect(t);         }       },       MoreExecutors.newDirectExecutorService()     );   } }    // create data stream public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {   DataStream<String> source = getDataStream(env);   DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());   stream.print(); }

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

(adsbygoogle = window.adsbygoogle || []).push({});

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 状态
  • 动机
  • 条款
  • 公共接口
  • 提议的变更
    • 概观
      • 序列图
        • AsyncFunction
          • AsyncCollector
            • 它是如何使用的
          • AsyncCollectorBuffer
            • 有序和无序
            • 线程
            • 任务线程
            • 水印
          • 状态,故障转移和检查点
            • 州和检查站
            • 故障转移
        • 笔记
          • 异步资源共享
            • 用于回调
              • 对于ListenableFuture
              相关产品与服务
              TDSQL MySQL 版
              TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档