前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >客快物流大数据项目(六十):将消费的kafka数据转换成bean对象

客快物流大数据项目(六十):将消费的kafka数据转换成bean对象

作者头像
Lansonli
发布2022-03-09 09:02:24
4520
发布2022-03-09 09:02:24
举报
文章被收录于专栏:Lansonli技术博客

目录

将消费的kafka数据转换成bean对象

一、将OGG数据转换成bean对象

二、​​​​​​​将Canal数据转换成bean对象

三、完整代码

将消费的kafka数据转换成bean对象

一、​​​​​​​将OGG数据转换成bean对象

实现步骤:

  • 消费kafka的 logistics Topic数据
  • 将消费到的数据转换成OggMessageBean对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 logistics Topic数据
代码语言:javascript
复制
//2.1:获取物流系统相关的数据
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
  • 将消费到的数据转换成OggMessageBean对象
    • 默认情况下表名带有数据库名,因此需要删除掉数据库名
代码语言:javascript
复制
//3.1:物流相关数据的转换
val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
  iters.map(row => {
    //获取到value列的值(字符串)
    val jsonStr: String = row.getAs[String](0)
    //将字符串转换成javabean对象
    JSON.parseObject(jsonStr, classOf[OggMessageBean])
  }).toList.iterator
})(Encoders.bean(classOf[OggMessageBean]))
  • 递交作业启动运行
代码语言:javascript
复制
// 设置Streaming应用输出及启动
logisticsDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("logistics").start()

二、​​​​​​​将Canal数据转换成bean对象

实现步骤:

  • 消费kafka的 crm Topic数据
  • 将消费到的数据转换成 CanalMessageBean 对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 crm Topic数据
代码语言:javascript
复制
//2.2:获取客户关系系统相关的数据
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
  • 将消费到的数据转换成CanalMessageBean 对象
代码语言:javascript
复制
//3.2:客户关系相关数据的转换
val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
  //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
  iters.filter(row=>{
    //取到value列的数据
    val line: String = row.getAs[String](0)
    //如果value列的值不为空,且是清空表的操作
    if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
  }).map(row=>{
    //取到value列的数据
    val jsonStr: String = row.getAs[String](0)
    //将json字符串转换成javaBean对象
    JSON.parseObject(jsonStr, classOf[CanalMessageBean])
  }).toList.toIterator
})(Encoders.bean(classOf[CanalMessageBean]))
  • 递交作业启动运行
代码语言:javascript
复制
crmDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("crm").start()

三、完整代码

代码语言:javascript
复制
package cn.it.logistics.etl.realtime
import java.sql.Connection

import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.it.logistics.etl.parser.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**
 * 实现KUDU数据库的实时ETL操作
 */
object KuduStreamApp2 extends StreamApp {

  /**
   * 入口方法
   * @param args
   */
  def main(args: Array[String]): Unit = {
    //创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(this.getClass.getSimpleName)
    )

    //数据处理
    execute(sparkConf)
  }

  /**
   * 数据的处理
   *
   * @param sparkConf
   */
  override def execute(sparkConf: SparkConf): Unit = {
    /**
     * 实现步骤:
     * 1)创建sparksession对象
     * 2)获取数据源(获取物流相关数据以及crm相关数据)
     * 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
     * 4)抽取每条数据的字段信息
     * 5)将过滤出来的每张表写入到kudu数据库
     */
    //1)创建sparksession对象
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //2)获取数据源(获取物流相关数据以及crm相关数据)
    //2.1:获取物流系统相关的数据
    val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)

    //2.2:获取客户关系系统相关的数据
    val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)

    //导入隐式转换
    import  sparkSession.implicits._

    //导入自定义的POJO的隐士转换
    import  cn.itcast.logistics.common.BeanImplicit._

    //3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
    //3.1:物流相关数据的转换
    val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
      iters.map(row => {
        //获取到value列的值(字符串)
        val jsonStr: String = row.getAs[String](0)
        //将字符串转换成javabean对象
        JSON.parseObject(jsonStr, classOf[OggMessageBean])
      }).toList.iterator
    })(Encoders.bean(classOf[OggMessageBean]))

    //3.2:客户关系相关数据的转换
    val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
      //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
      iters.filter(row=>{
        //取到value列的数据
        val line: String = row.getAs[String](0)
        //如果value列的值不为空,且是清空表的操作
        if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
      }).map(row=>{
        //取到value列的数据
        val jsonStr: String = row.getAs[String](0)
        //将json字符串转换成javaBean对象
        JSON.parseObject(jsonStr, classOf[CanalMessageBean])
      }).toList.toIterator
    })(Encoders.bean(classOf[CanalMessageBean]))

    //输出数据
    /**
     * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
     * |               after|              before|          current_ts|               op_ts|op_type|                 pos|              table|
     * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
     * |[eid -> [], cdt -...|[eid -> [], cdt -...|2020-10-10T02:35:...|2020-10-10 02:35:...|      U|00000000200006647808|tbl_collect_package|
     * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
     */
    logsticsMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()

    /**
     * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
     * |                data|database|  ddl|           es| id|           mysqlType|               old|sql|             sqlType|      table|           ts|  type|
     * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
     * |[[cdt -> [], gis_...|     crm|false|1602297244000| 18|[cdt -> [], gis_a...|[ {"gis_addr":"1"}]|   |[cdt -> [], gis_a...|tbl_address|1602297244211|UPDATE|
     * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
     */
    crmMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()

    //8)启动运行等待停止
    val stream = sparkSession.streams
    //stream.active:获取当前活动流式查询的列表
    stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
    //线程阻塞,等待终止
    stream.awaitAnyTermination()
  }

  /**
   * 数据的保存
   * @param dataFrame
   * @param tableName
   * @param isAutoCreateTable
   */
  override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
  }
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 将消费的kafka数据转换成bean对象
    • 一、​​​​​​​将OGG数据转换成bean对象
      • 二、​​​​​​​将Canal数据转换成bean对象
        • 三、完整代码
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档