专栏首页kk大数据Flink DataStream 内置数据源和外部数据源

Flink DataStream 内置数据源和外部数据源

1

内置数据源

(1)文件数据源

在 StreamExecutionEnvironment 中,可以使用 readTextFile 方法直接读取文本文件,也可以使用 readFile 方法通过指定文件 InputFormat 来读取特定数据类型的文件,如 CsvInputFormat。

下面的代码演示了使用 readTextFile 读取文本文件

import org.apache.flink.streaming.api.scala._

object Flink9 extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.readTextFile("d://1.txt")
    .print()

  env.execute("read_textfile")

}

(2)Socket 数据源

env.socketTextStream("localhost",9999)

在 unix 环境下,可以执行 nc -lk 9999 命令,启动端口,在客户端中输入数据,flink 就能接收到数据了

(3)集合数据源

可以直接将 Java 或 Scala 程序中的集合类 转换成 DataStream 数据集,本质上是将本地集合中的数据分发到远端并行执行的节点中。如:

val dataStream = env.fromElements(Tuple2(1L,3L),Tuple2(1L,5L))
val dataStream2 = env.fromCollection(List(1,2,3))

2

外部数据源

前面的数据源类型都是非常基础的数据接入方式,例如从文件,Socket 端口中接入数据,其本质是实现了不同的 SourceFunction,Flink 将其封装成高级的 API,减少了用户的使用成本。

企业中,大部分都是使用高性能的第三方存储介质和中间件,比如 Kafka,Elasticsearch,RabbitMQ 等。

下面以 Kafka 为例,来说明如何使用 kafka 作为 输入源。

首先,需要在 pom.xml 文件中引入依赖

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
   <version>1.8.0</version>
</dependency>

(我这里使用的是 kafka 0.10 ,flink 1.8.0 版本的)

引入 maven 配置后,就可以在 Flink 应用工程中创建和使用相应的 Connector了,主要的参数有 kafka topic,bootstrap.servers,zookeeper.connect,另外 Schema 参数的主要作用是根据事先定义好的 Schema 信息将数据序列化成该 Schema 定义的数据类型,默认是 SimpleStreamSchema,代表从 Kafka 中接入的数据转换成 String 类型。

package com.dsj361

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object Flink10 extends App {
  
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val properties = new Properties()
  properties.setProperty("bootstrap.servers","192.168.17.21:9092")
  properties.setProperty("zookeeper.connect","192.168.17.22:2181")
  properties.setProperty("group.id","test")
  
  val input =
    env.addSource(
      new FlinkKafkaConsumer010[String](
        "topic1",
        new SimpleStringSchema(),
        properties))
  
  input.print()
  
  env.execute("connect_kafka")
  
}

同时,我们可以把 从 kafka 读到的数据反序列化成我们期望的格式,主要是实现 DeserializationSchema 来完成。

Flink 中已经实现了大多数主流的数据源连接器,但是 Flink 的整体架构非常开放,用户可以自定义连接器,以满足不同数据源的接入需求。

可以通过实现 SourceFunction 定义单个线程的数据接入器,也可以通过实现 ParallelSourceFunction 接口 或者继承 RichParallelSourceFunction 类定义并发数据源接入器

(关于 kafka 的接入会单独开辟一张来讲解)

本文分享自微信公众号 - kk大数据(kkbigdata),作者:kk大数据

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 16种数据转换操作,满足所有数据处理场景

    数据转换(Transformation),即通过从一个或多个 DataStream 生成新的DataStream 的过程,是主要的数据处理的手段。Flink 提...

    kk大数据
  • Java虚拟机:我们写的java代码究竟是如何运行起来的

    首先假设咱们写好了一份Java代码,那这份Java代码中,是不是会包含很多的“.java”为后缀的代码文件?

    kk大数据
  • 高工也要补基础,wait,notify,join

    wait 和 notify 方法属于线程间通讯。所谓线程间通讯,是指线程 A 完成了一个动作,通知线程 B 可以继续做某动作了。

    kk大数据
  • 网游IP改编的是是非非,四大视角深度解读

    image.png 田小军  腾讯研究院版权研究中心副秘书长   曹建峰  腾讯研究院研究员   2015年,我国上市游戏企业达到171家,市值...

    腾讯研究院
  • Ceph 部署完整版 ( el7+jewel )

    这篇文章主要介绍了如何用三台虚拟机搭建一套 Ceph 分布式系统,步骤简洁但不失准确性。环境清理一小节可以解决绝大多数部署不成功的问题,最后一节介绍了常用的 C...

    腾讯云TStack
  • ​《七天数据可视化之旅》第三天:数据图表的选择(中)

    Destiny,某物流公司数据产品经理,目前从事数据平台搭建和可视化相关的工作。持续学习中,期望与大家多多交流数据相关的技术和实际应用,共同成长。

    木东居士
  • 哪些年从事自动化测试需要经历的往事

    在实战中,又去接触了watir,即ruby版的selenium,又是一通的直接看源码,顺便学习和了解ruby

    苦叶子
  • Java常见排序算法详解——冒泡排序

    我们在面试的时候时常会问到我们算法题,而算法题当中排序算法题是问到最多的。应广大同学的建议,我特意整理了一下Java常见的排序算法,我尽量从概念,原理,代码这几...

    Demo_Yang
  • 基于Docker部署ceph分布式文件系统(Luminous版本)

    本文记录分享了ceph分布式文件系统的详尽部署过程,既是为了方便自己后续回溯,也希望能给初次接触ceph的同学提供些许参考。

    张戈
  • 适用于IDA Pro的CGEN框架介绍

    一切都始于我想要分析一些MeP代码的时候。我通常在IDA Pro中做逆向工作,但是有一小部分处理器IDA并不支持。幸运的是,objdump可以支持这些小众的处理...

    FB客服

扫码关注云+社区

领取腾讯云代金券