前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flume采集数据实时存储hive两种解决方案

flume采集数据实时存储hive两种解决方案

作者头像
尚浩宇
发布2018-08-17 10:43:08
5.5K0
发布2018-08-17 10:43:08
举报
文章被收录于专栏:杂烩杂烩

说明:本文不仅提供两种方案,还详细的记录了一些相关信息。

方案一

        本方案的核心是flume采集数据后,按照hive表的结构,将采集数据输送到对应的地址中,达到数据实时存储的目的,这种实时实际上是一种准实时。

        假设hadoop集群已经正常启动,hive也已经正常启动,并且hive的文件地址是/hive/warehouse,然后hive里存在一张由以下建表语句创建的表

 create table flume_test(uuid string);

可推断,表flume_test地址在/hive/warehouse/flume_test,下面介绍flume:

        flume安装步骤

#下载
cd /opt
mkdir flume
wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
tar xvzf apache-flume-1.6.0-bin.tar.gz
cd apache-flume-1.6.0-bin/conf
cp flume-env.sh.template flume-env.sh

        打开flume-env文件,添加java变量

export JAVA_HOME=/usr/java/jdk1.8.0_111

        然后添加环境变量,为了一次过,分别在profile和bashrc末尾添加

export FLUME_HOME=/opt/flume/apache-flume-1.6.0-bin 
export FLUME_CONF_DIR=$FLUME_HOME/conf  
export PATH=$PATH:$FLUME_HOME/bin 

        然后

source /etc/profile

        到此flume安装完毕,下面进行配置,切换到conf文件夹复制flume-conf.properties.template为agent.conf,然后编辑

#定义活跃列表
agent.sources=avroSrc
agent.channels=memChannel
agent.sinks=hdfsSink

#定义source
agent.sources.avroSrc.type=avro
agent.sources.avroSrc.channels=memChannel
agent.sources.avroSrc.bind=0.0.0.0
agent.sources.avroSrc.port=4353
agent.sources.avroSrc.interceptors=timestampinterceptor
agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp
agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false

#定义channel
agent.channels.memChannel.type=memory
agent.channels.memChannel.capacity = 1000
agent.channels.memChannel.transactionCapacity = 100

#定义sink
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel=memChannel
#agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/flume/test/%{topic}/%Y%m%d%H
agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/hive/warehouse/flume_test
agent.sinks.hdfsSink.hdfs.filePrefix=stu-flume
agent.sinks.hdfsSink.hdfs.inUsePrefix=inuse-stu-flume
agent.sinks.hdfsSink.hdfs.inUseSuffix=.temp
agent.sinks.hdfsSink.hdfs.rollInterval=0
agent.sinks.hdfsSink.hdfs.rollSize=10240000
agent.sinks.hdfsSink.hdfs.rollCount=0
agent.sinks.hdfsSink.hdfs.idleTimeout=0
agent.sinks.hdfsSink.hdfs.batchSize=100
agent.sinks.hdfsSink.hdfs.minBlockReplicas=1
# agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream

        具体的每一项配置可参照下面这篇博客http://lxw1234.com/archives/2015/10/527.htm,需要警惕的是rollInterval、rollSize、rollCount、idleTimeout这四个属性,如果进行了配置发现不起作用,就要检查一下minBlockReplicas这个属性是否配置,并且值是否是1,下面这个连接是原因http://doc.okbase.net/chiweitree/archive/126197.html

        配置完毕后可以启动,启动命令

./flume-ng agent -f ../conf/agent.conf -n agent -c conf -Dflume.monitoring.type=http \-Dflume.monitoring.port=5653 -Dflume.root.logger=DEBUG,console

        注意:-n 指的是agent的名称,需要对应到配置文件的第一个值,本启动命令还开启了监控,监控地址http://host:5653/metrics;-f 指的是配置文件的路径及名称。flume的conf修改后不用重启,默认30秒刷新一次,自动装载最新的配置。

        flume安装并启动完毕后,编写测试程序。打开eclipse,创建maven项目

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>scc</groupId>
	<artifactId>stu-flume</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>war</packaging>
	<name>stu-flume</name>
	<dependencies>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.9</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flume.flume-ng-clients</groupId>
			<artifactId>flume-ng-log4jappender</artifactId>
			<version>1.6.0</version>
		</dependency>
	</dependencies>
</project>

测试servlet

public class GenerLogServlet extends HttpServlet {
    private static final Logger LOGGER = Logger.getLogger(GenerLogServlet.class);
    private static final long serialVersionUID = 1L;
    
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        for (;;) {
            LOGGER.info(UUID.randomUUID().toString());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        this.doGet(request, response);
    }

}

log4j.properties

#log4j settings
#log4j.rootLogger=debug, CONSOLE
log4j.logger.scc.stu_flume.GenerLogServlet=debug,GenerLogServlet
#log4j.rootLogger=INFO

log4j.appender.GenerLogServlet=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.GenerLogServlet.Hostname=10.5.3.100
log4j.appender.GenerLogServlet.Port=4353
log4j.appender.GenerLogServletUnsafeMode=false

        启动项目,访问http://localhost:8080/log开始生产数据。需要注意的是,如果flume配置基于时间戳做文件分组(此种情况可以匹配hive根据时间进行分区),那么需要agent.conf中的source一定要配置

agent.sources.avroSrc.interceptors=timestampinterceptor
agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp
agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false

否则flume的sink会报找不到timestamp错误,因为源码org.apache.flume.clients.log4jappender.Log4jAvroHeaders中定义timestamp的key是flume.client.log4j.timestamp而不是timestamp,所以需要手动添加一个timestamp,如果对这个timestamp要求必须是数据生产的时间,可以修改源码或者为source添加拦截器手动配置。

        flume具有非常灵活的使用方式,可以自定义source、sink、拦截器、channel选择器等等,适应绝大部分采集、数据缓冲等场景。

        观察hadoop目录,发现flume已经按配置将数据移动到相应的hive表目录中,如下图:

        打开hive客户端,数据查询命令,发现数据已可被查询!并且针对hive的分区表和桶表flume都可以实现按照hive表数据规则写入,进而达到数据实时插入,至此,方案一结束。

        本方案缺点:

            由于flume在写入文件的时候,独占正在写入的文件资源,导致hive不能读取正在被写入的文件的内容,也就是说假如每5分钟生成一个文件,那么正在写的文件不会被hive读取到内容,也就意味了hive存在最大5分钟的延迟。而如果把时间变小,那么延迟就会降低,但是哪怕是设置30分钟或1个小时,flume流量不大的情况下,也会生成许多零散的小文件,这点与hive的特长相悖,hive擅长处理大文件,对于零散小文件hive性能会降低很多。

方案二

       对比方案一,测试程序、source不变,sink改成hbase-sink,数据实时插入到hbase中,然后在hive建立一张hbase映射表,hive从hbase中读取数据,这样可达到实时插入的效果。由于字数限制,方案二记录在如下博客连接中:

https://cloud.tencent.com/developer/article/1188239

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016/11/15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 方案一
  • 方案二
相关产品与服务
TDSQL MySQL 版
TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档