前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flume 1.8 集成 ES6 与 Granfa 的容器化实践

Flume 1.8 集成 ES6 与 Granfa 的容器化实践

作者头像
蒋老湿
修改2019-12-09 14:41:38
1.3K2
修改2019-12-09 14:41:38
举报
文章被收录于专栏:技术栈技术栈

Flume 简介

Flume 是 Apache Software Foundation 的顶级项目,是一个分布式,可靠且可用的系统,是对大数据量的日志进行高效收集、聚集、移动的服务,Flume 只能在 Unix 环境下运行。 它具有基于流数据的简单灵活的架构,具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错性。它使用简单的可扩展数据模型,允许在线分析应用程序。可以有效地从许多不同的 Source 收集数据,便于聚合和移动大量日志数据到集中式数据存储。

Flume 原理

首先了解以下两种角色分别表示什么:

  • Flume Event 被定义为具有字节有效负载和可选字符串属性集的数据流单元。
  • Flume agent 是一个(JVM)进程,它承载 Event 从外部 Source 流向下一个目标(跃点)的组件。

下图为 Flume 的原理工作流程图:

从图可以看出,Source 监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个 Event 中,并 put 到 Channel 后 commit 提交,Channel 队列先进先出,Sink 去 Channel 队列中拉取数据,然后写入到 HDFS 中。

  • Source:用于采集数据,Source 是产生数据流的地方,同时 Source 会将产生的数据流传输到 Channel,这个有点类似于 Java IO 部分的 Channel。
  • Channel:用于桥接 Sources 和 Sinks,类似于一个队列。
  • Sink:从 Channel 收集数据,将数据写到目标源(可以是下一个 Source,也可以是 HDFS 或者 HBase)。
  • Event:传输单元,Flume 数据传输的基本单元,以事件的形式将数据从源头送至目的地。
一个简单的例子

这里给出一个示例配置文件,描述单节点 Flume 部署。此配置允许用户生成 Event,然后将其记录输出显示到控制台。

example.conf

代码语言:txt
复制
# 给a1的sources、sinks、channels定义名称(自定义)a1.sources = r1a1.sinks = k1a1.channels = c1# 设置数据源类型a1.sources.r1.type = netcat# 设置数据源绑定的 IPa1.sources.r1.bind = localhost# 设置数据源监听的端口a1.sources.r1.port = 44444# 设置采集的数据类型为loggera1.sinks.k1.type = logger# 设置管道类型为memory内存a1.channels.c1.type = memory# 设置管道容量大小a1.channels.c1.capacity = 1000# 设置管道吞吐量大小a1.channels.c1.transactionCapacity = 100# 将source和sink绑定到channel上a1.sources.r1.channels = c1a1.sinks.k1.channel = c1

温馨提示:如果输出到本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

此配置定义名为 a1 的单个 agent。a1 有一个监听端口 44444 上的数据的 Source,一个缓冲内存中 Event 数据的 Channel,以及一个将 Event 数据记录到控制台的 Sink。

  1. 鉴于此配置文件,我们可以按如下方式启动 Flume:
代码语言:txt
复制
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
  1. 开启一个单独的终端,我们可以通过 telnet 端口 44444,向 Flume 发送一个 Event:
代码语言:txt
复制
$ telnet localhost 44444Trying 127.0.0.1...Connected to localhost.localdomain (127.0.0.1).Escape character is '^]'.Hello world! <ENTER>OK
  1. 原 Flume 终端将在控制台中输出日志消息中输出 Event 内容。
代码语言:txt
复制
12/06/19 15:32:19 INFO source.NetcatSource: Source starting12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

关于更多使用内容可参考 Flume 官网:

Flume 1.8.0 User Guide

Flume1.8 与 ES6 集成

因为当前 (2019-09-01) Flume 的源码支持的 Elasticsearch 的客户端版本是 0.90.1,通过查看 Flume 的 pom.xml 文件内容可以知道。所以如果我们想要集成高版本的 Elasticsearch 就需要修改对应 pom.xml 文件中的 Elasticsearch 版本,对应修改内容如下:

显示的指定依赖中 Elasticsearch 依赖的版本,并加上和 Elasticsearch 相关的其它依赖 jar。

由于修改了 jar 的版本,项目中相关的类也会出现错误,需要修改的地方也不少,但主要修改有 2 个类,因为发起调用的就是这 2 个类,分别是:

  • ElasticSearchRestClient.class
  • ElasticSearchTransportClient.class

下图是修改过的源码所对应的文件夹目录,被红框所圈住的是单元测试的代码,不做重点讲解。

我们先看 flume-parent 下的 pom.xml 的文件变化,通过 IDEA 工具可以发现,修改过的 pom.xml 相比原 pom.xml 有 2 处改动的地方;

  1. 增加了 log4j-core.jar 用户日志打印,给 @Slf4j 使用
  2. 配置将依赖包一并打入到项目的 jar 包中的 Maven 插件(用于构建一个包含应用所有依赖 jar 的 jar 包,方便使用)

