前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Data Sink 介绍

Data Sink 介绍

作者头像
zeekling
发布2022-06-17 17:10:45
发布2022-06-17 17:10:45
1.2K00
代码可运行
举报
运行总次数:0
代码可运行

Data sink 有点把数据存储下来(落库)的意思。

如上图,Source 就是数据的来源,中间的 Compute 其实就是 Flink 干的事情,可以做一系列的操作,操作完后就把计算后的数据结果 Sink 到某个地方。(可以是 MySQL、ElasticSearch、Kafka、Cassandra 等)。这里我说下自己目前做告警这块就是把 Compute 计算后的结果 Sink 直接告警出来了(发送告警消息到钉钉群、邮件、短信等),这个 sink 的意思也不一定非得说成要把数据存储到某个地方去。其实官网用的 Connector 来形容要去的地方更合适,这个 Connector 可以有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。

Flink Data Sink

前面文章 Data Source 介绍 介绍了 Flink Data Source 有哪些,这里也看看 Flink Data Sink 支持的有哪些。

看下源码有哪些呢?

可以看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。

SinkFunction

从上图可以看到 SinkFunction 接口有 invoke 方法,它有一个 RichSinkFunction 抽象类。

上面的那些自带的 Sink 可以看到都是继承了 RichSinkFunction 抽象类,实现了其中的方法,那么我们要是自己定义自己的 Sink 的话其实也是要按照这个套路来做的。

这里就拿个较为简单的 PrintSinkFunction 源码来讲下:

代码语言:javascript
代码运行次数:0
运行
复制
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
   private static final long serialVersionUID = 1L;

   private static final boolean STD_OUT = false;
   private static final boolean STD_ERR = true;

   private boolean target;
   private transient PrintStream stream;
   private transient String prefix;

   /**
    * Instantiates a print sink function that prints to standard out.
    */
   public PrintSinkFunction() {}

   /**
    * Instantiates a print sink function that prints to standard out.
    *
    * @param stdErr True, if the format should print to standard error instead of standard out.
    */
   public PrintSinkFunction(boolean stdErr) {
       target = stdErr;
   }

   public void setTargetToStandardOut() {
       target = STD_OUT;
   }

   public void setTargetToStandardErr() {
       target = STD_ERR;
   }

   @Override
   public void open(Configuration parameters) throws Exception {
       super.open(parameters);
       StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
       // get the target stream
       stream = target == STD_OUT ? System.out : System.err;

       // set the prefix if we have a >1 parallelism
       prefix = (context.getNumberOfParallelSubtasks() > 1) ?
               ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
   }

   @Override
   public void invoke(IN record) {
       if (prefix != null) {
           stream.println(prefix + record.toString());
       }
       else {
           stream.println(record.toString());
       }
   }

   @Override
   public void close() {
       this.stream = null;
       this.prefix = null;
   }

   @Override
   public String toString() {
       return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
   }
}

可以看到它就是实现了 RichSinkFunction 抽象类,然后实现了 invoke 方法,这里 invoke 方法就是把记录打印出来了就是,没做其他的额外操作。

How to use?

代码语言:javascript
代码运行次数:0
运行
复制
SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();

这样就可以了,如果是其他的 Sink Function 的话需要换成对应的。

使用这个 Function 其效果就是打印从 Source 过来的数据,和直接 Source.print() 效果一样。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020.05.04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink Data Sink
  • SinkFunction
  • How to use?
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档