
Express Logistics Big Data Project (53): Real-time ETL Module Development Preparation

Author: Lansonli · Published 2022-03-02 · From the column "Lansonli's Tech Blog"

Table of Contents

Real-time ETL Module Development Preparation

1. Writing the Configuration File

2. Creating the Package Structure

3. Writing a Utility Class to Load the Configuration File

Real-time ETL Module Development Preparation

1. Writing the Configuration File

  • Create the configuration file config.properties in the resources directory of the common module:
# CDH-6.2.1
bigdata.host=node2
# HDFS
dfs.uri=hdfs://node2:8020
# Local FS
local.fs.uri=file://
# Kafka
kafka.broker.host=node2
kafka.broker.port=9092
kafka.init.topic=kafka-topics --zookeeper node2:2181/kafka --create --replication-factor 1 --partitions 1 --topic logistics
kafka.logistics.topic=logistics
kafka.crm.topic=crm
# ZooKeeper
zookeeper.host=node2
zookeeper.port=2181
# Kudu
kudu.rpc.host=node2
kudu.rpc.port=7051
kudu.http.host=node2
kudu.http.port=8051
# ClickHouse
clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
clickhouse.url=jdbc:clickhouse://node2:8123/logistics?characterEncoding=utf-8&useSSL=false
clickhouse.user=root
clickhouse.password=123456
# ElasticSearch
elasticsearch.host=node2
elasticsearch.rpc.port=9300
elasticsearch.http.port=9200
# Azkaban
app.first.runnable=true
# Oracle JDBC
db.oracle.url=jdbc:oracle:thin:@//192.168.88.10:1521/ORCL
db.oracle.user=root
db.oracle.password=123456
# MySQL JDBC
db.mysql.driver=com.mysql.jdbc.Driver
db.mysql.url=jdbc:mysql://192.168.88.10:3306/crm?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false
db.mysql.user=root
db.mysql.password=123456
## Data path of ETL program output ##
# Run in the yarn mode in Linux
spark.app.dfs.checkpoint.dir=/apps/logistics/dat-hdfs/spark-checkpoint
spark.app.dfs.data.dir=/apps/logistics/dat-hdfs/warehouse
spark.app.dfs.jars.dir=/apps/logistics/jars
# Run in the local mode in Linux
spark.app.local.checkpoint.dir=/apps/logistics/dat-local/spark-checkpoint
spark.app.local.data.dir=/apps/logistics/dat-local/warehouse
spark.app.local.jars.dir=/apps/logistics/jars
# Running in the local Mode in Windows
spark.app.win.checkpoint.dir=D://apps/logistics/dat-local/spark-checkpoint
spark.app.win.data.dir=D://apps/logistics/dat-local/warehouse
spark.app.win.jars.dir=D://apps/logistics/jars
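
The three groups of spark.app.* paths cover the three ways the ETL jobs are run: yarn mode writing to HDFS, local mode on the Linux server, and local mode on a Windows development machine. As a minimal sketch of how the right group could be selected at runtime (the object name and the runOnYarn flag below are illustrative assumptions, not code from the original project):

object CheckpointDirSelector {
  // Pick the checkpoint directory matching the runtime environment.
  def checkpointDir(runOnYarn: Boolean): String = {
    val onWindows = System.getProperty("os.name").toLowerCase.contains("windows")
    if (runOnYarn) "/apps/logistics/dat-hdfs/spark-checkpoint"          // spark.app.dfs.checkpoint.dir
    else if (onWindows) "D://apps/logistics/dat-local/spark-checkpoint" // spark.app.win.checkpoint.dir
    else "/apps/logistics/dat-local/spark-checkpoint"                   // spark.app.local.checkpoint.dir
  }

  def main(args: Array[String]): Unit =
    println(checkpointDir(runOnYarn = false))
}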

2. Creating the Package Structure

This project uses the Scala programming language, so create a scala source directory and the following packages under it. A hypothetical skeleton file illustrating the layout follows the table.

Package | Description
cn.it.logistics.etl.realtime | Package containing the real-time ETL programs
cn.it.logistics.etl.parser | Package containing the Canal and OGG data parsing classes
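
To sanity-check the layout, a skeleton file for the realtime package might look like the following (the object name is a placeholder, not from the original project; cn.it.logistics.etl.parser gets an analogous file):

package cn.it.logistics.etl.realtime

// Placeholder entry point; the actual streaming ETL logic is developed
// in later chapters of this series.
object RealtimeEtlApp {
  def main(args: Array[String]): Unit =
    println("package cn.it.logistics.etl.realtime resolves correctly")
}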

3. Writing a Utility Class to Load the Configuration File

Implementation steps:

  • Create a Configuration singleton object in the common package under the scala directory of the common module
  • Write the code:
    • Use ResourceBundle.getBundle to obtain the configuration object
    • Call resourceBundle.getString to load each setting from config.properties
    • Add a main method to verify that the utility class reads the configuration items correctly

Reference code:

package cn.it.logistics.common

import java.util.{Locale, ResourceBundle}

/**
 * Utility class for reading the configuration file
 */
class Configuration {
  /**
   * The ResourceBundle that backs all configuration lookups
   */
  private val resourceBundle: ResourceBundle = ResourceBundle.getBundle("config", new Locale("zh", "CN"))
  private val sep = ":"

  // CDH-6.2.1
  val bigdataHost: String = resourceBundle.getString("bigdata.host")
  // HDFS
  val dfsUri: String = resourceBundle.getString("dfs.uri")
  // Local FS
  val localFsUri: String = resourceBundle.getString("local.fs.uri")
  // Kafka
  val kafkaBrokerHost: String = resourceBundle.getString("kafka.broker.host")
  val kafkaBrokerPort: Int = Integer.valueOf(resourceBundle.getString("kafka.broker.port"))
  val kafkaInitTopic: String = resourceBundle.getString("kafka.init.topic")
  val kafkaLogisticsTopic: String = resourceBundle.getString("kafka.logistics.topic")
  val kafkaCrmTopic: String = resourceBundle.getString("kafka.crm.topic")
  val kafkaAddress = kafkaBrokerHost+sep+kafkaBrokerPort
  // Spark
  val LOG_OFF = "OFF"
  val LOG_DEBUG = "DEBUG"
  val LOG_INFO = "INFO"
  val LOCAL_HADOOP_HOME = "E:\\softs\\hadoop-3.0.0"
  val SPARK_KAFKA_FORMAT = "kafka"
  val SPARK_KUDU_FORMAT = "kudu"
  val SPARK_ES_FORMAT = "es"
  val SPARK_CLICKHOUSE_FORMAT = "clickhouse"
  // ZooKeeper
  val zookeeperHost: String = resourceBundle.getString("zookeeper.host")
  val zookeeperPort: Int = Integer.valueOf(resourceBundle.getString("zookeeper.port"))
  // Kudu
  val kuduRpcHost: String = resourceBundle.getString("kudu.rpc.host")
  val kuduRpcPort: Int = Integer.valueOf(resourceBundle.getString("kudu.rpc.port"))
  val kuduHttpHost: String = resourceBundle.getString("kudu.http.host")
  val kuduHttpPort: Int = Integer.valueOf(resourceBundle.getString("kudu.http.port"))
  val kuduRpcAddress = kuduRpcHost+sep+kuduRpcPort
  // ClickHouse
  val clickhouseDriver: String = resourceBundle.getString("clickhouse.driver")
  val clickhouseUrl: String = resourceBundle.getString("clickhouse.url")
  val clickhouseUser: String = resourceBundle.getString("clickhouse.user")
  val clickhousePassword: String = resourceBundle.getString("clickhouse.password")
  // ElasticSearch
  val elasticsearchHost: String = resourceBundle.getString("elasticsearch.host")
  val elasticsearchRpcPort: Int = Integer.valueOf(resourceBundle.getString("elasticsearch.rpc.port"))
  val elasticsearchHttpPort: Int = Integer.valueOf(resourceBundle.getString("elasticsearch.http.port"))
  val elasticsearchAddress = elasticsearchHost+sep+elasticsearchHttpPort
  // Azkaban
  val isFirstRunnable = java.lang.Boolean.valueOf(resourceBundle.getString("app.first.runnable"))
  // ## Data path of ETL program output ##
  // # Run in the yarn mode in Linux
  val sparkAppDfsCheckpointDir = resourceBundle.getString("spark.app.dfs.checkpoint.dir")// /apps/logistics/dat-hdfs/spark-checkpoint
  val sparkAppDfsDataDir = resourceBundle.getString("spark.app.dfs.data.dir")// /apps/logistics/dat-hdfs/warehouse
  val sparkAppDfsJarsDir = resourceBundle.getString("spark.app.dfs.jars.dir")// /apps/logistics/jars
  // # Run in the local mode in Linux
  val sparkAppLocalCheckpointDir = resourceBundle.getString("spark.app.local.checkpoint.dir")// /apps/logistics/dat-local/spark-checkpoint
  val sparkAppLocalDataDir = resourceBundle.getString("spark.app.local.data.dir")// /apps/logistics/dat-local/warehouse
  val sparkAppLocalJarsDir = resourceBundle.getString("spark.app.local.jars.dir")// /apps/logistics/jars
  // # Running in the local Mode in Windows
  val sparkAppWinCheckpointDir = resourceBundle.getString("spark.app.win.checkpoint.dir")// D://apps/logistics/dat-local/spark-checkpoint
  val sparkAppWinDataDir = resourceBundle.getString("spark.app.win.data.dir")// D://apps/logistics/dat-local/warehouse
  val sparkAppWinJarsDir = resourceBundle.getString("spark.app.win.jars.dir")// D://apps/logistics/jars

  val dbOracleUrl = resourceBundle.getString("db.oracle.url")
  val dbOracleUser = resourceBundle.getString("db.oracle.user")
  val dbOraclePassword = resourceBundle.getString("db.oracle.password")
  val dbMySQLDriver = resourceBundle.getString("db.mysql.driver")
  val dbMySQLUrl = resourceBundle.getString("db.mysql.url")
  val dbMySQLUser = resourceBundle.getString("db.mysql.user")
  val dbMySQLPassword = resourceBundle.getString("db.mysql.password")
}
object Configuration extends Configuration {
  def main(args: Array[String]): Unit = {
    println(Configuration.dbOracleUrl)
    println(Configuration.dbMySQLDriver)
    println(Configuration.dbMySQLUrl)
    println(Configuration.dbMySQLPassword)
  }
}
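
As a quick usage sketch (the demo object below is illustrative, not part of the original post), downstream modules read connection settings straight from the Configuration singleton, for example when assembling Kafka source options:

import cn.it.logistics.common.Configuration

// Illustrative only: shows how other modules consume the singleton's values.
object ConfigurationUsageDemo {
  def main(args: Array[String]): Unit = {
    val kafkaOptions = Map(
      "kafka.bootstrap.servers" -> Configuration.kafkaAddress,      // node2:9092
      "subscribe" -> Configuration.kafkaLogisticsTopic              // logistics
    )
    kafkaOptions.foreach { case (key, value) => println(s"$key = $value") }
  }
}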