前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 源码深度解析-Async IO的实现

Flink 源码深度解析-Async IO的实现

作者头像
Spark学习技巧
发布2022-04-18 15:24:12
2950
发布2022-04-18 15:24:12
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

Async I/O的使用方式

在Flink中使用Async I/O的话,需要有一个支持异步请求的客户端,或者以多线程异步的方式来将同步操作转化为异步操作调用;

以官方文档给出的说明为例:

代码语言:javascript
复制
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // issue the asynchronous request, receive a future for result
        // 发起异步请求,返回结果是一个Future
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        // 请求完成时的回调,将结果交给 ResultFuture
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
// 应用async I/O转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

AsyncDataStream提供了两种调用方法,分别是orderedWait和unorderedWait,这分别对应了有序和无序两种输出模式。

之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。

  • 在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;
  • 而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。

值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证watermark的正常处理,即在两个watermark之间的消息的异步请求结果可能是异步提交的,但在watermark之后的消息不能先于该watermark之前的消息提交。

由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。

Async I/O的实现

AsyncDataStream在运行时被转换为AsyncWaitOperator算子,它是AbstractUdfStreamOperator的子类。其AsyncWaitOperator的基本实现原理如下:

基本原理

AsyncWaitOperator算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。

因此,在AsyncWaitOperator内部采用了一种“生产者-消费者”模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue提供了一种队列的抽象,一个“消费者”线程Emitter从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色;

如图所示,AsyncWaitOperator主要由两部分组成:StreamElementQueue和Emitter。

StreamElementQueue是一个Promise队列,所谓Promise是一种异步抽象表示将来会有一个值,这个队列是未完成的Promise队列,也就是进行中的请求队列。Emitter是一个单独的线程,负责发送消息(收到的异步回复)给下游。

图中E5表示进入该算子的第五个元素(”Element-5”),在执行过程中首先会将其包装成一个“Promise” P5,然后将P5放入队列。最后调用AsyncFunction的asyncInvoke方法,该方法会向外部服务发起一个异步的请求,并注册回调。

该回调会在异步请求成功返回时调用AsyncCollector.collect方法将返回的结果交给框架处理。

实际上AsyncCollector是一个Promise,也就是 P5,在调用collect的时候会标记Promise为完成状态,并通知Emitter线程有完成的消息可以发送了。Emitter就会从队列中拉取完成的Promise,并从Promise中取出消息发送给下游。

代码语言:javascript
复制
public class AsyncWaitOperator<IN, OUT>
      extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
      implements OneInputStreamOperator<IN, OUT>, OperatorActions {
          
    /** Queue to store the currently in-flight stream elements into. */
    private transient StreamElementQueue queue;               // 存储带有异步返回值的请求队列
    
    /** Pending stream element which could not yet added to the queue. */
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
    
    private transient ExecutorService executor;
    
    /** Emitter for the completed stream element queue entries. */
    private transient Emitter<OUT> emitter;                  // 异步返回后的消费线程
    
    /** Thread running the emitter. */
    private transient Thread emitterThread;
    
    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
       super.setup(containingTask, config, output);
       this.checkpointingLock = getContainingTask().getCheckpointLock();
       this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
    
       // create the operators executor for the complete operations of the queue entries
       this.executor = Executors.newSingleThreadExecutor();
       // 根据不同的数据输出模式 有序、无序;选择构建不同的StreamElementQueue queue
       switch (outputMode) {
          case ORDERED:
             queue = new OrderedStreamElementQueue(
                capacity,
                executor,
                this);
             break;
          case UNORDERED:
             queue = new UnorderedStreamElementQueue(
                capacity,
                executor,
                this);
             break;
          default:
             throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
       }
    }
    
    @Override
    public void open() throws Exception {
       super.open();
       // create the emitter
       this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
    
       // start the emitter thread
       // 构建 消费者线程 emitter Thread 
       this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
       emitterThread.setDaemon(true);
       emitterThread.start();
       // .........
    }
    
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
       final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
       // 注册一个定时器,在超时时调用 timeout 方法
       if (timeout > 0L) {
          // register a timeout for this AsyncStreamRecordBufferEntry
          long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
          final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
             timeoutTimestamp,
             new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                   userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                }
             });
          // Cancel the timer once we've completed the stream record buffer entry. This will remove
          // the register trigger task
          streamRecordBufferEntry.onComplete(
             (StreamElementQueueEntry<Collection<OUT>> value) -> {
                timerFuture.cancel(true);
             },
             executor);
       }
       // 加入队列
       addAsyncBufferEntry(streamRecordBufferEntry);
       // 发送异步请求
       userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }
 
 //尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞
 private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
       assert(Thread.holdsLock(checkpointingLock));
       pendingStreamElementQueueEntry = streamElementQueueEntry;
       while (!queue.tryPut(streamElementQueueEntry)) { // 将该请求加入队列;如果队列已满(到达异步请求的上限),会阻塞
          // we wait for the emitter to notify us if the queue has space left again
          checkpointingLock.wait();
       }
       pendingStreamElementQueueEntry = null;
    }
}

