前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Logstash: 应用实践 - 装载 CSV 文档到 Elasticsearch

Logstash: 应用实践 - 装载 CSV 文档到 Elasticsearch

作者头像
腾讯云大数据
修改2021-01-08 16:25:00
1.1K0
修改2021-01-08 16:25:00
举报

腾讯云 Elasticsearch Service】高可用,可伸缩,云端全托管。集成X-Pack高级特性,适用日志分析/企业搜索/BI分析等场景


在进行我们这个实践之前,相信大家已经安装好自己的 Logstash 环境。如果大家还没安装好Logstash,可以参照我之前的文章 “如何安装Elastic栈中的Logstash”。

Logstash 到底是做什么的?

我们先看一下如下的图:

简单地说,Logstash 就是位于 Data 和 Elasticsearch 之间的一个中间件。Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件。 它从数据源实时地把数据进行采集,可帮助您解析,丰富,转换和缓冲来自各种来源的数据,并最终把数据传入到Elasticsearch之中。 如果您的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。Logstash 部署于 ingest node 之中。

更具体的描述:

在Logstash之中,它也分为三个部分

  • Inputs
  • Filters
  • Ouput

Input 创建事件,Filter 修改输入事件,然后 Ouput 将它们发送到目标。 Input 和 Output 支持编解码器,使用编解码器,你可以在数据进入或退出管道时进行编码或解码,而不必使用单独的过滤器。 默认情况下,Logstash 在管道(pipeline)阶段之间使用内存中有界队列(输入到过滤器和过滤器到输出)来缓冲事件。 如果Logstash 不安全地终止,则存储在内存中的所有事件都将丢失。 为防止数据丢失,您可以使 Logstash 通过使用持久队列将正在进行的事件持久化到磁盘上。可以通过在 logstash.yml 文件中设置 queue.type: persistent 属性来启用持久队列,该文件位于LOGSTASH_HOME/config 文件夹下。 logstash.yml 是一个配置文件,其中包含与 Logstash 相关的设置。 默认情况下,文件存储在 LOGSTASH_HOME/data /queue 中。 你可以通过在 logstash.yml 中设置 path.queue 属性来覆盖它。

Logstash 配置由一系列输入,过滤器和输出插件及其相应的属性组成。 每个插件在解析,处理和最终以所需格式放置数据方面起着重要作用。 输入插件生成事件,过滤器修改它们,输出将它们发送到其他系统。 Logstash 提供超过 200 个插件,以及创建和贡献自己的灵活性:

在我们安装的 Logstash bin 目录下,我们可以使用如下的命令来获得所有的 plugins:

代码语言:javascript
复制
  $ ./logstash-plugin list

下载 Data

为了能够使得我的练习能够进行,我们必须先得到数据。我们可以到网址 kaggle.com 进行下载。该网站含有大量的数据可以供我们进行下载。我们在该网页搜索 “cars”:

我们可以得到一叫做“Classified Ads for Cars”的 dataset。从上面我们可以看到有大概有 92M 的数据,不多,但是确实也不少啊。对于我们做这个练习来说,确实应该是可以的了。在网页上,我们可以看到这个 csv 文件的所有的数据描述。我们可以先注册一个账号,然后点击 “Download” 按钮,下载这个数据,并解压这个文件。在我们的电脑的 “Downloads” 文件目录下可以看到被解压的文件 “all_anonymized_2015_11_2017_03.csv”。我们可以在我们的 home 目录下创建一个叫做 data 的目录,并把这个文件拷贝过去,并重新命名为 cars.csv。

代码语言:javascript
复制
localhost:data liuxg$ tree.└── cars.csv 0 directories, 1 filelocalhost:data liuxg$ pwd/Users/liuxg/data

我们可以看到在data目录下只有叫做cars.csv的文件。我们可以打开这个文件,我们可以看到如下的内容:

这是一个非常大的文件。在这个文件里,我们可以看到 maker, model, mileage, manufacture 等等字段。

Index CSV 文件到 Elasticsearch

在上一节中,我们已经把我们的数据存入到我们的data目录中。在这节里我们来讲述如何把数据写入到 Elasticsearch 之中。首先,我们可以参阅链接 “Configuring Logstash”。我们需要创建一个属于我们自己的 config 文件。通常一个 config 文件由如下的三个部分组成:

代码语言:javascript
复制
# This is a comment. You should use comments to describe# parts of your configuration.input {  ...} filter {  ...} output {  ...}

我们在之前已经创建好的目录data下创建一个叫做 longstash_cars.config 的文件。我们可以使用我们喜欢的编辑器来编辑这个文件。

logstash_cars.config 文件的内容如下:

代码语言:javascript
复制
input {	file {		path => "/Users/liuxg/data/cars.csv"		start_position => "beginning"		sincedb_path => "/dev/null"	}} filter {	csv {		separator => "," 		columns => [ "maker", "model", "mileage", "manufacture_year", "engine_displacement",		"engine_power", "body_type", "color_slug", "stk_year", "transmission", "door_count",		"seat_count", "fuel_type", "date_created", "date_last_seen", "price_eur" ]	} 	mutate { convert => ["mileage", "integer"] }	mutate { convert => ["price_eur", "float"] }	mutate { convert => ["engine_power", "integer"] }	mutate { convert => ["door_count", "integer"] }	mutate { convert => ["seat_count", "integer"] }} output {	elasticsearch {		hosts => "localhost:9200"		index => "cars"		document_type => "sold_cars"	} 	stdout {}}

