A Look at Flink's CsvTableSource

Original article
Author: code4it
Published: 2019-02-05 09:44:00
From the column: 码匠的流水账
This article takes a closer look at Flink's CsvTableSource.

TableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala

trait TableSource[T] {

  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
    * The fields of the return type are mapped to the table schema based on their name.
    *
    * @return The type of the returned [[DataSet]] or [[DataStream]].
    */
  def getReturnType: TypeInformation[T]

  /**
    * Returns the schema of the produced table.
    *
    * @return The [[TableSchema]] of the produced table.
    */
  def getTableSchema: TableSchema

  /**
    * Describes the table source.
    *
    * @return A String explaining the [[TableSource]].
    */
  def explainSource(): String =
    TableConnectorUtil.generateRuntimeName(getClass, getTableSchema.getFieldNames)
}
  • TableSource defines three methods: getReturnType, getTableSchema, and explainSource
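To make the contract concrete, here is a minimal sketch of a custom TableSource. The class name, field names, and types below are illustrative, not part of Flink; only the trait and the helper types (RowTypeInfo, TableSchema, Types) come from the Flink 1.7 API:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.TableSource
import org.apache.flink.types.Row

// Hypothetical two-column source; "id" and "name" are made-up fields
class TwoColumnTableSource extends TableSource[Row] {
  private val fieldNames = Array("id", "name")
  private val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)

  // Physical return type: a Row carrying the two declared fields
  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)

  // Logical schema exposed to the Table API
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)

  // explainSource() keeps the default implementation inherited from the trait
}
```

Note that explainSource has a default body, so a minimal implementation only needs the two abstract methods.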

BatchTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/BatchTableSource.scala

trait BatchTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataSet]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    *       Do not use it in Table API programs.
    */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
  • BatchTableSource extends TableSource and adds the getDataSet method
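As a hedged sketch of the batch side, getDataSet can be backed by any DataSet — here an in-memory collection (the class name, field names, and sample row are made up; the trait uses the Java ExecutionEnvironment and DataSet types):

```scala
import java.util.{Arrays => JArrays}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.types.Row

// Toy BatchTableSource producing a single hard-coded row
class InMemoryBatchSource extends BatchTableSource[Row] {
  private val fieldNames = Array("id", "name")
  private val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)
  private val returnType = new RowTypeInfo(fieldTypes, fieldNames)

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
    execEnv.fromCollection(
      JArrays.asList(Row.of(java.lang.Long.valueOf(1L), "alice")), returnType)

  override def getReturnType: TypeInformation[Row] = returnType
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
}
```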

StreamTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/StreamTableSource.scala

trait StreamTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataStream]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    *       Do not use it in Table API programs.
    */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
  • StreamTableSource extends TableSource and adds the getDataStream method
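The streaming side follows the same pattern, swapping DataSet for DataStream. Again a toy sketch with made-up names (the trait's signature uses the Java StreamExecutionEnvironment and DataStream types):

```scala
import java.util.{Arrays => JArrays}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

// Toy StreamTableSource: identical schema, but producing a DataStream
class InMemoryStreamSource extends StreamTableSource[Row] {
  private val fieldNames = Array("id", "name")
  private val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)
  private val returnType = new RowTypeInfo(fieldTypes, fieldNames)

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
    execEnv.fromCollection(
      JArrays.asList(Row.of(java.lang.Long.valueOf(1L), "alice")), returnType)

  override def getReturnType: TypeInformation[Row] = returnType
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
}
```

A source can mix in both traits at once, which is exactly what CsvTableSource does below.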

CsvTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/CsvTableSource.scala

class CsvTableSource private (
    private val path: String,
    private val fieldNames: Array[String],
    private val fieldTypes: Array[TypeInformation[_]],
    private val selectedFields: Array[Int],
    private val fieldDelim: String,
    private val rowDelim: String,
    private val quoteCharacter: Character,
    private val ignoreFirstLine: Boolean,
    private val ignoreComments: String,
    private val lenient: Boolean)
  extends BatchTableSource[Row]
  with StreamTableSource[Row]
  with ProjectableTableSource[Row] {

  def this(
    path: String,
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]],
    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
    quoteCharacter: Character = null,
    ignoreFirstLine: Boolean = false,
    ignoreComments: String = null,
    lenient: Boolean = false) = {

    this(
      path,
      fieldNames,
      fieldTypes,
      fieldTypes.indices.toArray, // initially, all fields are returned
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)

  }

  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
  }

  if (fieldNames.length != fieldTypes.length) {
    throw new TableException("Number of field names and field types must be equal.")
  }

  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
  private val selectedFieldNames = selectedFields.map(fieldNames(_))

  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
  override def getReturnType: RowTypeInfo = returnType

  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the schema of the produced table. */
  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)

  /** Returns a copy of [[TableSource]] with ability to project fields */
  override def projectFields(fields: Array[Int]): CsvTableSource = {

    val selectedFields = if (fields.isEmpty) Array(0) else fields

    new CsvTableSource(
      path,
      fieldNames,
      fieldTypes,
      selectedFields,
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  private def createCsvInput(): RowCsvInputFormat = {
    val inputFormat = new RowCsvInputFormat(
      new Path(path),
      selectedFieldTypes,
      rowDelim,
      fieldDelim,
      selectedFields)

    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
    inputFormat.setLenient(lenient)
    if (quoteCharacter != null) {
      inputFormat.enableQuotedStringParsing(quoteCharacter)
    }
    if (ignoreComments != null) {
      inputFormat.setCommentPrefix(ignoreComments)
    }

    inputFormat
  }

  override def equals(other: Any): Boolean = other match {
    case that: CsvTableSource => returnType == that.returnType &&
        path == that.path &&
        fieldDelim == that.fieldDelim &&
        rowDelim == that.rowDelim &&
        quoteCharacter == that.quoteCharacter &&
        ignoreFirstLine == that.ignoreFirstLine &&
        ignoreComments == that.ignoreComments &&
        lenient == that.lenient
    case _ => false
  }

  override def hashCode(): Int = {
    returnType.hashCode()
  }

  override def explainSource(): String = {
    s"CsvTableSource(" +
      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
  }
}
  • CsvTableSource implements both the BatchTableSource and StreamTableSource interfaces (plus ProjectableTableSource); its getDataSet method uses ExecutionEnvironment.createInput to create a DataSet, and its getDataStream method uses StreamExecutionEnvironment.createInput to create a DataStream
  • The InputFormat handed to ExecutionEnvironment.createInput and StreamExecutionEnvironment.createInput is a RowCsvInputFormat, built by createCsvInput, which also applies the ignoreFirstLine, lenient, quoteCharacter, and ignoreComments options
  • getTableSchema returns a TableSchema built from fieldNames and fieldTypes; getReturnType returns a RowTypeInfo built from selectedFieldTypes and selectedFieldNames, so projection via projectFields narrows the return type but not the schema; explainSource returns a string beginning with "CsvTableSource" that lists the fields being read
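Putting it together, a typical usage sketch with the batch Table API looks roughly like this. The file path, table name, and column names are made up for illustration; the API calls themselves are from the Flink 1.7 Table API:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Three-argument constructor: all fields selected, default ","
// field delimiter and "\n" row delimiter, header line not skipped
val csvSource = new CsvTableSource(
  "/tmp/people.csv",                                   // hypothetical path
  Array("id", "name"),
  Array[TypeInformation[_]](Types.LONG, Types.STRING))

// Register the source and query it; the planner can push the
// projection into the source via projectFields, so only the
// selected columns are parsed from the file
tableEnv.registerTableSource("people", csvSource)
val result = tableEnv.scan("people").select('name)
```

Flink 1.7 also offers a builder (CsvTableSource.builder()) that exposes the remaining constructor options — delimiters, quote character, ignoreFirstLine, and so on — in a more readable fluent style.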

Summary

  • TableSource defines three methods: getReturnType, getTableSchema, and explainSource; BatchTableSource extends TableSource and adds the getDataSet method; StreamTableSource extends TableSource and adds the getDataStream method
  • CsvTableSource implements both the BatchTableSource and StreamTableSource interfaces; its getDataSet method uses ExecutionEnvironment.createInput to create a DataSet, and its getDataStream method uses StreamExecutionEnvironment.createInput to create a DataStream
  • The InputFormat passed to both createInput calls is a RowCsvInputFormat built by createCsvInput; getTableSchema returns a TableSchema built from fieldNames and fieldTypes; getReturnType returns a RowTypeInfo built from selectedFieldTypes and selectedFieldNames; explainSource returns a string beginning with "CsvTableSource"


Original statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For suspected infringement, contact cloudcommunity@tencent.com for removal.
