前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 中的一把锁

Flink 中的一把锁

作者头像
Flink实战剖析
发布2022-04-18 13:17:59
6150
发布2022-04-18 13:17:59
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

那把锁

锁用于多线程安全场景下,在Flink中存在一把锁,被用于数据处理线程、定时器调用线程、checkpoint线程。在StreamTask中定义了一个Object对象lock,通过使用synchronized方式进行同步,在task的初始化过程中该对象传给了SystemProcessingTimeService、StreamInputProcessor、StreamTwoInputProcessor。

数据处理线程

这里所说的数据处理线程表示正常的数据处理流程,可以认为就是processElement处理过程,StreamInputProcessor/StreamTwoInputProcessor主要工作就是读取数据然后调用对应的operator处理读取到的数据也就是调用processElement方法:

代码语言:javascript
复制
StreamRecord<IN> record = recordOrMark.asRecord();
            synchronized (lock) {
              numRecordsIn.inc();
              streamOperator.setKeyContextElement1(record);
              streamOperator.processElement(record);
            }

通过源码可以发现每次调用processElement之前都会使用synchronized锁住lock,然后才能进行后续的处理。

定时器调用线程

Flink中有一个很重要的功能那就是定时器,窗口触发需要定时器、用户自定义注册定时器需要定时器,但是定时器又可以按照时间属性分为两种:事件时间语义下watermark推进触发的定时器、处理时间语义下定时调度的定时器。

watermark也是作为一种StreamElement在管道中流动,在watermark向前推进时也是需要获得锁才能触发定时器:

代码语言:javascript
复制
public void handleWatermark(Watermark watermark) {
      try {
        synchronized (lock) {
          watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
          operator.processWatermark(watermark);
        }
      } catch (Exception e) {
        throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
      }
    }

同样在处理时间触发的定时器也是需要获得锁才能执行:

代码语言:javascript
复制
    //SystemProcessingTimeServuce中
    public void run() {
      synchronized (lock) {
        try {
          if (serviceStatus.get() == STATUS_ALIVE) {
            target.onProcessingTime(timestamp);
          }
        } catch (Throwable t) {
          TimerException asyncException = new TimerException(t);
          exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
        }
      }
    }

定时触发为什么需要锁?在processElement中可能会操作状态、在定时回调onTimer中也可能会操作状态,那么状态就是作为共享数据,为了保证数据的一致性,所以这里加了锁。

checkpoint线程

checkpoint是由jobmaster协调完成的,会定时向source端发送barrier标记然后在数据流中流动,checkpoint是为了对状态某一个时间点的备份,同样与processElement存在状态数据的竞争,为了保证数据的一致性,在checkpoint过程中会存在锁竞争:

代码语言:javascript
复制
//StreamTask中performCheckpoint方法
synchronized (lock) {
      if (isRunning) {
        operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

        // Step (2): Send the checkpoint barrier downstream
        operatorChain.broadcastCheckpointBarrier(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions);

        // Step (3): Take the state snapshot. This should be largely asynchronous, to not
        //           impact progress of the streaming topology
        checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
        return true;
      }
      else {
      .....

      }
    }

—END—

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-01-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档