这里有几点需要说明的:

  • 在 input 中,我们定义了一个文件,它的path指向我们的 csv 文件的位置。start_position 指向beginning。如果对于一个实时的数据源来说,它通常是 ending,这样表示它每次都是从最后拿到那个数据。sincedb_path 通常指向一个文件。这个文件保存上次操作的位置。针对我们的情况,我们设置为 /dev/null,表明,我们不存储这个数据
  • 在 filter 中,CSV filter 是非常直接的,不太需要很多的解释。这里的 column 都来自于我们的 csv 表格。通常 Logstash 会把每个数据都当做是字符串。针对我们的情况,我们可看到 mileage 是一个整型数,price_eur 是一个浮点数。这些我们都需要进行调整。这个调整我们可以通过 mutate 来完成
  • 在 output 之中,我们制定本地的 Elasticsearch 为我们的数据库,它的 index 是 cars,同时 document_type 为_doc。我们也同时使用 stdout,这样我们可以在terminal屏幕中看出数据在处理之中

装载数据到 Elasticsearch

我们首先进入到 Logstash 的安装目录,然后打入如下的命令:

代码语言:javascript
复制
sudo ./bin/logstash -f ~/data/logstash_cars.config

提示:在运行 Logstash 时使用 -r 标志可让您在更改和保存配置后自动重新加载配置。 在测试新配置时,这将很有用,因为你可以对其进行修改,这样就不必在每次更改配置时都手动启动Logstash。

然后,我们可以在屏幕上看到如下的输出:

同时如果我们这在 Kibana 上可以看到正在 index 的 cars index数量(count)是一直变化的。

因为这是一个很大的文件,所以建立索引需要一段时间,而且我的电脑也将会是非常的热。

经过一段时间的运行,我们可以看到屏幕上不再滚动了,表明Logstash已经完成了数据的传输。我们可以在 Kibana上打入如下的命令:

代码语言:javascript
复制
GET cars/_stats

我看可以看到我们的cars Index的统计数据:

我们可以看到共有 3,167,984 条数据,并且它的大小有 2.466G 这么大的数据。

我们可以打入如下的命令:

代码语言:javascript
复制
GET cars

我们可以看到 Elasticsearch 已经为我们创建好了 mapping,并且它们的数据类型也可以看到:

代码语言:javascript
复制
{  "cars" : {    "aliases" : { },    "mappings" : {      "properties" : {        "@timestamp" : {          "type" : "date"        },     ...        "color_slug" : {          "type" : "text",          "fields" : {            "keyword" : {              "type" : "keyword",              "ignore_above" : 256            }          }        },        "door_count" : {          "type" : "long"        },        "engine_power" : {          "type" : "long"        },        "fuel_type" : {          "type" : "text",          "fields" : {            "keyword" : {              "type" : "keyword",              "ignore_above" : 256            }          }        },        "host" : {          "type" : "text",          "fields" : {            "keyword" : {              "type" : "keyword",              "ignore_above" : 256            }          }        },    ...    },    "settings" : {      "index" : {        "creation_date" : "1567949013407",        "number_of_shards" : "1",        "number_of_replicas" : "1",        "uuid" : "tYT-XWpFTGiqwn_rMX4S3A",        "version" : {          "created" : "7030099"        },        "provided_name" : "cars"      }    }  }}

我们可以看到 door_count 及 engine_power 都已经修改为 long 数据类型了。在默认的情况下已经为我们创建了一个 replica。

在 Kibana 上显示数据

我们打开我们的 Kibana。我们首先现在我们的 Kibana 进行如下的操作:

我们建立一个叫做 cars* 的 index pattern。

最后我的到如下的画面:

再接下来,我们选择 Discover(左边最上面的那个图标)来装载我们的数据:

我们可以选择最近一个小时的数据的数据。我们立马可以看到一个统计图,显示在每个时间有多少个数据进来。

我们也可以做一个报表通过点击 “add” 按钮选择 maker, fuel_type,price_eur 及

最终我们看到如下的一个列表:

创建 Visualization

我们选择创建一个 Visualization:

我们选择 “Pie” Chart:

我们可以按照如下的选择生产一个 top 10 的生产厂商

按照如下的方法可以生产 top 10 的汽车型号:

生成最终 Dashboard


最新活动

包含文章发布时段最新活动,前往ES产品介绍页,可查找ES当前活动统一入口

Elasticsearch Service自建迁移特惠政策>>

Elasticsearch Service 新用户特惠狂欢,最低4折首购优惠 >>

Elasticsearch Service 企业首购特惠,助力企业复工复产>>

关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 【腾讯云 Elasticsearch Service】高可用,可伸缩,云端全托管。集成X-Pack高级特性,适用日志分析/企业搜索/BI分析等场景
  • Logstash 到底是做什么的?
  • 下载 Data
  • Index CSV 文件到 Elasticsearch
  • 装载数据到 Elasticsearch
  • 在 Kibana 上显示数据
  • 创建 Visualization
  • 生成最终 Dashboard
  • 最新活动
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档