前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >005. Flink DataSource API

005. Flink DataSource API

作者头像
CoderJed
发布2019-08-09 18:07:57
6990
发布2019-08-09 18:07:57
举报
文章被收录于专栏:Jed的技术阶梯Jed的技术阶梯

1. 从本地集合获取数据

import org.apache.flink.api.scala._

/**
  * author: YangYunhe
  * date: 2019/8/3 18:59
  * description: 从本地集合中获取数据
  */
object CollectionSource {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ds1: DataSet[String] = env.fromElements[String]("Tom Jack", "Tony", "Bob Lucy Jed")
    val ds2: DataSet[Int] = env.fromCollection(Seq(1, 2, 3, 3))
    val ds3: DataSet[Int] = env.fromCollection(List(1, 2, 3, 3))
    val ds4: DataSet[Int] = env.fromCollection(Set(1, 2, 3, 3))
    val ds5: DataSet[(String, String)] = env.fromCollection(Map("name" -> "Tom", "sex" -> "male"))
    val ds6: DataSet[Long] = env.generateSequence(1L, 100L)  // [1, 100]的序列


    println("ds1: ")
    ds1.print()
    println()

    println("ds2: ")
    ds2.print()
    println()

    println("ds3: ")
    ds3.print()
    println()

    println("ds4: ")
    ds4.print()
    println()

    println("ds5: ")
    ds5.print()
    println()

  }

}

# 运行结果:

ds1: 
Tom Jack
Tony
Bob Lucy Jed

ds2: 
1
2
3
3

ds3: 
1
2
3
3

ds4: 
1
2
3

ds5: 
(name,Tom)
(sex,male)

2. 读文件获取数据

import org.apache.flink.api.scala._
import org.apache.flink.types.StringValue

/**
  * author: YangYunhe
  * date: 2019/8/3 19:09
  * description: 从文件中获取数据
  */
object FileSource {

  case class Student(id: String, name: String, age: Int, sex: String)

  def main(args: Array[String]): Unit = {

    val localPath1 = "D:\\space\\idea\\course\\learning-flink\\inputPath\\words.txt"
    val localPath2 = "D:\\space\\idea\\course\\learning-flink\\inputPath\\nums.txt"
    val hdfsPath = "hdfs://beh07:9000/flink/input/students.txt"

    val env = ExecutionEnvironment.getExecutionEnvironment

    // 1. 按行读取文件并将它们作为字符串返回
    val ds1: DataSet[String] = env.readTextFile(localPath1, "UTF-8")

    // 2. 读取文件中数据的原始类型
    val ds2: DataSet[Int] = env.readFileOfPrimitives[Int](localPath2, ",")

    // 3. 按行读取文件并将它们作为StringValues返回,StringValues是可变字符串
    val ds3: DataSet[StringValue] = env.readTextFileWithValue(localPath1, "UTF-8")

    // 4. 读取有标准结构化的数据(例如CSV文件)
    val ds4: DataSet[Student] = env.readCsvFile(
      filePath = hdfsPath, // 文件路径
      lineDelimiter = "\n", // 指定行分隔符,默认'\n'
      fieldDelimiter = ",", // 指定列分隔符,默认','
      /*
       * quoteCharacter: Character
       * 设置一个引号字符,启用带引号的字符串解析
       * 如果字段的第一个字符是引号字符,则字符串将被解析为带引号的字符串,引号字符串中的字段分隔符将被忽略
       * 如果带引号的字符串字段的最后一个字符不是引号字符,则引用的字符串解析将会失败
       * 如果启用了带引号的字符串解析并且该字段的第一个字符不是引号字符串,则该字符串将被解析为不带引号的字符串
       * 默认情况下,禁用带引号的字符串解析
       */
      quoteCharacter = null,
      ignoreFirstLine = false, // 是否忽略第一行,默认为false
      ignoreComments = null, // 设置注释的符号,例如设置为"#",那么#开头的数据都不会读取,默认不开启此功能
      lenient = false, // 是否启用宽松解析,即忽略无法正确解析的行,默认为false
      includedFields = Array[Int](0, 1, 2, 3), // Array[Int],定义从输入文件中读取的字段的下标,默认全部读取
      pojoFields = Array[String]("id", "name", "age", "sex") // Array[String],指定映射到CSV字段的POJO的字段,CSV字段的解析器将根据POJO字段的类型和顺序自动初始化
    )

    ds1.print()
    println("----------------")
    ds2.print()
    println("----------------")
    ds4.print()

  }

}