接下来以此讲解剩下的内容:

ContentBuilderUtil

ContentBuilderUtil 主要是在 createParser() 方法中新增了一个 NamedXContentRegistry.EMPTY 参数,具体说明请看图中注释部分:

ElasticSearchEventSerializer

相比官方源码的修改部分,把 BytesStream 替换为 XcontentBuilder。

ElasticSearchLogStashEventSerializer

相比官方源码的修改部分,追加 builder.endobject() 方法。

EventSerializerIndexRequestBuilderFactory

因为已经修改过 ElasticSearchEventSerializer 的方法返回类型,所以在 EventSerializerIndexRequestBuilderFactory 中只需要把 BytesStream 替换为 XcontentBuilder 即可。

Maps

这里其实没什么复杂逻辑,就是为了方便,封装了一个简单的 Map 工具类。

flume-ng-elasticsearch/pom.xml

ElasticSearchRestClient

相比官方源码修改部分,content.toBytesArray().toUtf8()); 修改为 content.utf8ToString():

ElasticSearchTransportClient

替换与导入类库。

GitHub 项目源码的修改部分与注释,下面附上整个修改好的项目代码地址,读者可以根据修改后的版本和官方原版本对比,查看出其中的差异。

集成高版本 ES 的 Flume:

flume-modify-es6

Docker 的环境准备

随着容器化技术的普及,越来越多的公司开始使用容器化技术,本文我们也紧跟潮流,使用 Docker 来部署我们的开发运行环境,但考虑部分人的需求,普通方式和容器方式都会介绍。

Flume 的 Linux 环境安装

参考:记一次 linux 上安装配置 flume1.8.0 过程

Flume 的 Docker 环境安装
  1. 下载 Flume 镜像文件:
代码语言:txt
复制
docker pull probablyfine/flume:latest
  1. 基于 Flume 镜像文件创建一个容器示例:
代码语言:txt
复制
# 创建一个名为flume且网络配置为mynetwork的容器,# 并把容器中/opt/flume-config/flume.conf目录下的文件挂载到宿主机/Users/haha/Documents/flume-config目录下docker run -d --name flume --net mynetwork -v /Users/haha/Documents/flume-config/: /opt/flume-config/flume.conf/ probablyfine/flume:latest
  1. 查看 Docker 中网卡为 mynetwork 配置信息:

**温馨提示:**也可以不指定 --net mynetwork,则其使用默认的 bridge 网络模式。

代码语言:txt
复制
docker inspect mynetwork

至此,关于 Flume 的环境准备与安装已经完成,接下来进行配置。

Flume 配置文件

因为我们使用的是 Docker 的方式安装,所以需要使用以下命令进入 Flume 容器:

代码语言:txt
复制
docker exec -it flume bash

进入到容器后可以看到 Flume 是安装在 /opt/ 目录下的,这个位置是 dockerfile 文件中指定的。

如果有需要也可以自己写一份 dockerfile 文件,通过 dockerfile 文件来构建一个 Flume 的镜像,关于 Docker 的其它内容,平台已经有很多了,有兴趣的朋友可以订阅,当然也可以关注我,后期我会发布免费的 Docker 基础使用教程。

进入 flume.conf 目录中可以看到 2 个 .conf 配置文件,还记得之前启动容器时配置的 -v 参数吗?这 2 个文件是在宿主机中创建的,对应的同步到了容器中。

查看 flume-config 目录中后缀名为 .conf 配置文件的内容。

config.conf 输出内容到 ES

代码语言:txt
复制
#定义sources,channel和sinks的名称a1.sources = r1a1.sinks = k1a1.channels = c1#配置source的详情a1.sources.r1.type = exec# 要读取的log路径a1.sources.r1.command = tail -F /opt/flume-log/spring.log# 自定义一个拦截器名 i1a1.sources.r1.interceptors=i1# 设置拦截器类型a1.sources.r1.interceptors.i1.type=regex_extractor# 设置拦截器参数内容a1.sources.r1.interceptors.i1.regex =(Order(.*)) &(dataType(.*)&)# 自定义声明拦截器中的序列化器 s1 s2 (2个)a1.sources.r1.interceptors.i1.serializers = s1 s2#a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializera1.sources.r1.interceptors.i1.serializers.s1.name = orderInfoa1.sources.r1.interceptors.i1.serializers.s2.name = dataType# 把source绑定到channel上a1.sources.r1.channels = c1# 设置管道类型为内存类型a1.channels.c1.type = memory# 设置管道容量大小a1es.channels.c1.capacity = 10#  设置管道吞吐量a1es.channels.c1.transactionCapacity = 10a1es.channels.c1.keep-alive = 10#配置sink的详情# 设置sink发送到elasticsearcha1.sinks.k1.type=elasticsearch# 设置sink的具体执行类#a1.sinks.k1.type=org.apache.flume.sink.elasticsearch.ElasticSearchSink# 设置批处理大小10a1.sinks.k1.batchSize=10a1.sinks.k1.hostNames=172.18.0.11:9300a1.sinks.k1.indexName=order_indexa1.sinks.k1.indexType=ordera1.sinks.k1.clusterName=elasticsearch# 设置sinks序列化的执行类a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer#a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer# 绑定sinks到channel上a1.sinks.k1.channel = c1

