Wiring up an end-to-end real-time streaming pipeline: log4j → Flume → Kafka → Structured Streaming

Published by 王知无-import_bigdata on 2020-05-20

Simulating log4j log generation

Jar dependencies: pom.xml

<!-- The log4j and slf4j-log4j12 versions are presumably managed elsewhere in
     the build (e.g. a parent pom); if not, pin them explicitly, e.g. 1.2.17
     and 1.7.25 -->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.8.0</version>
</dependency>

Java code: LoggerGenerator.java

import org.apache.log4j.Logger;

public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        // Emit one INFO event per second; the flume appender configured in
        // log4j.properties (below) ships each event to the avro source on port 44444.
        int index = 0;
        while (true) {
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }

        // To list topics on the broker:
        // $ kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    }
}
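To run the generator, log4j.properties (next section) and the flume-ng-log4jappender jar with its dependencies must be on the classpath. A minimal sketch, assuming mvn dependency:copy-dependencies has populated target/dependency (paths are illustrative):

$ java -cp target/classes:target/dependency/* LoggerGenerator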

log4j.properties configuration

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

# Ship every event to the flume avro source configured below
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
# UnsafeMode: logging calls do not throw if the flume agent is unreachable
log4j.appender.flume.UnsafeMode = true
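With this ConversionPattern, each console line should look roughly like the following (assuming LoggerGenerator sits in the default package, so %c prints the bare class name):

2020-05-17 10:00:00,000 [main] [LoggerGenerator] [INFO] - value : 0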

Starting the Kafka broker

Create the topic ahead of time (not strictly required). Once flume-ng is running, start a Kafka console consumer to watch the data arrive.

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
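On Kafka 2.2 and later, kafka-topics.sh can address the broker directly (the --zookeeper flag was eventually removed entirely):

$ kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic default_flume_topic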

Configuring and starting flume-ng

The avro-memory-kafka.conf used in an earlier post:

# avro-memory-kafka.conf

# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

Start flume-ng

$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &

# --new-consumer is redundant with --bootstrap-server and was removed in Kafka 2.0+
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning
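To check the topic programmatically instead of via the console tool, here is a minimal sketch using the third-party kafka-python package (an extra dependency, not part of the original pipeline):

from kafka import KafkaConsumer

# Read default_flume_topic from the beginning and print each event value
consumer = KafkaConsumer(
    'default_flume_topic',
    bootstrap_servers='127.0.0.1:9092',
    auto_offset_reset='earliest',
)
for message in consumer:
    print(message.value.decode('utf-8'))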

Real-time stream processing with Spark Structured Streaming

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Must match the topic the flume Kafka sink writes to
topic = 'default_flume_topic'
brokers = "127.0.0.1:9092"

spark = SparkSession.builder \
    .appName("log4j-flume-kafka-structured-streaming") \
    .getOrCreate()

# startingOffsets here begins at offset 7 of partition 0; use "earliest" or
# "latest" if you do not need a specific starting point
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", """{"%s":{"0": 7}}""" % topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

# Custom processing of the transported payload goes here (e.g. parsing a JSON
# string); this example just does a word count
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)
word_counts = words.groupBy('word').count()

query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
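Submitting this requires the Kafka source package for Structured Streaming; the coordinates must match your Spark and Scala versions (both the version and the script name below are illustrative):

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 streaming_app.py

The comment about custom processing hints at JSON payloads. A minimal sketch of that path, assuming each Kafka value were a JSON string (the schema and field names here are hypothetical):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema for a JSON payload carried in the Kafka value column
schema = StructType([
    StructField("level", StringType()),
    StructField("msg", StringType()),
])

# Parse the JSON string into typed columns, then aggregate by log level
parsed = lines.select(from_json(col("value"), schema).alias("j")).select("j.*")
level_counts = parsed.groupBy("level").count()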

Originally published by the WeChat official account 大数据技术与架构 on 2020-05-17.
