Flume整合Kafka实时收集日志信息

Linux系统查看文件内容的特殊方法:

最基本的有cat和less,more,如果有特殊的要求的话。 1/如果只想看文件的前5行,可以使用head命令,如: head -5 /etc/passwd

2/如果想查看文件的后10行,可以使用tail命令,如: tail -10 /etc/passwd

3/参数-f使tail不停地去读最新的内容,这样有实时监视的效果: tail -f /var/log/messages

定时调度工具的使用

1/各种工具聚集的网站:https://tool.lu/crontab 2/linux crontab 定时crontab -e 然后在里面编辑:*/1 * * * *    //1代表1分钟 3/vi log_generator.sh 4/把模拟生产日志的脚本generate_log.py执行脚本放进去: python /home/hadoop/data/project/generate_log.py 5/添加sh执行权限 chmod u+x log_generator.sh 6/验证日志能否输出,在日志文件生成的文件目录下执行:tail -200f logs/access.log,定时监控

应用服务器产生access.log ==> 控制台输出

1/Flume配置:exec +memory +logger

2/配置文件accesslog_to_logger.conf (exec-memory-logger):先输出到控制台测试一下

exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel

exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /home/hadoop/data/project/logs/access.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c

exec-memory-logger.channels.memory-channel.type = memory

exec-memory-logger.sinks.logger-sink.type = logger

exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel

3/启动flume-ng agent

flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/accesslog_to_logger.conf  \
-Dflume.root.logger=INFO,console

4/每隔1分钟即可在Flume控制台看到日志输出

日志文件==>Flume==>Kafka

1/启动zk:./zkServer.sh start 2/启动Kafka Server:kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

3/修改Flume配置文件使得flume sink数据到Kafka

选型:exec-memory-kafka
type:org.apache.flume.sink.kafka.KafkaSink
brokerList、topic、requiredAck、batchSize

accesslog_to_kafka.conf

exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel

exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/data/project/logs/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

exec-memory-kafka.channels.memory-channel.type = memory

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop:9092
exec-memory-kafka.sinks.kafka-sink.topic = flume-kafka-streaming-topic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1

exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

4/启动flume-ng agent

flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/accesslog_to_kafka.conf \
-Dflume.root.logger=INFO,console

5/启动kafka消费者进行消费 kafka-console-consumer.sh --zookeeper hadoop:2181 --topic flume-kafka-streaming-topic 6/代码消费 hadoop:2181 test flume-kafka-streaming-topic 1

package com.feiyue.bigdata.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumeKafkaStreamingTest {
  def main(args: Array[String]): Unit = {

    if (args.length != 4) {
      println("Usage: FlumeKafkaStreamingTest <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeKafkaStreamingTest")

    val ssc = new StreamingContext(sparkConf, Seconds(60))
    val Array(zkQuorum, group, topics, numThreads) = args

    val topicsMap = topics.split(",").map((_, numThreads.toInt)).toMap

    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicsMap)

    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }

}

map(_._2) 等价于 map(t => t.2) //t是个2项以上的元组 map(._2, _) 等价与 map(t => t._2, t) //这会返回第二项为首后面项为旧元组的新元组

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS

注意:Fayson的github调整为:https://github.com/fayson/cdhproject,本文的代码在github中也能找到。

1.7K3
来自专栏Hadoop实操

如何使用Oozie API接口向Kerberos集群提交Java程序

在CDH集群外的节点向集群提交MapReduce作业的方式有多种,前面Fayson介绍了《如何跨平台在本地开发环境提交MapReduce作业到CDH集群》和《如...

9597
来自专栏Hadoop实操

如何使用Oozie API接口向Kerberos环境的CDH集群提交Spark2作业

前面Fayson介绍了多种方式在CDH集群外的节点向集群提交Spark作业,文章中均采用Spark1来做为示例,本篇文章主要介绍如何是用Oozie API向Ke...

1K4
来自专栏Hadoop实操

如何给Hadoop集群划分角色

Fayson在之前的文章中介绍过《CDH网络要求(Lenovo参考架构)》,《如何为Hadoop集群选择正确的硬件》和《CDH安装前置准备》,而我们在搭建Had...

5247
来自专栏Jed的技术阶梯

zookeeper编程02-服务器上下线动态感知

NameNode判断DataNode是否下线的时间太长了,利用zookeeper实现服务器上下线动态感知

1722
来自专栏分布式系统和大数据处理

使用.net通过odbc访问Hive

在 写入数据到Hive表(命令行) 这篇文章中,我们通过命令行的方式和hive进行了交互。但在通常情况下,是通过编程的方式来操作Hive,Hive提供了JDBC...

2514
来自专栏Hadoop实操

如何在CDH中安装Kudu&Spark2&Kafka

在CDH的默认安装包中,是不包含Kafka,Kudu和Spark2的,需要单独下载特定的Parcel包才能安装相应服务。本文档主要描述在离线环境下,在CentO...

1.6K9
来自专栏蓝天

Hadoop-2.8.0分布式安装手册

10.12.154.79: Error: JAVA_HOME is not set and could not be found.

3182
来自专栏Hadoop实操

如何使用Cloudera Manager在线为集群减容

在Hadoop集群资源紧张的情况下可以在线扩容来提升集群的计算能力,具体参考Fayson前面的文章《如何在非Kerberos环境下对CDH进行扩容》,那么在集群...

1.1K7
来自专栏Hadoop实操

如何使用Hue创建Spark2的Oozie工作流(补充)

目前Oozie 的 SparkAction 仅支持Spark1.6, 而并不支持Spark2, 这是 CDH Spark2已知的局限性(https://www....

9326

扫码关注云+社区