电信客服分析平台_学习总结
电信项目:
一、idea 项目构建
1、安装 jdk 并配置环境变量。
2、安装 maven,解压离线仓库,并设置 settings。
** conf 目录下的 setttings.xml 文件复制到离线仓库的 m2 目录下,并修改 mirror 标签以及离线仓库路径。
** 设置 idea 工具的 maven 选项,涉及到 4 个地方:Work offline(脱网工作/离线模式),以及 3 个 maven 设置。注意:Override 选项。
3、新建 ct 主项目目录(相当于 eclipse 的 workset)。
** 一个项目对应一个文件夹,举例:
workspace:
ct:
ct_producer:
该项目的各种包
ct_analysis:
该项目的各种包
4、新建 ct_producer 模块,用于数据生产代码的编写或构建。
** 构建该项目选择 maven,ct 项目下所有的模块(module)都是 maven 工程。(maven 要是用 3.3.9 的,maven3.5,有部分兼容性问题)
5、设置常用选项
** View -> Toolbar 和 Tool Buttons 勾选上
** 取消 idea 自动打开之前项目的功能(搜索 Reopen,关闭相关标签即可)
** 设置字体大小(Editor -> Font -> Size)进行设置
** 设置字符编码:搜索:File Encodings,3 个位置全部改为 UTF-8
** 自动导包以及自动提示设置(搜索 Auto,设置自动导包为 Ask,代码自动提示为 First letter)
尖叫提示:
** idea -> File -> Setttings 设置的是当前项目的配置(只针对当前项目生效)
** idea -> File -> Others Setttings -> Default settings 设置的是全局默认配置(也就是说,以后新建项目都是按照这个默认配置)
二、数据生产
1、新建Producer.java
** 初始化联系人集合用于随机数据使用
** 随机两个电话号码
** 随机通话建立的时间,返回 String,格式:yyyy-MM-dd HH:mm:ss
** 随机通话持续时间
** 将产生的数据写入到本地磁盘中(日志文件)
三、数据消费(数据存储)
flume:Cloudera 公司研发
适合下游数据消费者不多的情况;
适合数据安全性要求不高的操作;
适合与 Hadoop 生态圈对接的操作。
kafka:Linkedin 公司研发
适合数据下游消费众多的情况;
适合数据安全性要求较高的操作(支持 Replication)。
1、安装运行 zookeeper
2、安装配置 kafka,此时我使用的版本是 2.11-0.11.0.2
** 修改 server.properties ,配置分发,重启 kafka 集群
3、启动 kafka 集群
** 启动 kafka 集群,分别在 3 台机器上执行:
$ /opt/module/kafka/bin/kafka-server-start.sh /opt/module/kafka/config/server.properties
** 创建 kafka 主题:calllog
$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --topic calllog --create --replication-factor 1 --partitions 3
** 查看主题列表
$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
** 启动 kafka 控制台消费者,用于测试
$ /opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic calllog --from-beginning
4、配置 flume,用于监听实时产生的数据文件
** 创建 flume 的 job 配置文件
尖叫提示:由于在配置 flume 的过程中,涉及到了数据监听读取方式的操作“tail -F -c +0”,即每次读取完整的文件,
所以修改了 java 代码中,输出流的写出方式为:非追加,即覆盖文件。
** 启动 flume 服务
$ /opt/module/flume/bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/module/flume/jobs/ct/flume-kafka.conf
5、生产日志
6、使用 Java KafkaAPI 读取 Kafka 中缓存的数据
** 通过 https://mvnrepository.com/ 网站找到你需要使用的依赖
** 导入依赖
** 建立包结构
7、成功拿到数据之后,使用 Java HBaseAPI 将数据放入
** 拿到一条数据,我要把这条数据放到 Hbase 表中
** 创建表(粘!)
** 突然发现没有命名空间,粘命名空间初始化方法(粘!)
** 预分区键生成的方法(粘!)
** 创建 rowkey 的分区号
** 创建 rowkey
** 使用 Put 对象插入数据,需要 rowkey
8、写入数据到 HBase 中
** 先确保表是成功创建的
** 检查 Hbase 各个节点均为正常,通过浏览器:http://hadoop102:16010/master-status 查看
** maven 导包,不要导错了,不要重复导包,不要导错版本
** 代码逻辑执行顺序要注意
** 超时时间设置:
*** kafka 根目录下的 config 目录下,修改 server.properties文 件对于 zookeeper 超时时间的限定
*** 项目的 resoureces 目录下的 kafka.properties 文件中,关于 zookeeper 超时的设置(设置自动确认 offset 的时间间隔)
以上两个值,设置都稍大一些,比如 50000
** 思路没有捋清楚:
1、创建命名空间
2、创建表(先不要添加协处理器)(注意,需要预分区)
3、创建 rowkey 生成方法
4、创建预分区号生成方法
5、在 HBaseDAO 中的构造方法里,初始化命名空间,初始化表(注意判断表是否存在)
6、在 HBaseDao 中创建 put 方法,用于存放数据
7、在 kafka 取得数据时,使用 HbaseDao 的实例化对象,调用 put 方法,将数据存入即可。
9、优化数据存储方案:使用协处理器
1、同一条数据,存储两遍。
rowkey:实现了针对某个人,查询该人范围时间内的所有通话记录
分区号 + call1 + buildTime + call2 + flag + duration
15837312345_20170101000000_13733991234
2、讨论使用协处理器的原因
3、操作过程
** 创建协处理器类:CalleeWriteObserver extends BaseRegionObserver
** 覆写 postPut 方法
** 编写代码一大堆(实现将被叫数据存入)
** 找到你的建表的那部分代码,在创建表方法中:添加你成功创建的协处理器
** 修改 resources 中 hbase-site.xml 配置文件(即注册)
** 打包
** 将包分别放于 3 台机器中的 hbase 根目录中的 lib 目录下
** 3 台机器中的 hbase-site.xml 文件要注册协处理器
** 重启 hbase
** 测试
** 如果测试成功,记得把表初始化一下
4、打包,执行消费数据方法
方式一:
在 windows 环境下:
java -Djava.ext.dirs=C:\Users\bruce\Desktop\maven-lib\lib\ -cp C:\Users\bruce\Desktop\maven-lib\ct_consumer-1.0-SNAPSHOT.jar com.china.kafka.HBaseConsumer
在 Linux 环境下:
java -Djava.ext.dirs=/opt/module/flume/job/ct/lib/ -cp /opt/module/flume/job/ct/ct_consumer-1.0-SNAPSHOT.jar com.china.kafka.HBaseConsumer
方式二:
windows:当工程 ct_consumer-1.0-SNAPSHOT.jar 与所依赖的 jar 放在同一的目录中
java -cp C:\Users\bruce\Desktop\maven-lib\lib\* com.china.kafka.HBaseConsumer
linux:
java -cp /opt/module/flume/job/ct/lib/*:ct_consumer-1.0-SNAPSHOT.jar com.china.kafka.HBaseConsumer
5、idea 打包,并提交到 linux 执行
** 打包,并将打好的包以及第三方依赖整个拷贝出来
** 上传该文件夹(ct_consumer-1.0-SNAPSHOT.jar)到 linux 中
** 运行:java -Djava.ext.dirs=/opt/module/flume/job/ct/lib/ -cp /opt/module/flume/job/ct/ct_consumer-1.0-SNAPSHOT.jar com.china.kafka.HBaseConsumer
6、某个用户,传入指定的时间范围,查询该时间范围内的该用户的所有通话记录(包含主叫和被叫)
15837312345 2017-01-01 2017-05-01
rowkey:01_15837312345_20171102181630_13737312345_1_0180
** scan.setStartRow
2017-01-01
** scan.setStopRow
2017-02-01
** 组装 rowkey
01_15837312345_201711
01_15837312345_201712
7、Filter 测试讲解
四、数据分析
1、统计所有用户每月通话记录(通话次数,通话时长)
2、统计所有用户每日通话记录(通话次数,通话时长)
3、导入 Mysql 建表语句(db_telecom.sql)
4、新建项目,构建包结构,创建能够想到的需要使用的类,不需要任何实现
5、整理思路:
** Key:电话号码 + 时间
** Value:本次通话(1) + 通话时间
6、复习第三天内容:
** 构建数据分析项目
** 构建数据表结构
** Mapper、Reducer
** 封装各种 JavaBean,KeyClasss,ValueCalss
** 自定义 OutPutFormat(对数据的操作)
** 根据维度对象查询数据库得到已有维度对象的id(如果没有则新增一条,并返回 id,封装 Convert)
(考虑数据插入时批处理)
** 工具类:
*** JDBCUtil
*** LRUCache
** Runner 组装 Job 任务
** 初始化 Mapper
** 设置键值对 class
** 初始化 Reducer
** 设置键值对 class
** 设置 OutputForamt
** 提交运行 job
** 将 Linux 上配置文件拷贝到 resources 文件夹中
** 运行测试
问题总结:
1、ComDimension 构造方法中,实例化时间维度和联系人维度对象。
2、MySQLOutputformat 的 close 方法中,没有关闭资源,关闭:JDBCUtil.close(conn, ps, null);
3、Runner,Mapper,Reducer 中的泛型,不要使用抽象类
4、DimensionConverterImpl 中的 genSQL方法写反了,需要调换位置。
5、DimensionConverterImpl 中设置 JVM 退出时,关闭资源,如:Runtime.getRuntime().addShutdownHook(new Thread(() -> JDBCUtil.close(threadLocal.get(), null, null)));
6、Mysql 的 url 连接一定要是具体的主机名或者IP地址
7、DimensionConverterImpl 中的 close 方法关闭数据库连接
8、调试时,打包 jar,上传到 linux,拔掉网线,进行测试。
9、数据库连接到底何时关闭,要梳理清楚。解决 Too many connections;
MySQLOutputformat -> MysqlRecordWriter -> DimensionConverterImpl
10、mysql-driver 包没有导入成功
五、数据展示
1、展示数据所需要的字段都有哪些:
call_sum,call_duration_sum,telephone,name,year,month,day
2、通话通话次数与通话时长,展示用户关系。
3、通过表格,展示一个人当年所有的话单信息。
IDEA 常用快捷键:
Alt + Enter :智能修复
Ctrl + Alt + V :自动生成当前对象名
Ctrl + Alt + T :自动呼出包裹菜单
Ctrl + O :呼出覆写菜单
Ctrl + Alt + L :格式化代码
Ctrl + Shift + Enter :自动补全当前行代码缺失的符号
IDEA 方式导出工程所依赖的 jar 包:
File 选项卡 -> Project Structure -> Artifacts -> 点击加号 -> Jar -> From modules with dependencies -> 选择对应的 Module 工程 ->
勾选 copy to the output directory and link via manifest -> Ok -> Ok -> Build 选项卡 -> Build Artifacts -> Build 或者 Rebuild -> 在工程目录下出现 out 文件夹
IDEA 中配置 web 服务器(Tomcat):
Run 选项卡 -> Edit Configurations -> 点击“+”按钮 -> Tomcat Server -> Local -> Name(自定义名称) -> 选择 Tomcat 的安装目录 -> Fix -> 选择 xxx exploded ->
Update classes and resources -> Update classes and resources -> Ok