Simulating log4j log output
Jar dependencies (pom.xml)
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.8.0</version>
</dependency>
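The first two dependencies declare no version, so they rely on a parent POM or a dependencyManagement section to supply one. If you have neither, pin the versions explicitly; the versions below are assumptions (the final log4j 1.x release and a contemporary slf4j), not taken from the original setup:
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version> <!-- assumed: the last log4j 1.x release -->
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version> <!-- assumed: any 1.7.x binding works with log4j 1.2 -->
</dependency>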
Java code: LoggerGenerator.java
import org.apache.log4j.Logger;

public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        // Emit one log line per second; the Log4jAppender configured in
        // log4j.properties forwards each event to the Flume avro source.
        while (true) {
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
    }
}
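One way to run the generator (an assumption, not from the original; launching it from an IDE works just as well) is Maven's exec plugin, which resolves without being declared in the POM:
$ mvn compile exec:java -Dexec.mainClass=LoggerGenerator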
log4j.properties configuration
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
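# Send every log event to a Flume avro source; UnsafeMode=true keeps logging
# calls from throwing when the Flume agent is unreachable.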
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
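With this ConversionPattern, each console line should look roughly like the following (timestamp and thread name are illustrative):
2024-01-01 12:00:00,000 [main] [LoggerGenerator] [INFO] - value : 0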
Start Kafka and create the topic in advance (creating it up front is not strictly required):
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
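To verify the topic exists:
$ kafka-topics.sh --list --zookeeper 127.0.0.1:2181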
The avro-memory-kafka.conf used in an earlier article. It listens on 127.0.0.1:44444, the same host/port the Log4jAppender sends to, and forwards the events to the Kafka topic default_flume_topic:
# avro-memory-kafka.conf
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444
# Describe the sink
# Must be set to org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start flume-ng; once it is up, start a Kafka console consumer to observe the data:
$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning
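If everything is wired up, the consumer should print the generated messages, roughly:
value : 0
value : 1
value : 2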
Consume the topic with Spark Structured Streaming (Python):
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

topic = 'default_flume_topic'
brokers = "127.0.0.1:9092"
spark = SparkSession.builder.appName("log4j-flume-kafka-structured-streaming").getOrCreate()
# Read from Kafka, starting partition 0 at offset 7, keeping only the message value
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", """{"%s":{"0": 7}}""" % topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)")
# Custom processing of the incoming data goes here, e.g. parsing JSON strings.
# As a demo, split each message into words and count them.
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)
word_counts = words.groupBy('word').count()
query = word_counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
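Structured Streaming's Kafka source is not bundled with Spark, so the job has to be submitted with the spark-sql-kafka package on the classpath. The coordinates below are an assumption (pick the version matching your Spark/Scala build), and streaming_app.py is a hypothetical file name for the script above:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 streaming_app.py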