ELK
数据管道本章将使用ELK技术栈来构建第一条基本的数据管道。这样可以帮助我们理解如何将ELK技术栈的组件简单地组合到一起来构建一个完整的端到端的分析过程
google
每天的股票价格数据下载地址:https://finance.yahoo.com/q/hp?s=GOOG
Date
(日期)、Open Price
(开盘价)、Close Price
(收盘价)、High Price
(最高价)、Volume
(成交量)和Adjusted Price
(调整价格)Logstash
的输入tail -0f
input {
file {
path => "文件路径(必选项)"
start_position => "读取数据的开始位置"
tags => "任意字符串数组,能在随后针对事件做一些过滤和处理"
type => "标记事件的特定类型"
}
}
path
:文件输入插件唯一必填的配置项start_position
:从源文件读取数据的开始位置,可以是beginning
或end
。默认是end
,这样可以满足读取活动的流数据的场景需求。如果需要读取历史数据,可以设置为beginning
tags
:可以是任意数量的字符串数组,在随后基于tags
来针对事件做一些过滤和处理type
:标记事件的特定类型,可以在随后的过滤和搜索中有所帮助 。type
字段会保存在es
的文档中,并通过kibana
的_type
字段来进行展现如,可以将type设置为error_log或者info_logs
input {
file {
path => "/opt/logstash/input/GOOG.csv"
start_position => "beginning"
}
}
因为是历史数据,所以需要设置start_position为beginning
CSV
文件,所以可以使用csv
过滤插件。csv
过滤器可以对csv
格式的数据提取事件的字段进行解析并独立存储filter {
csv {
columns => #字段名数组
separator => # 字符串;默认值,
}
}
columns
属性指定了csv
文件中的字段的名字,可选项。默认将字段命名为column1
、column2
等等separator
属性定义了输入文件中用来分割不同字段的分割符。默认是逗号,也可以是其他任意的分割符filter {
csv {
columns => ["date_of_record", "open", "high", "low", "close", "volumn", "adj_close"]
separator => ","
}
}
Logstash
中有一个叫date
的过滤器可以完成上述任务filter {
date {
match => # 默认值是[]
target => # 默认值是@timestamp
timezone => ","
}
}
match
:是一个[域,格式],可为每个字段设置一种格式timestamp
:在上述例子中,我们采用了历史数据,不希望使用时间捕获时的时间作为@timestamp
,而是使用记录生成时的时间,所以我们将date
字段映射为@timestamp
。这不是强制的,但建议这样做mutate
过滤器将字段转换为指定的数据类型,这个过滤器可以用于对字段做各种常见的修改,包括修改数据类型、重命名、替换和删除字段。另外也可以用来合并两个字段、转换大小写、拆分字段等等date
过滤器可以配置如下date {
match => ["date_of_record", "yyyy-MM-dd"]
target => "@timestamp"
}
@timestamp
,而是使用记录生成时的时间,所以我们将date
字段映射为@timestamp
,这不是强制的,但建议这样做mutate
过滤器将字段转换为指定的数据类型。这个过滤器可以用于对字段做各种常见的修改,包括修改数据类型、重命名、替换和删除字段。另外也可以用来合并两个字段、转换大小写、拆分字段等等filter {
mutate {
convert => # 列以及数据类型的Hash值(可选项)
join => # 用于关联的列的Hash值(可选项)
lowercase => # 用于转换的字段数组
merge => # 用于合并的字段的Hash值
rename => # 用于替换的字段的Hash值
replace => # 用于替换的字段的Hash值
split => # 用于分割的字段的Hash值
strip => # 字段数组
uppercase => # 字段数组
}
}
mutate {
convert => ["open", "float"]
convert => ["high", "float"]
convert => ["low", "float"]
convert => ["close", "float"]
convert => ["volume", "float"]
convert => ["adj_close", "float"]
}
convert
功能来将价格和成交量字段转换为浮点数和整数类型Elasticsearch
Logstash
的CSV
过滤器(用来处理数据),并且已根据数据类型对数据进行解析和处理。接下来将处理后的数据存储到Elasticsearch
,以便对不同字段做索引,这样后续就可以使用Kibana
来展现output {
elasticsearch {
action => # 字符串(可选项),默认值:"index"(索引),delete(根据文档ID删除文档)
cluster => # 字符串(可选项),集群名字
hosts=> # 字符串(可选项)
index=> # 字符串(可选项),默认值:"logstash-%{+YYYY.MM.dd}"
index_type => # 字符串(可选项),事件写入的索引类型,确保相同类型的事件写入相同类型的索引
port => # 字符串(可选项)
protocol => # 字符串,协议类型,取值为["node","transport","http"]
}
}
Logstash
配置input {
file {
path => "/GOOG.csv"
start_position => "beginning"
}
}
filter {
csv {
columns => ["date_of_record","open","high","low","close","volume","adj_close"]
separator => ","
}
date {
match => ["date_of_record", "yyyy-MM-dd"]
}
mutate {
convert => ["open", "float"]
convert => ["high", "float"]
convert => ["low", "float"]
convert => ["close", "float"]
convert => ["volume", "float"]
convert => ["adj_close", "float"]
}
}
output {
elasticsearch {
hosts=>"localhost"
}
}
logstash.conf
bin/logstash -f logstash.conf
Kibana
可视化http://localhost:5601
,默认使用logstash-
*索引bin/kibana
Time Filter
),根据数据的日期范围来设置绝对时间过滤器Settings
页面链接,然后选择屏幕左边的logstash-
*索引模式Kibana
主页上方点击可视化(Visualize
)页面链接,然后点击新建可视化的图标Kibana
接口来实现GOOG
每周收盘价的指数趋势Y-Axis
)的聚合函数为Max
,字段为close
。在桶(buckets
)的区域,选择聚合(Aggregation
)为基于@timestamp
字段的日期直方图(Date Histogram
),间隔(Interval
)选择每周(Weekly
),点击应用(Apply
)。将上述折线图保存并命名,随后可在仪表盘中使用Sum
,字段为volume
。在桶的区域,选择X轴的聚合函数为基于@timestamp
字段的日期直方图,间隔选择每周Max
,字段为volume
,然后点击应用split rows
),选择度量值 的聚合函数为求平均值 (Average
),字段为volume
。在桶的区域,选择聚合函数为基于@timestamp
字段的日期直方图,间隔为月度(Monthly
)Add Visualization
)链接,选择已保存的可视化组件并在仪表盘页面上进行布局