# localPath1文件中的内容为:
Tom Tony Jack Jed Tom
Tony Jed Bob Tony Jed
Tom Harry James Bob Gary
Allen Kobe Tom Kobe Bob
Ben Allen Jed Tom Tom

# localPath2文件中的内容为:
0,1,2,3

# hdfsPath文件中的内容为:
0001,Tom,18,男,23.4
0002,Bob,19,男,21.2
0003,Jack,32,男,78.1
0004,Jed,27,男,99.9

# 运行结果:
Tom Harry James Bob Gary
Tom Tony Jack Jed Tom
Allen Kobe Tom Kobe Bob
Ben Allen Jed Tom Tom
Tony Jed Bob Tony Jed
----------------
1
0
2
3
----------------
Student(0001,Tom,18,男)
Student(0004,Jed,27,男)
Student(0002,Bob,19,男)
Student(0003,Jack,32,男)

注意:本地访问HDFS路径需要添加hadoop-client依赖

3. 监听网络端口

import org.apache.flink.streaming.api.scala._

/**
  * author: YangYunhe
  * date: 2019/8/8 20:55
  * description: 监听网络端口,流式数据源
  */
object SocketSource {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 指定监听的主机名、端口、行分隔符以及重试的时间间隔
    val source: DataStream[String] = env.socketTextStream("beh07", 9999, '\n', 1)

    source.print().setParallelism(1)

    env.execute("SocketSource")

  }

}

# 监听端口处输入:
[hadoop@beh07 data]$ nc -lk 9999
hello flink

# 程序输出:
hello flink

4. 自定义数据源

以读取MySQL中的数据为例

  • 首先完成自定义Source类的开发
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

/**
  * 与表结构对应的实体类
  */
case class Student(id: String, name: String, age: Int, score: Double)

/**
  * author: YangYunhe
  * date: 2019/7/29 14:26
  * description: 自定义的数据源,从MySQL中读取数据
  */
class JDBCSource extends RichSourceFunction[Student] {

  val DRIVER = "com.mysql.jdbc.Driver"
  val URL = "jdbc:mysql://beh07:3306/test?characterEncoding=UTF-8" // &
  val USER = "root"
  val PASSWORD = "root"

  private var statement: PreparedStatement = _
  private var conn: Connection = _

  /**
    * 在整个Source对象初始化之后执行
    */
  override def open(parameters: Configuration): Unit = {
    Class.forName(DRIVER)
    conn = DriverManager.getConnection(URL, USER, PASSWORD)
    val sql = "SELECT * FROM students"
    statement = conn.prepareStatement(sql)
  }


  /**
    * 不停的执行,发送数据到下游
    */
  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {

    val resultSet: ResultSet = statement.executeQuery()

    while(resultSet.next()) {

      val id = resultSet.getString("id")
      val name = resultSet.getString("name")
      val age = resultSet.getInt("age")
      val score = resultSet.getDouble("score")

      // 数据封装到Student对象中
      ctx.collect(Student(id, name, age, score))

    }

  }

  /**
    * 取消发送数据
    */
  override def cancel(): Unit = {

  }

  /**
    * 关闭数据源
    */
  override def close(): Unit = {
    statement.close()
    conn.close()
  }

}
  • 然后运行主程序读取数据
import org.apache.flink.streaming.api.scala._

/**
  * author: YangYunhe
  * date: 2019/7/29 15:40
  * description: 从MySQL批量读取数据的主程序
  */
object JDBCSourceApp {

  def main(args: Array[String]): Unit = {

    // 注意,即使是批量读取,也需要使用StreamExecutionEnvironment对象
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val source = environment.addSource(new JDBCSource)

    source.print().setParallelism(1)

    environment.execute("JDBCSourceApp")

  }

}

# 运行结果:
Student(001,Tom,23,99.8)
Student(002,Tony,21,98.1)
Student(003,Jed,24,97.1)
Student(004,Bob,21,98.2)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.08.08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 从本地集合获取数据
  • 2. 读文件获取数据
  • 3. 监听网络端口
  • 4. 自定义数据源
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档