首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Flume-Kafka-SparkStreaming实战1

最近有个需求,实时计算用户的成长值。

场景:用户消费一笔订单后,实时计算这个用户的成长值

目标:也就是给用户一个好的体验。

实现:首先用户的成长值包括离线部分和实时部分,离线部分根据多类数据指标按天/周/月计算这个人的成长值,存储在Hbase里。当这个人产生消费后,后台产生订单日志到固定目录,使用Flume读取日志到kafka的topic里,sparkStreaming实时读取kafka数据,并读取hbase里该用户的离线成长值,综合计算成长值后再写回hbase中,供业务方展示给消费者。

实战包括三篇:

1. Flume安装配置,kafka安装配置,Flume+Kafka调试

2. SparkStreaming读取kafka并存储topic的offset到kafka中。

Flume->kafka->sparkStreaming调试

3.kafka的topic offset监控配置

由于之前服务器已经安装Spark,所以本文不涉及SparkStreaming安装配置

一。 Flume安装配置

1.tar -zxvf apache-flume-1.8.0-bin.tar.gz -C ~/bigdata/

2.配置JAVA_HOME

进入到flume的conf/下

vi flume-env.sh => export JAVA_HOME=/usr/local/jdk8

3.配置Flume启动的conf文件 logToKafka.conf

flume配置文件里主要有三个元素:sources,channels,sinks

sources使用:Taildir Source 为什么使用这个,参考官方文档

Watch the specified files, and tail them innearly real-timeonce detected new lines appended to the each files. If the new lines are being written, this source will retry reading them in wait for the completion of the write.

This sourceis reliableandwill not miss dataeven when the tailing files rotate. It periodicallywrites the last read position of each files on the given position file in JSON format.If Flume is stopped or down for some reason, it can restart tailing from the position written on the existing position file.

In other use case, this source can also start tailing from the arbitrary position for each files using the given position file.When there is no position file on the specified path, it will start tailing from the first line of each files by default.

Files will be consumed in order oftheir modification time. File with the oldest modification time will be consumed first.

This sourcedoes not rename or delete or do any modifications to the file being tailed.Currently this source does not support tailing binary files. It reads text files line by line.

logToKafka.conf内容

# Name the components on this agent

a1.sources = r1

a1.channels = c1

a1.sinks = k1

# Describe/configure the source

a1.sources.r1.positionFile =/home/bigdata/data/flume/taildir_position.json

a1.sources.r1.filegroups.f1 =/home/bigdata/data/flume/logs/.*log.*

# Use a channel which buffers events in memory

# Describe the sink

参数说明:

4.按conf里创建目录,并启动flume

mkdir -p /home/bigdata/data/flume/ /home/bigdata/data/flume/logs/

两个目录,第一个是taildir_position.json文件存放的位置,这个文件记录flume读取日志到哪了,为了防止日志丢失。

第二个目录是业务日志存储的路径,注意使用TailDir source,这个路径下文件不要改名字,因为flume依赖taildir_position.json决定是否读过这个文件,改名字被认为没读过。导致重复读取。

启动:进入flume的home目录

./bin/flume-ng agent -c conf -f conf/logToKafka.conf -n a1 -Dflume.root.logger=INFO,console

-c 代表配置文件文件夹

-f 代表配置文件

-n 代表agent的名字

二。 Kafka安装配置

kafka里几个核心概念就是 topic;topic的分区,副本;broker ; 消费组等。。。

百度上一堆

1. tar -zxvf kafka_2.11-2.0.0.tgz -C ~/bigdata/

2. 配置$KAFKA_HOME:

vi .bash_profile 结尾增加:

export KAFKA_HOME=/home/bigdata/kafka_2.11-2.0.0

export PATH=$/bin:$

3.到kafka家目录的config目录下配置server.properties

三个必须配置的:

broker.id=0 ;如果两台kafka就分别 0,1,三台分别0,1,2

log.dirs=/home/bigdata/data/kafka/logs

zookeeper.connect=hadoop2:2181,hadoop3:2181,hadoop4:2181

num.partitions=2 这个默认值,默认每个topic给2个分区

4.创建所需的log目录(就是kafka数据目录):/home/bigdata/data/kafka/logs

5.copy到其它机器,分别修改server.properties里的broker.id=0,1,2

6.启动--

./bin/kafka-server-start.sh /home/bigdata/bigdata/kafka_2.11-2.0.0/config/server.properties 1>/home/bigdata/data/kafka/std.log 2>/home/bigdata/data/kafka/err.log &

7. 一些kafka topic命令

1)查看当前服务器中的所有topic

./bin/kafka-topics.sh --list --zookeeper hadoop2:2181

2)查看topic详情:

./bin/kafka-topics.sh --topic zhengpinghao --describe --zookeeper hadoop3:2181

Topic:zhengpinghao PartitionCount:3 ReplicationFactor:2 Configs:

Topic: zhengpinghao Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0

Topic: zhengpinghao Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1

Topic: zhengpinghao Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2

副本2

2)创建Topic

./bin/kafka-topics.sh --create --zookeeper hadoop3:2181 --replication-factor 2 --partitions 3 --topic zhengpinghao

3)生产者

./bin/kafka-console-producer.sh --broker-list hadoop2:9092 --topic zhengpinghao

***--broker-listhadoop2:9092***可以通过一台broker获取所有的list,并不是就写到这个broker的分区上。

4)消费者

./bin/kafka-console-consumer.sh --bootstrap-server hadoop3:9092 --topiczhengpinghao--from-beginning

--from-beginning: 从开始消费 去掉从最大(新的)消费

5)删除topic

kafka-topics.sh --zookeeperhadoop3:2181--delete --topic zhengpinghao

注意:创建,删除topic需要zk,生产消费直接连接broker

Kafka的单独测试:

1)启动几个kafka broker

2)创建topic

3) 使用kafka生产者脚本生产

4)使用kafka消费者脚本消费

三。Flume+Kafka调试

通过脚本模拟日志生成

1)生成日志的脚本produce_log.sh

#!/bin/bash

for (( i=1; i

do

sleep 1;

echo "order-"$ >>/home/bigdata/data/flume/logs/order.log

done

注:/home/bigdata/data/flume/logs/是在 一 flume配置 logToKafka.conf里配置的日志文件读取目录

2)在多台机器上启动flume

./bin/flume-ng agent -c conf -f conf/logToKafka.conf -n a1 -Dflume.root.logger=INFO,console

3)在多台启动了flumeAgent的机器上执行1)的脚本

4)启动kafka消费者脚本 来消费flume kafkaSink里对应的topic

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180828G0ER5W00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券