Integrating Kafka with ELK (Part 8)

无涯WuYa

We use Apache Flume to collect data, store it in Kafka, and finally display it in ELK. Download Apache Flume from http://flume.apache.org/ and deploy it on the log server; after downloading, unpack it and add it to the environment variables. The overall idea is to search for "test development engineer" (测试开发工程师) on Lagou (拉勾网), store the retrieved results in the Kafka system, and finally display them in ELK. The configuration details follow. In the conf directory, create the file flume-kafka.properties (the name referenced by the start command below) with the following content:

# set the agent name
agent.sources=s1
agent.channels=c1
agent.sinks=k1

# set the collection method: tail the application log
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /Applications/devOps/bigData/ELK/apache-flume/logs/apps.log
agent.sources.s1.channels=c1

# buffer events in memory
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

# set the Kafka sink
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# set the Kafka broker and port
agent.sinks.k1.brokerList=localhost:9092
# set the Kafka topic
agent.sinks.k1.topic=laGou
# set the serializer
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# bind the sink to the channel
agent.sinks.k1.channel=c1
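
To sanity-check the collection path, you can manually append a line to the log file that Flume tails and watch it arrive in Kafka; this quick test is not part of the original setup, and the path simply matches the configuration above:

echo "pipeline smoke test $(date)" >> /Applications/devOps/bigData/ELK/apache-flume/logs/apps.log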

The topic used here is laGou; be sure that Kafka is already running at this point. Next, start Apache Flume by running the following command from the apache-flume/bin directory:

flume-ng agent -n agent --conf conf --conf-file ../conf/flume-kafka.properties  -Dflume.root.logger=DEBUG,CONSOLE

After the command is executed, Flume starts up and begins shipping the collected log data to Kafka.

Next, we present the data using a stream-splitting approach: the collected data is stored in the Kafka system, LogStash then consumes the data from Kafka, and the consumed data is written into ElasticSearch. First, configure the logstash.yml file with the LogStash account and password, as follows:
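A minimal sketch of that fragment, assuming X-Pack security with the built-in elastic account (both values are placeholders to adjust for your cluster):

# logstash.yml (fragment) -- placeholder credentials
xpack.monitoring.elasticsearch.username: "elastic"
xpack.monitoring.elasticsearch.password: "your-password"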

Then configure kafka_laGou.conf; its content is as follows:
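Here is a minimal sketch consistent with the startup log shown later (Kafka at localhost:9092, topic laGou, the group id seen in the log, ElasticSearch at localhost:9200 with the elastic account); the index name is an assumption:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["laGou"]
    group_id => "console-consumer-83756"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "elastic"
    password => "your-password"
    index => "lagou-%{+YYYY.MM.dd}"
  }
}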

Once the configuration is complete, use LogStash in the console to consume the data of the laGou topic from the Kafka cluster. In the LogStash bin directory, execute:

./logstash -f ../config/kafka_laGou.conf

After execution, the LogStash agent starts normally, consumes the data from the Kafka cluster, and stores the consumed data in the ElasticSearch cluster. The output looks like this:

Sending Logstash's logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties
[2021-06-12T18:39:43,175][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-12T18:39:43,210][FATAL][logstash.runner          ] Logstash could not be started because there is already another instance using the configured data directory.  If you wish to run multiple instances, you must change the "path.data" setting.
[2021-06-12T18:39:43,221][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
localhost:bin liwangping$ clear
localhost:bin liwangping$ ./logstash -f ../config/kafka_laGou.conf
Sending Logstash's logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties
[2021-06-12T18:40:31,712][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-12T18:40:32,136][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.3.2"}
[2021-06-12T18:40:33,674][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-06-12T18:40:34,092][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@localhost:9200/]}}
[2021-06-12T18:40:34,111][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@localhost:9200/, :path=>"/"}
[2021-06-12T18:40:34,426][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@localhost:9200/"}
[2021-06-12T18:40:34,505][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2021-06-12T18:40:34,508][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2021-06-12T18:40:34,528][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2021-06-12T18:40:34,544][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2021-06-12T18:40:34,561][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2021-06-12T18:40:34,584][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x53f09319 run>"}
[2021-06-12T18:40:34,670][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2021-06-12T18:40:34,676][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-06-12T18:40:34,691][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = logstash-0
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-83756
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2021-06-12T18:40:34,797][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.1.0
[2021-06-12T18:40:34,798][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : fdcf75ea326b8e07
[2021-06-12T18:40:35,011][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: E0qvXyu_T_Wr_vZgZUV80w
[2021-06-12T18:40:35,024][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
[2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Revoking previously assigned partitions []
[2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] (Re-)joining group
[2021-06-12T18:40:35,047][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2021-06-12T18:40:35,149][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Successfully joined group with generation 1
[2021-06-12T18:40:35,151][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Setting newly assigned partitions [laGou-0, laGou-1, laGou-2, laGou-3, laGou-4, laGou-5]
[2021-06-12T18:40:35,168][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-0 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-1 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-2 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-3 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-4 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-5 to offset 0.

At this point, the consumption details of the laGou topic can be seen in the Kafka monitoring system.
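
The same consumption details can also be inspected from the command line with Kafka's own tooling; the group id is the one that appears in the LogStash startup log above:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-83756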

Next comes data visualization. Once the data is stored in the ElasticSearch cluster, it can be queried and analyzed through Kibana. After creating an index pattern under Management, click the Discover module, and the test-development job postings consumed from Lagou are displayed.

Queries can also be run against individual fields; for example, searching on the message field displays the matching records.
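
The equivalent query can also be sent straight to ElasticSearch with curl; the index pattern assumes the index name from the kafka_laGou.conf sketch above, and the credentials are placeholders:

curl -u elastic:your-password -H 'Content-Type: application/json' \
  'http://localhost:9200/lagou-*/_search?pretty' \
  -d '{"query": {"match": {"message": "测试开发工程师"}}}'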

You can also view the complete record: clicking the right-pointing arrow on a row shows the full data in either table or JSON format.

Originally published on 2021-06-13 via the WeChat public account Python自动化测试 (Python Automated Testing).