flume-console-config.conf 输出到控制台

代码语言:txt
复制
#定义sources,channel和sinks的名称a1.sources = r1a1.sinks = k1a1.channels = c1# 配置源类型a1.sources.r1.type = exec# 配置源命令a1.sources.r1.command = tail -f /opt/flume-log/spring.log# 配置源命令脚本a1.sources.r1.shell = /bin/bash -c# 设置sink类型为loggera1.sinks.k1.type = logger# 使用一个通道来缓冲内存中的事件a1.channels.c1.type = memory# 将源sources和接收器sinks绑定到通道channel a1.sources.r1.channels = c1a1.sinks.k1.channel = c1

关于 Flume 更多参数描述参考:

FlumeUserGuide

在本次示例当中,宿主机中的 log 日志文件也同步到了 Flume 容器当中,虽然看起来操作有点骚,但是你可以学到更多东西,对吧!其实 Fluem Docker 容器是无法对宿主机进行 IO 读写操作的,为了解决这个问题才把 log 文件同步到容器中便于 Flume 操作,以下是图片展示:

此时已经接近配置的尾声了,但是各位应该还记得我们之前修改过的 Flume 中关于 ES 的源码,到现在都没有用到,现在正式开始介绍如何让 Flume 把 log 数据发送到高版本的 Elasticsearch 中。

Flume 集成自定义版本 ES

使用 IDEA 打开从我的 GitHub 下载的源代码,打开 flume-ng-elasticsearch-sink 工程进行 mvn install,然后找到 mvn install 成功后生成的 flume-ng-elasticsearch-sink-1.8.0.jar, 因为我为 maven plugin 配置了把项目依赖的 jar 包都一并打包,所以只要替换 Flume 容器中对应 jar 包即可。

执行命令:

代码语言:txt
复制
docker cp flume-ng-elasticsearch-sink-1.8.0.jar flume:/opt/lib/

最后通过如下命令启动 Flume 即可。

代码语言:txt
复制
$ cd /opt/flume$ bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/flume-config/flume.conf/config.conf -Dflume.root.logger==INFO,console

ElasticSearch 安装与使用

本文的 ElasticSearch 也是基于 Docker 环境搭建,所以读者可执行如下命令:

代码语言:txt
复制
# 下载对镜像docker pull elasticsearch:7.1.1docker pull mobz/elasticsearch-head:5-alpine# 创建容器并运行docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

在浏览器中输入 http://localhost:9100/ 可以看到 Flume 已经在 ES 中创建了 index 索引,并将 log 内容也发送了过来。

elasticsearch-head 无法访问 Elasticsearch

ES 与 es-head 是两个独立的进程,当 es-head 访问 ES 服务时,会存在一个跨域问题。所以我们需要修改 ES 的配置文件,增加一些配置项来解决这个问题,如下:

代码语言:txt
复制
[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml  # 文件末尾加上如下配置http.cors.enabled: truehttp.cors.allow-origin: "*"

修改完配置文件后需重启 ES 服务。

elasticsearch-head 查询报 406 Not Acceptable

解决方法:

  1. 进入 head 安装目录:
代码语言:txt
复制
cd _site/
  1. 编辑 vendor.js 共有两处:
代码语言:txt
复制
#6886行   contentType: "application/x-www-form-urlencoded

改成

代码语言:txt
复制
contentType: "application/json;charset=UTF-8"
代码语言:txt
复制
#7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" &&

改成

代码语言:txt
复制
var inspectData = s.contentType === "application/json;charset=UTF-8" &&

Granfa 的配置与使用

本文的 Granfa 也是基于 Docker 环境搭建,所以读者可执行如下命令:

代码语言:txt
复制
# 下载对镜像docker pull grafana/grafana:6.3.2# 创建容器并运行docker run -d --name grafana --net mynetwork --ip 172.18.0.12 -p 3000:3000 grafana/grafana:6.3.2

打开浏览器输入以下地址 localhost:3000:

首页配置数据源 add data source

点击 Learn more 可以跳转到 Granfa 使用指南

至此整个环境的部署连调过程完毕!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年09月11日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flume 简介
    • Flume 原理
      • 一个简单的例子
      • Flume1.8 与 ES6 集成
      • Docker 的环境准备
        • Flume 的 Linux 环境安装
          • Flume 的 Docker 环境安装
            • Flume 配置文件
              • Flume 集成自定义版本 ES
              • ElasticSearch 安装与使用
                • elasticsearch-head 无法访问 Elasticsearch
                  • elasticsearch-head 查询报 406 Not Acceptable
                  • Granfa 的配置与使用
                  相关产品与服务
                  容器镜像服务
                  容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档