《从0到1学习Flink》——如何自定义 Data Source?

前言

《从0到1学习Flink》—— Data Source 介绍

文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇文章更详细的介绍下,并写一个 demo 出来让大家理解。

Flink Kafka source

准备工作

我们先来看下 Flink 从 Kafka topic 中获取数据的 demo,首先你需要安装好了 FLink 和 Kafka 。

运行启动 Flink、Zookepeer、Kafka,

好了,都启动了!

maven 依赖

测试发送数据到 kafka topic

实体类,Metric.java

往 kafka 中写数据工具类:KafkaUtils.java

运行:

如果出现如上图标记的,即代表能够不断的往 kafka 发送数据的。

Flink 程序

Main.java

运行起来:

看到没程序,Flink 程序控制台能够源源不断的打印数据呢。

自定义 Source

上面就是 Flink 自带的 Kafka source,那么接下来就模仿着写一个从 MySQL 中读取数据的 Source。

首先 pom.xml 中添加 MySQL 依赖

数据库建表如下:

插入数据

新建实体类:Student.java

新建 Source 类SourceFromMySQL.java,该类继承 RichSourceFunction ,实现里面的 open、close、run、cancel 方法:

Flink 程序

运行 Flink 程序,控制台日志中可以看见打印的 student 信息。

RichSourceFunction

从上面自定义的 Source 可以看到我们继承的就是这个 RichSourceFunction 类,那么来了解一下:

一个抽象类,继承自 AbstractRichFunction。为实现一个 Rich SourceFunction 提供基础能力。该类的子类有三个,两个是抽象类,在此基础上提供了更具体的实现,另一个是 ContinuousFileMonitoringFunction。

MessageAcknowledgingSourceBase :它针对的是数据源是消息队列的场景并且提供了基于 ID 的应答机制。

MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基础上针对 ID 应答机制进行了更为细分的处理,支持两种 ID 应答模型:session id 和 unique message id。

ContinuousFileMonitoringFunction:这是单个(非并行)监视任务,它接受 FileInputFormat,并且根据 FileProcessingMode 和 FilePathFilter,它负责监视用户提供的路径;决定应该进一步读取和处理哪些文件;创建与这些文件对应的 FileInputSplit 拆分,将它们分配给下游任务以进行进一步处理。

最后

本文主要讲了下 Flink 使用 Kafka Source 的使用,并提供了一个 demo 教大家如何自定义 Source,从 MySQL 中读取数据,当然你也可以从其他地方读取,实现自己的数据源 source。可能平时工作会比这个更复杂,需要大家灵活应对!

关注我

转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181120G1YJFD00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券