前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >客快物流大数据项目(五十七):创建Kudu-ETL流式计算程序

客快物流大数据项目(五十七):创建Kudu-ETL流式计算程序

作者头像
Lansonli
发布2022-03-07 10:48:23
3910
发布2022-03-07 10:48:23
举报
文章被收录于专栏:Lansonli技术博客

创建Kudu-ETL流式计算程序

实现步骤:

  • realtime目录创建 KuduStreamApp 单例对象,继承自 StreamApp 特质
  • 重写特质内的方法
  • 编写代码接入kafka集群消费其数据
代码语言:javascript
复制
package cn.it.logistics.etl.realtime
import cn.itcast.logistics.common.{Configuration, SparkUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**
 * Kudu数据管道应用
 * 实现KUDU数据库的实时ETL操作
 */
object KuduStreamApp extends StreamApp {

  /**
   * 入口方法
   * @param args
   */
  def main(args: Array[String]): Unit = {
    //创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(this.getClass.getSimpleName)
    )

    //数据处理
    execute(sparkConf)
  }

  /**
   * 数据的处理
   *
   * @param sparkConf
   */
  override def execute(sparkConf: SparkConf): Unit = {
    /**
     * 实现步骤:
     * 1)创建sparksession对象
     * 2)获取数据源(获取物流相关数据以及crm相关数据)
     * 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
     * 4)抽取每条数据的字段信息
     * 5)将过滤出来的每张表写入到kudu数据库
     */
    //1)创建sparksession对象
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //2)获取数据源(获取物流相关数据以及crm相关数据)
    //2.1:获取物流系统相关的数据
    val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)

    //2.2:获取客户关系系统相关的数据
    val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)

    // 设置Streaming应用输出及启动
    logisticsDF.writeStream.outputMode(OutputMode.Update())
      .format("console").queryName("logistics").start()

    crmDF.writeStream.outputMode(OutputMode.Update())
      .format("console").queryName("crm").start()

    //8)启动运行等待停止
    val stream = sparkSession.streams
    //stream.active:获取当前活动流式查询的列表
    stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
    //线程阻塞,等待终止
    stream.awaitAnyTermination()
  }

  /**
   * 数据的保存
   * @param dataFrame
   * @param tableName
   * @param isAutoCreateTable
   */
  override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 创建Kudu-ETL流式计算程序
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档