前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink自定义metric监控流入量

Flink自定义metric监控流入量

作者头像
Flink实战剖析
发布2022-04-18 12:02:06
1.3K0
发布2022-04-18 12:02:06
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

flink任务本身提供了各种类型的指标监控,细化到了每一个Operator的流入/流出量、速率、Watermark值等,通常在实际应用中需要对接入数据做格式化例如转json,符合要求的数据会向下流动,不符合要求或者格式化异常称为脏数据会被过滤掉,现在目标实现一个通用化方式能够对正常数据与脏数据进行指标统计。 实现思路:

  1. flink metric类型分为Counter、Gauge、Histogram、Meter,需要统计的是一个累加值因此选取Counter类型的metirc
  2. 由于是对任务的流入监控,因此需要在Source端进行处理,通常对接的数据源是kafka, 而flink本身已经提供了kakfa connector,并且开放了数据反序列化的接口DeserializationSchema与抽象类AbstractDeserializationSchema,实现该接口或者继承抽象类可以完成数据的反序列化与格式化,由于每一条数据都需要进过反序列化处理,那么可以在反序列化的同时进行指标统计
  3. 在flink中自定义Metric入口是RuntimeContext, 但是在反序列化抽象类中并没有提供访问RuntimeContext的接口,一般是在RichFunction中,与其相关只有FlinkKafkaConsumer,那么就可以在FlinkKafkaConsumer中将获取到的RuntimeContext传给AbstractDeserializationSchema

实现步骤:

  1. 自定义一个继承AbstractDeserializationSchema的抽象类AbsDeserialization,里面包含RuntimeContext与两个统计的Counter,并且包含一个初始化Counter的方法initMetric
  2. 自定义一个继承FlinkKafkaConsumer010的抽象类,里面包含AbsDeserialization属性、构造化方法,并且重写run方法,在run方法里面给AbsDeserialization设置RuntimeContex对象并且调用其initMetric, 最后调用父类run方法

代码如下:

代码语言:javascript
复制
public abstract class AbsDeserialization<T> extends AbstractDeserializationSchema<T> {



    private RuntimeContext runtimeContext;

    private String DIRTY_DATA_NAME="dirtyDataNum";

    private String NORMAL_DATA_NAME="normalDataNum";



    protected transient Counter dirtyDataNum;



    protected transient Counter normalDataNum;



    public RuntimeContext getRuntimeContext() {

        return runtimeContext;

    }



    public void setRuntimeContext(RuntimeContext runtimeContext) {

        this.runtimeContext = runtimeContext;

    }



    public void initMetric()

    {

        dirtyDataNum=runtimeContext.getMetricGroup().counter(DIRTY_DATA_NAME);

        normalDataNum=runtimeContext.getMetricGroup().counter(NORMAL_DATA_NAME);

    }



}
 
代码语言:javascript
复制
public class CustomerKafkaConsumer<T> extends FlinkKafkaConsumer010<T> {



    private AbsDeserialization<T> valueDeserializer;



    public CustomerKafkaConsumer(String topic, AbsDeserialization<T> valueDeserializer, Properties props) {

        super(topic, valueDeserializer, props);

        this.valueDeserializer=valueDeserializer;

    }



    @Override public void run(SourceContext<T> sourceContext) throws Exception {

        valueDeserializer.setRuntimeContext(getRuntimeContext());

        valueDeserializer.initMetric();

        super.run(sourceContext);

    }

}

使用案例,只要定义一个继承AbsDeserialization类即可,

代码语言:javascript
复制
class ParseDeserialization extends AbsDeserialization[RawData] {



  override def deserialize(message: Array[Byte]): RawData = {



    try {

      val msg = new String(message)

      val rawData = JSON.parseObject(msg, classOf[RawData])

      normalDataNum.inc() //正常数据指标

      rawData

    } catch {

      case e:Exception=>{

        dirtyDataNum.inc()   //脏数据指标

        null

      }

    }

  }



}
 

source使用方式:

代码语言:javascript
复制
val consumer: CustomerKafkaConsumer[RawData] = new CustomerKafkaConsumer[RawData](topic, new ParseDeserialization, kafkaPro) 

那么在任务运行中,可以在flink web的监控界面查看到normalDataNum 、dirtyDataNum 两个指标值,另外在AbsDeserialization里面也可以定义一些流入速率等监控。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档