前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)

客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)

作者头像
Lansonli
发布2022-03-04 10:03:20
2470
发布2022-03-04 10:03:20
举报
文章被收录于专栏:Lansonli技术博客

目录

封装公共接口(根据存储介质抽取特质)

封装公共接口(根据存储介质抽取特质)

Structured Streaming 流处理程序消费kafka数据以后,会将数据分别存储到Kudu、ES、ClickHouse中,因此可以根据存储介质不同,封装其公共接口,每个流处理程序继承自该接口

实现步骤:

  • etl模块realtime 包下创建 StreamApp  特质
  • 实现方法:创建读取kafka集群指定主题的数据
  • 实现方法:创建execute方法
  • 实现方法:创建save方法
代码语言:javascript
复制
package cn.it.logistics.etl.realtime

import cn.it.logistics.common.Configuration
import org.apache.kafka.common.internals.Topic
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 这是所有ETL流式处理的基类
 * kudu、es、ck都要实现这个特质
 * 定义三个方法:
 * 1)读取数据
 * 2)处理数据
 * 3)保存数据
 */
trait StreamApp {

  /**
   * 读取数据的方法
   * @param sparkSession    SparkSession
   * @param topic           指定消费的主题
   * @param selectExpr      默认值:CAST(value AS STRING)
   */
  def getKafkaSource(sparkSession: SparkSession, topic: String, selectExpr:String = "CAST(value AS STRING)") = {
    sparkSession.readStream.format(Configuration.SPARK_KAFKA_FORMAT)
      .options(Map(
        "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
        "subscribe" -> topic,
        "group.id" -> "logistics", //该参数可以省略,不需要指定(官网提到改参数不能设置: kafka的source会在每次query的时候自定创建唯一的group id)
        //表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
        "failOnDataLoss" -> "false"
      )).load().selectExpr(selectExpr)
  }

  /**
   * 数据的处理
   * @param sparkConf
   */
  def execute(sparkConf: SparkConf)

  /**
   * 数据的保存
   * @param dataFrame
   * @param tableName
   * @param isAutoCreateTable
   */
  def save(dataFrame:DataFrame, tableName:String, isAutoCreateTable:Boolean = true)
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 封装公共接口(根据存储介质抽取特质)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档