前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

作者头像
Maynor
发布2023-10-17 08:51:00
2120
发布2023-10-17 08:51:00
举报

04:数据源

目标了解数据源的格式及实现模拟数据的生成

路径

  • step1:数据格式
  • step2:数据生成

实施

数据格式

image-20210905200540304
image-20210905200540304

消息时间

发件人昵称

发件人账号

发件人性别

发件人IP

发件人系统

发件人手机型号

发件人网络制式

发件人GPS

收件人昵称

收件人IP

收件人账号

收件人系统

收件人手机型号

收件人网络制式

收件人GPS

收件人性别

消息类型

双方距离

消息

msg_time

sender_nickyname

sender_account

sender_sex

sender_ip

sender_os

sender_phone_type

sender_network

sender_gps

receiver_nickyname

receiver_ip

receiver_account

receiver_os

receiver_phone_type

receiver_network

receiver_gps

receiver_sex

msg_type

distance

message

2020/05/08 15:11:33

古博易

14747877194

48.147.134.255

Android 8.0

小米 Redmi K30

4G

94.704577,36.247553

莱优

97.61.25.52

17832829395

IOS 10.0

Apple iPhone 10

4G

84.034145,41.423804

TEXT

77.82KM

天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。

数据生成

创建原始文件目录

代码语言:javascript
复制
mkdir /export/data/momo_init

上传模拟数据程序

代码语言:javascript
复制
cd /export/data/momo_init
rz
image-20210905142015948
image-20210905142015948

创建模拟数据目录

代码语言:javascript
复制
mkdir /export/data/momo_data

运行程序生成数据

语法

代码语言:javascript
复制
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间

测试:每500ms生成一条数据

代码语言:javascript
复制
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
500

结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001

image-20210929100901349
image-20210929100901349

小结

  • 了解数据源的格式及实现模拟数据的生成

05:技术架构及技术选型

  • 目标掌握实时案例的技术架构及技术选型
  • 路径
    • step1:需求分析
    • step2:技术选型
    • step3:技术架构
  • 实施
    • 需求分析
      • 离线存储计算
        • 提供离线T + 1的统计分析
        • 提供离线数据的即时查询
      • 实时存储计算
        • 提供实时统计分析
    • 技术选型
      • 离线
        • 数据采集:Flume
        • 离线存储:Hbase
        • 离线分析:Hive:复杂计算
        • 即时查询:Phoenix:高效查询
      • 实时
        • 数据采集:Flume
        • 实时存储:Kafka
        • 实时计算:Flink
        • 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
    • 技术架构
    image-20210905162218286
    image-20210905162218286
    • 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
      • 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
      • 保证数据一致性
  • 小结
    • 掌握实时案例的技术架构及技术选型

06:Flume的回顾及安装

目标回顾Flume基本使用及实现Flume的安装测试

路径

  • step1:Flume回顾
  • step2:Flume的安装
  • step3:Flume的测试

实施

Flume的回顾

  • 功能:实时对文件或者网络端口进行数据流监听采集
  • 场景:文件实时采集
  • 开发
    • step1:先开发一个配置文件:properties【K=V】
    • step2:运行这个文件即可
  • 组成
    • Agent:一个Agent就是一个Flume程序
    • Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
    • Channel:负责临时存储Source发送过来的数据,供Sink来取数据
    • Sink:负责从Channel拉取数据写入目标地
    • Event:代表一条数据对象
      • head:Map集合[KV]
      • body:byte[]

Flume的安装

上传安装包

代码语言:javascript
复制
cd /export/software/
rz
image-20210905162948401
image-20210905162948401

解压安装

代码语言:javascript
复制
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin

修改配置

代码语言:javascript
复制
#集成HDFS,拷贝HDFS配置文件
cd /export/server/flume-1.9.0-bin
cp /export/server/hadoop/etc/hadoop/core-site.xml  ./conf/
#修改Flume环境变量
cd /export/server/flume-1.9.0-bin/conf/
mv flume-env.sh.template flume-env.sh
vim flume-env.sh 
代码语言:javascript
复制
#修改22行
export JAVA_HOME=/export/server/jdk1.8.0_65
#修改34行
export HADOOP_HOME=/export/server/hadoop-3.3.0

删除Flume自带的guava包,替换成Hadoop的

代码语言:javascript
复制
cd /export/server/flume-1.9.0-bin 
rm -rf lib/guava-11.0.2.jar
cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/

创建目录

代码语言:javascript
复制
cd /export/server/flume-1.9.0-bin
#程序配置文件存储目录
mkdir usercase
#Taildir元数据存储目录
mkdir position

Flume的测试

需求:采集聊天数据,写入HDFS

分析

  • Source:taildir:动态监听多个文件实现实时数据采集
  • Channel:mem:将数据缓存在内存
  • Sink:hdfs

开发

代码语言:javascript
复制
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
代码语言:javascript
复制
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1

#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true

#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

#define k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 102400
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = momo
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

启动HDFS

代码语言:javascript
复制
start-dfs.sh

运行Flume

代码语言:javascript
复制
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console

运行模拟数据

代码语言:javascript
复制
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100

查看结果

image-20210905171157230
image-20210905171157230

小结

  • 回顾Flume基本使用及实现Flume的安装测试

07:Flume采集程序开发

目标实现案例Flume采集程序的开发

路径

  • step1:需求分析
  • step2:程序开发
  • step3:测试实现

实施

需求分析

需求:采集聊天数据,实时写入Kafka

Source:taildir

Channel:mem

Sink:Kafka sink

代码语言:javascript
复制
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

程序开发

代码语言:javascript
复制
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
代码语言:javascript
复制
# define a1
a1.sources = s1 
a1.channels = c1
a1.sinks = k1

#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups = f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true

#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100

#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

测试实现

启动Kafka

代码语言:javascript
复制
start-zk-all.sh
start-kafka.sh 

创建Topic

代码语言:javascript
复制
kafka-topics.sh --create --topic MOMO_MSG  --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

列举

代码语言:javascript
复制
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092

启动消费者

代码语言:javascript
复制
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092

启动Flume程序

代码语言:javascript
复制
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console

启动模拟数据

代码语言:javascript
复制
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
50

观察结果

小结

  • 实现案例Flume采集程序的开发
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-10-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 04:数据源
  • 05:技术架构及技术选型
  • 06:Flume的回顾及安装
  • 07:Flume采集程序开发
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档