Data Warehouse in Practice 2

Let's continue with the data collection module.

Collecting Log Data into Kafka with Flume

First, we need to use Flume to collect the log data into Kafka.

Configuration

For log collection we use Flume, a traditional and mature log collection tool.

Flume tails the log files generated in real time and ships them to Kafka. The log files are named app-yyyy-mm-dd.log.

CDH 7.1.1 removed the Flume component in favor of NiFi, so we download Flume onto the nodes ourselves, as sketched below, and then configure it as follows.
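A minimal download sketch, assuming the tarball is fetched from the Apache archive (any mirror works) and unpacked into /data0:

# Assumption: downloading from the Apache archive; adjust the URL or mirror as needed
wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /data0/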

Create a file named file-flume-kafka.conf in the /data0/apache-flume-1.9.0-bin/conf directory:

[root@cdh3 conf]# cat file-flume-kafka.conf 
a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data0/apache-flume-1.9.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors =  i1 i2
a1.sources.r1.interceptors.i1.type = com.soundhearer.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.soundhearer.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

com.soundhearer.flume.interceptor.LogETLInterceptor and com.soundhearer.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors; change them to match your own interceptor implementations.

Flume ETL and Log-Type Interceptors

This project defines two custom interceptors: an ETL interceptor and a log-type interceptor.

The ETL interceptor filters out log records whose timestamp is invalid or whose JSON payload is incomplete.

The log-type interceptor separates startup logs from event logs so they can be sent to different Kafka topics.

The full code is on my GitHub; a sketch of the idea follows.
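As an illustration only (not the exact code from the repository), here is a minimal sketch of what the log-type interceptor could look like: it reads the event body and sets the topic header that the multiplexing channel selector in file-flume-kafka.conf keys on. The check for a "start" marker is an assumption about the log format. LogETLInterceptor implements the same org.apache.flume.interceptor.Interceptor interface, but drops events whose timestamp or JSON payload fails validation instead of tagging them.

// Illustrative sketch of a log-type interceptor; the real implementation lives in the GitHub repo.
package com.soundhearer.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, String> headers = event.getHeaders();
        // Assumption about the log format: startup logs carry a "start" marker in the body,
        // everything else is an event log. The "topic" header drives the multiplexing selector.
        if (body.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>(events.size());
        for (Event event : events) {
            out.add(intercept(event));
        }
        return out;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}

The interceptor classes are packaged into a jar and placed on the Flume agent's classpath (for example under the lib directory of the Flume installation) so the agent can load them.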

Creating the Kafka Topics

We create the two topics from the command line; the topic_event command below assumes the same partition and replication settings as topic_start.

kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic topic_start --partitions 3 --replication-factor 2
kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --create --topic topic_event --partitions 3 --replication-factor 2

List the topics:

[root@cdh3 conf]# kafka-topics --zookeeper cdh1.macro.com:2181,cdh2.macro.com:2181,cdh3.macro.com:2181/kafka --list
20/11/24 18:41:34 INFO zookeeper.ClientCnxn: Session establishment complete on server cdh2.macro.com/192.168.0.207:2181, sessionid = 0x1007949b99a034d, negotiated timeout = 30000
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Connected.
ATLAS_ENTITIES
ATLAS_HOOK
ATLAS_SPARK_HOOK
__consumer_offsets
fill
topic_event
topic_start
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closing.
20/11/24 18:41:34 INFO zookeeper.ZooKeeper: Session: 0x1007949b99a034d closed
20/11/24 18:41:34 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x1007949b99a034d
20/11/24 18:41:34 INFO zookeeper.ZooKeeperClient: [ZooKeeperClient] Closed.

Calculating the Number of Kafka Machines

Number of Kafka machines (rule of thumb) = 2 × (peak production rate × replication factor / 100) + 1

Measure the peak production rate first; together with the chosen replication factor, this gives an estimate of how many Kafka machines to deploy (the 100 corresponds to roughly 100 MB/s of throughput per machine).

For example, with a peak production rate of 50 MB/s and a replication factor of 2:

Number of Kafka machines = 2 × (50 × 2 / 100) + 1 = 3

Starting the Flume Collection

Run the following command in the /data0/apache-flume-1.9.0-bin/bin directory:

nohup flume-ng agent --name a1 --conf-file ../conf/file-flume-kafka.conf &

Consuming the Kafka topics shows that data has already arrived:

kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --from-beginning --topic topic_start

kafka-console-consumer --bootstrap-server cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092 --from-beginning --topic topic_event

1606152976690|{"cm":{"ln":"-90.5","sv":"V2.0.7","os":"8.2.8","g":"ZPWDFI86@gmail.com","mid":"991","nw":"3G","l":"en","vc":"5","hw":"640*960","ar":"MX","uid":"991","t":"1606064013535","la":"-40.9","md":"Huawei-1","vn":"1.3.6","ba":"Huawei","sr":"Q"},"ap":"app","et":[{"ett":"1606139735962","en":"display","kv":{"goodsid":"244","action":"2","extend1":"1","place":"5","category":"81"}},{"ett":"1606060625200","en":"newsdetail","kv":{"entry":"1","goodsid":"245","news_staytime":"18","loading_time":"0","action":"1","showtype":"3","category":"23","type1":"102"}},{"ett":"1606148719063","en":"loading","kv":{"extend2":"","loading_time":"45","action":"2","extend1":"","type":"2","type1":"102","loading_way":"1"}},{"ett":"1606112496011","en":"comment","kv":{"p_comment_id":1,"addtime":"1606069010840","praise_count":692,"other_id":0,"comment_id":1,"reply_count":58,"userid":5,"content":"爹钧异"}},{"ett":"1606138524102","en":"favorites","kv":{"course_id":8,"id":0,"add_time":"1606078090460","userid":2}}]}
1606152976691|{"cm":{"ln":"-58.1","sv":"V2.6.0","os":"8.0.4","g":"R2Q998F1@gmail.com","mid":"995","nw":"3G","l":"en","vc":"2","hw":"640*960","ar":"MX","uid":"995","t":"1606111827871","la":"6.4","md":"Huawei-17","vn":"1.0.5","ba":"Huawei","sr":"I"},"ap":"app","et":[{"ett":"1606129460089","en":"newsdetail","kv":{"entry":"1","goodsid":"245","news_staytime":"42","loading_time":"0","action":"1","showtype":"5","category":"79","type1":"201"}},{"ett":"1606100900686","en":"ad","kv":{"entry":"3","show_style":"3","action":"4","detail":"102","source":"1","behavior":"1","content":"2","newstype":"8"}},{"ett":"1606098687596","en":"active_foreground","kv":{"access":"","push_id":"3"}},{"ett":"1606067052812","en":"active_background","kv":{"active_source":"3"}},{"ett":"1606068620224","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1606076123601","en":"favorites","kv":{"course_id":6,"id":0,"add_time":"1606133566208","userid":2}}]}

Consuming Kafka Data into HDFS with Flume

Next, we use Flume to consume the data from Kafka and write it to HDFS.

Configuration

Deploy a second Flume agent on node cdh2 and create a file named kafka-flume-hdfs.conf in the /data0/apache-flume-1.9.0-bin/conf directory:

[root@cdh2 conf]# cat kafka-flume-hdfs.conf 
## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cdh1.macro.com:9092,cdh2.macro.com:9092,cdh3.macro.com:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data0/apache-flume-1.9.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /data0/apache-flume-1.9.0-bin/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /data0/apache-flume-1.9.0-bin/checkpoint/behavior2
a1.channels.c2.dataDirs = /data0/apache-flume-1.9.0-bin/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## Avoid generating large numbers of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## Control the output file type; here we write compressed streams (lzop)
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## Wire the sources, channels, and sinks together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

Starting Flume to Consume from Kafka

Create the origin_data directory in HDFS:

hadoop fs -mkdir /origin_data

Run the following command in the /data0/apache-flume-1.9.0-bin/bin directory:

nohup flume-ng agent --name a1 --conf-file ../conf/kafka-flume-hdfs.conf &

Data has been generated under the origin_data directory in HDFS, so Flume has successfully consumed the Kafka data into HDFS:

[root@cdh2 bin]# hadoop fs -ls /origin_data/gmall/log
Found 2 items
drwxr-xr-x   - hive hive          0 2020-11-24 02:41 /origin_data/gmall/log/topic_event
drwxr-xr-x   - hive hive          0 2020-11-24 10:19 /origin_data/gmall/log/topic_start
[root@cdh2 bin]# hadoop fs -ls /origin_data/gmall/log/topic_event
Found 1 items
drwxr-xr-x   - hive hive          0 2020-11-24 02:41 /origin_data/gmall/log/topic_event/2020-11-24