1、通过实验掌握Structured Streaming的基本编程方法; 2、掌握日志分析的常规操作,包括拆分日志方法和分析场景。
1、通过Socket传送Syslog到Spark
日志分析是一个大数据分析中较为常见的场景。在Unix类操作系统里,Syslog广泛被应用于系统或者应用的日志记录中。Syslog通常被记录在本地文件内,也可以被发送给远程Syslog服务器。Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。
日志一般会通过Kafka等有容错保障的源发送,本实验为了简化,直接将Syslog通过Socket源发送。新建一个终端,执行如下命令:
$ tail -n+1 -f /var/log/syslog | nc -lk 9988
“tail -n+1 -f /var/log/syslog”表示从第一行开始打印文件syslog的内容。“-f”表示如果文件有增加则持续输出最新的内容。然后,通过管道把文件内容发送到nc程序(nc程序可以进一步把数据发送给Spark)。
如果/var/log/syslog内的内容增长速度较慢,可以再新开一个终端(计作“手动发送日志终端”),手动在终端输入如下内容来增加日志信息到/var/log/syslog内:
$ logger ‘I am a test error log message.’
2、对Syslog进行查询
由Spark接收nc程序发送过来的日志信息,然后完成以下任务:
(1)统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。 (2)统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。 (3)输出所有日志内容带error的日志。
分析日志是一个大数据分析中较为常见的场景。在Unix类操作系统里,Syslog广泛被应用于系统或者应用的日志记录中。Syslog通常被记录在本地文件内,也可以被发送给远程Syslog服务器。Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。
日志一般会通过kafka等有容错保障的源发送,本实验为了简化,直接将syslog通过Socket源发送。新开一个终端,命令为“tail终端”,输入
tail -n+1 -f /var/log/syslog | nc -lk 9988
tail命令加-n+1代表从第一行开始打印文件内容。-f代表如果文件有增加则持续输出最新的内容。通过管道发送到nc命令起的在本地9988上的服务上。 如果/var/log/syslog内的内容增长速度较慢,可以再新开一个终端,命名为“手动发送log终端”,手动在终端输入
logger ‘I am a test error log message.’
来增加日志信息到/var/log/syslog内。
Syslog每行的数据类似以下:
Nov 24 13:17:01 spark CRON[18455]: (root) CMD (cd / && run-parts --report /etc/cron.hourly)
最前面为时间,接着是主机名,进程名,可选的进程ID,冒号后是日志内容。在Spark内,可以使用正则表达式对syslog进行拆分成结构化字段,以下是示例代码:
# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
fields = partial(
regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"
)
words = lines.select(
to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
fields(idx=2).alias("hostname"),
fields(idx=3).alias("tag"),
fields(idx=4).alias("content"),
)
to_timestamp(format_string('2018 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),这句是对Syslog格式的一个修正,因为系统默认的Syslog日期是没有年的字段,所以使用format_string函数强制把拆分出来的第一个字段前面加上2019年,再根据to_timestamp格式转换成timestamp字段。在接下来的查询应当以这个timestamp作为事件时间。
由Spark接收nc程序发送过来的日志信息,然后完成以下任务。
(1)统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。
在新开的终端内输入 vi spark_exercise_testsyslog1.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。
#!/usr/bin/env python3
from functools import partial
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("StructuredSyslog") \
.getOrCreate()
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9988) \
.load()
# Nov 24 13:17:01 spark CRON[18455]: (root) CMD ( cd / && run-parts --report /etc/cron.hourly)
# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
fields = partial(
regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"
)
words = lines.select(
to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
fields(idx=2).alias("hostname"),
fields(idx=3).alias("tag"),
fields(idx=4).alias("content"),
)
# (1). 统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。
windowedCounts1 = words \
.filter("tag = 'CRON'") \
.withWatermark("timestamp", "1 minutes") \
.groupBy(window('timestamp', "1 hour")) \
.count() \
.sort(asc('window'))
# 开始运行查询并在控制台输出
query = windowedCounts1 \
.writeStream \
.outputMode("complete") \
.format("console") \
.option('truncate', 'false')\
.trigger(processingTime="3 seconds") \
.start()
query.awaitTermination()
(2)统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。
在新开的终端内输入 vi spark_exercise_testsyslog2.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。
#!/usr/bin/env python3
from functools import partial
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("StructuredSyslog") \
.getOrCreate()
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9988) \
.load()
# Nov 24 13:17:01 spark CRON[18455]: (root) CMD ( cd / && run-parts --report /etc/cron.hourly)
# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
fields = partial(
regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"
)
words = lines.select(
to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
fields(idx=2).alias("hostname"),
fields(idx=3).alias("tag"),
fields(idx=4).alias("content"),
)
# (2). 统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。
windowedCounts2 = words \
.withWatermark("timestamp", "1 minutes") \
.groupBy('tag', window('timestamp', "1 hour")) \
.count() \
.sort(asc('window'))
# 开始运行查询并在控制台输出
query = windowedCounts2 \
.writeStream \
.outputMode("complete") \
.format("console") \
.option('truncate', 'false')\
.trigger(processingTime="3 seconds") \
.start()
query.awaitTermination()
(3)输出所有日志内容带error的日志。
在新开的终端内输入 vi spark_exercise_testsyslog3.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。
#!/usr/bin/env python3
from functools import partial
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("StructuredSyslog") \
.getOrCreate()
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9988) \
.load()
# Nov 24 13:17:01 spark CRON[18455]: (root) CMD ( cd / && run-parts --report /etc/cron.hourly)
# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段
fields = partial(
regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"
)
words = lines.select(
to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),
fields(idx=2).alias("hostname"),
fields(idx=3).alias("tag"),
fields(idx=4).alias("content"),
)
# (3). 输出所有日志内容带error的日志。
windowedCounts3 = words \
.filter("content like '%error%'")
# 开始运行查询并在控制台输出
query = windowedCounts3 \
.writeStream \
.outputMode("update") \
.format("console") \
.option('truncate', 'false')\
.trigger(processingTime="3 seconds") \
.start()
query.awaitTermination()
Spark Structured Streaming 是 Spark 提供的用于实时流处理的 API,它提供了一种统一的编程模型,使得批处理和流处理可以共享相同的代码逻辑,让开发者更容易地实现复杂的实时流处理任务。通过对 Structured Streaming 的实验,有以下体会:
通过实验和实践,更深入地理解 Structured Streaming 的特性和工作原理,掌握实时流处理的开发技巧和最佳实践,为构建稳健可靠的实时流处理应用打下坚实基础。
Syslog 是一种常用的日志标准,它定义了一个网络协议,用于在计算机系统和网络设备之间传递事件消息和警报。通过对 Syslog 的实验,有以下体会:
通过实验和实践,更深入地了解 Syslog 的工作原理和应用场景,学会如何配置和使用 Syslog,掌握日志收集、存储、分析和可视化的技巧和最佳实践,为构建高效、可靠、安全的日志管理系统打下坚实基础。