前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink DataStream 内置数据源和外部数据源

Flink DataStream 内置数据源和外部数据源

作者头像
kk大数据
发布2019-10-10 15:42:14
2.8K0
发布2019-10-10 15:42:14
举报
文章被收录于专栏:kk大数据kk大数据

1

内置数据源

(1)文件数据源

在 StreamExecutionEnvironment 中,可以使用 readTextFile 方法直接读取文本文件,也可以使用 readFile 方法通过指定文件 InputFormat 来读取特定数据类型的文件,如 CsvInputFormat。

下面的代码演示了使用 readTextFile 读取文本文件

代码语言:javascript
复制
import org.apache.flink.streaming.api.scala._

object Flink9 extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.readTextFile("d://1.txt")
    .print()

  env.execute("read_textfile")

}

(2)Socket 数据源

代码语言:javascript
复制
env.socketTextStream("localhost",9999)

在 unix 环境下,可以执行 nc -lk 9999 命令,启动端口,在客户端中输入数据,flink 就能接收到数据了

(3)集合数据源

可以直接将 Java 或 Scala 程序中的集合类 转换成 DataStream 数据集,本质上是将本地集合中的数据分发到远端并行执行的节点中。如:

代码语言:javascript
复制
val dataStream = env.fromElements(Tuple2(1L,3L),Tuple2(1L,5L))
val dataStream2 = env.fromCollection(List(1,2,3))

2

外部数据源

前面的数据源类型都是非常基础的数据接入方式,例如从文件,Socket 端口中接入数据,其本质是实现了不同的 SourceFunction,Flink 将其封装成高级的 API,减少了用户的使用成本。

企业中,大部分都是使用高性能的第三方存储介质和中间件,比如 Kafka,Elasticsearch,RabbitMQ 等。

下面以 Kafka 为例,来说明如何使用 kafka 作为 输入源。

首先,需要在 pom.xml 文件中引入依赖

代码语言:javascript
复制
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
   <version>1.8.0</version>
</dependency>
代码语言:javascript
复制

(我这里使用的是 kafka 0.10 ,flink 1.8.0 版本的)

引入 maven 配置后,就可以在 Flink 应用工程中创建和使用相应的 Connector了,主要的参数有 kafka topic,bootstrap.servers,zookeeper.connect,另外 Schema 参数的主要作用是根据事先定义好的 Schema 信息将数据序列化成该 Schema 定义的数据类型,默认是 SimpleStreamSchema,代表从 Kafka 中接入的数据转换成 String 类型。

代码语言:javascript
复制
package com.dsj361

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object Flink10 extends App {
  
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val properties = new Properties()
  properties.setProperty("bootstrap.servers","192.168.17.21:9092")
  properties.setProperty("zookeeper.connect","192.168.17.22:2181")
  properties.setProperty("group.id","test")
  
  val input =
    env.addSource(
      new FlinkKafkaConsumer010[String](
        "topic1",
        new SimpleStringSchema(),
        properties))
  
  input.print()
  
  env.execute("connect_kafka")
  
}

同时,我们可以把 从 kafka 读到的数据反序列化成我们期望的格式,主要是实现 DeserializationSchema 来完成。

Flink 中已经实现了大多数主流的数据源连接器,但是 Flink 的整体架构非常开放,用户可以自定义连接器,以满足不同数据源的接入需求。

可以通过实现 SourceFunction 定义单个线程的数据接入器,也可以通过实现 ParallelSourceFunction 接口 或者继承 RichParallelSourceFunction 类定义并发数据源接入器

(关于 kafka 的接入会单独开辟一张来讲解)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档