public class Emitter<OUT> implements Runnable {
    @Override
    public void run() {
       try {
          while (running) {
             LOG.debug("Wait for next completed async stream element result.");
             // 从队列阻塞地获取元素,之后再向下游传递
             AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
             output(streamElementEntry);
          }
       } catch (InterruptedException e) {
             // .........
       } 
    }
}

有序模式

在“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,StreamElementQueue的具体是实现是OrderedStreamElementQueue。

OrderedStreamElementQueue的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。

有序模式比较简单,使用一个队列就能实现。所有新进入该算子的元素(包括watermark),都会包装成Promise并按到达顺序放入该队列。

如下图所示,尽管P4的结果先返回,但并不会发送,只有P1(队首)的结果返回了才会触发Emitter拉取队首元素进行发送。如下图所示:

代码语言:javascript
复制
public class OrderedStreamElementQueue implements StreamElementQueue {
 /** Capacity of this queue. */
 private final int capacity;

 /** Queue for the inserted StreamElementQueueEntries. */
 private final ArrayDeque<StreamElementQueueEntry<?>> queue;
 
    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {  // 从队列中阻塞地获取已异步完成的元素
       lock.lockInterruptibly();
       try {
          while (queue.isEmpty() || !queue.peek().isDone()) {
             headIsCompleted.await();
          }
          // 只有队列头部的请求完成后才解除阻塞状态
          LOG.debug("Peeked head element from ordered stream element queue with filling degree " + "({}/{}).", queue.size(), capacity);
          return queue.peek();
       } finally {
          lock.unlock();
       }
    }
 
  @Override
    public AsyncResult poll() throws InterruptedException {
       lock.lockInterruptibly();
       try {
          while (queue.isEmpty() || !queue.peek().isDone()) {
             headIsCompleted.await();
          }
          notFull.signalAll();
          LOG.debug("Polled head element from ordered stream element queue. New filling degree " + "({}/{}).", queue.size() - 1, capacity);
          return queue.poll();
       } finally {
          lock.unlock();
       }
    }
    
    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
       lock.lockInterruptibly();  // 将该请求加入队列;如果队列已满(到达异步请求的上限),返回false,其外部会阻塞
       try {
          if (queue.size() < capacity) {   // 未达容量上限
             addEntry(streamElementQueueEntry);  
             LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", queue.size(), capacity);
             return true;
          } else {
             LOG.debug("Failed to put element into ordered stream element queue because it " + "was full ({}/{}).", queue.size(), capacity);
             return false;
          }
       } finally {
          lock.unlock();
       }
    }
}

无序模式

在“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。

当然,在使用“事件时间”的情况下,要保证watermark语义的正确性。

在使用“处理时间”的情况下,由于不存在Watermark,因此可以看作一种特殊的情况。

在UnorderedStreamElementQueue中巧妙地实现了这两种情况。

ProcessingTime无序

ProcessingTime无序也比较简单,因为没有watermark,不需要协调watermark与消息的顺序性,所以使用两个队列就能实现,一个uncompletedQueue、一个completedQueue。所有新进入该算子的元素,同样的包装成Promise并放入uncompletedQueue队列,当uncompletedQueue队列中任意的Promise返回了数据,则将该Promise移到completedQueue队列中,并通知Emitter消费。如下图所示:

EventTime无序

EventTime无序类似于有序与ProcessingTime无序的结合体。因为有watermark,需要协调watermark与消息之间的顺序性,所以uncompletedQueue中存放的元素从原先的Promise变成了Promise集合。

如果进入算子的是消息元素,则会包装成Promise放入队尾的集合中。

如果进入算子的是watermark,也会包装成Promise并放到一个独立的集合中,再将该集合加入到uncompletedQueue队尾,最后再创建一个空集合加到uncompletedQueue队尾。

这样,watermark就成了消息顺序的边界。

只有处在队首的集合中的Promise返回了数据,才能将该Promise移到completedQueue队列中,由Emitter消费发往下游。

只有队首集合空了,才能处理第二个集合。这样就保证了当且仅当某个watermark之前所有的消息都已经被发送了,该watermark才能被发送。

过程如下图所示:

代码语言:javascript
复制
public class UnorderedStreamElementQueue implements StreamElementQueue {
    /** Queue of uncompleted stream element queue entries segmented by watermarks. */
    private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;
    
    /** Queue of completed stream element queue entries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;
    
    /** First (chronologically oldest) uncompleted set of stream element queue entries. */
    private Set<StreamElementQueueEntry<?>> firstSet;
    
    // Last (chronologically youngest) uncompleted set of stream element queue entries. New
    // stream element queue entries are inserted into this set.
    private Set<StreamElementQueueEntry<?>> lastSet;

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
       lock.lockInterruptibly();
       try {
          if (numberEntries < capacity) {
             addEntry(streamElementQueueEntry);
             LOG.debug("Put element into unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
             return true;
          } else {
             LOG.debug("Failed to put element into unordered stream element queue because it " + "was full ({}/{}).", numberEntries, capacity);
             return false;
          }
       } finally {
          lock.unlock();
       }
    }
    
    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
       assert(lock.isHeldByCurrentThread());
       if (streamElementQueueEntry.isWatermark()) {
          // 如果是watermark,就要构造一个只包含这个watermark的set加入到uncompletedQueue队列中
          lastSet = new HashSet<>(capacity);
          if (firstSet.isEmpty()) {
             firstSet.add(streamElementQueueEntry);
          } else {
             Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
             watermarkSet.add(streamElementQueueEntry);
             uncompletedQueue.offer(watermarkSet);
          }
          uncompletedQueue.offer(lastSet);
       } else {
          lastSet.add(streamElementQueueEntry);  // 正常记录,加入lastSet中
       }
    
       streamElementQueueEntry.onComplete(       // 设置异步请求完成后的回调
          (StreamElementQueueEntry<T> value) -> {
             try {
                onCompleteHandler(value);
             } catch (InterruptedException e) {
                // ......
             }
          }, executor);
       numberEntries++;
    }

    // 异步请求完成的回调
    public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
       lock.lockInterruptibly();
       try {
          // 如果完成的异步请求在firstSet中,那么就将firstSet中已完成的异步请求转移到completedQueue中
          if (firstSet.remove(streamElementQueueEntry)) {  
             completedQueue.offer(streamElementQueueEntry);
             while (firstSet.isEmpty() && firstSet != lastSet) {
                // 如果firset中所有的异步请求都完成了,那么就从uncompletedQueue获取下一个集合作为firstSet
                firstSet = uncompletedQueue.poll();
                Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
                while (it.hasNext()) {
                   StreamElementQueueEntry<?> bufferEntry = it.next();
                   if (bufferEntry.isDone()) {
                      completedQueue.offer(bufferEntry);
                      it.remove();
                   }
                }
             }
             LOG.debug("Signal unordered stream element queue has completed entries.");
             hasCompletedEntries.signalAll();
          }
       } finally {
          lock.unlock();
       }
    }
    
    @Override
    public AsyncResult poll() throws InterruptedException {
       lock.lockInterruptibly();
       try {
          // 等待completedQueue中的元素
          while (completedQueue.isEmpty()) {
             hasCompletedEntries.await();
          }
          numberEntries--;
          notFull.signalAll();
          LOG.debug("Polled element from unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
          return completedQueue.poll();
       } finally {
          lock.unlock();
       }
    }    
}

容错

在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态中取出这些消息,再重新处理一遍。为了保证exactly-once特性,对于异步调用已经完成,且结果已经由emitter提交给下游的消息就无需保存在快照中。

代码语言:javascript
复制
public class AsyncWaitOperator<IN, OUT>
      extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
      implements OneInputStreamOperator<IN, OUT>, OperatorActions {
          
 /** Recovered input stream elements. */
 private transient ListState<StreamElement> recoveredStreamElements;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
       super.initializeState(context);
       recoveredStreamElements = context
          .getOperatorStateStore()
          .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    }
    
    @Override
    public void open() throws Exception {
       super.open();
       // create the emitter
       // 创建emitter消费线程
       
       // process stream elements from state, since the Emit thread will start as soon as all
       // elements from previous state are in the StreamElementQueue, we have to make sure that the
       // order to open all operators in the operator chain proceeds from the tail operator to the
       // head operator.
       // 状态恢复的时候,从状态中取出所有未完成的消息,重新处理一遍
       if (recoveredStreamElements != null) {
          for (StreamElement element : recoveredStreamElements.get()) {
             if (element.isRecord()) {
                processElement(element.<IN>asRecord());
             }
             else if (element.isWatermark()) {
                processWatermark(element.asWatermark());
             }
             else if (element.isLatencyMarker()) {
                processLatencyMarker(element.asLatencyMarker());
             }
             else {
                throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
             }
          }
          recoveredStreamElements = null;
       }
    }
    
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
       super.snapshotState(context);
       // 先清除状态
       ListState<StreamElement> partitionableState =
          getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
       partitionableState.clear();
       
       // 将所有未完成处理请求对应的消息加入状态中
       Collection<StreamElementQueueEntry<?>> values = queue.values();
       try {
          for (StreamElementQueueEntry<?> value : values) {
             partitionableState.add(value.getStreamElement());
          }
    
          // add the pending stream element queue entry if the stream element queue is currently full
          if (pendingStreamElementQueueEntry != null) {
             partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
          }
       } catch (Exception e) {
          partitionableState.clear();
          throw new Exception("Could not add stream element queue entries to operator state " + "backend of operator " + getOperatorName() + '.', e);
       }
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-03-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Async I/O的使用方式
  • Async I/O的实现
  • 基本原理
  • 有序模式
  • 无序模式
    • ProcessingTime无序
      • EventTime无序
      • 容错
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档