前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Elasticsearch系列之六】通过logstash迁移ES数据

【Elasticsearch系列之六】通过logstash迁移ES数据

原创
作者头像
Vicwan
发布2020-04-22 17:07:26
9.6K1
发布2020-04-22 17:07:26
举报

Logstash

1.1、适合场景

数据体量不大,需要在线数据同步的场景(实际使用的是scroll,是执行瞬间的es快照,近实时的数据同步)。

1.2、logstash架构简介

logstash整体架构如上图 ,箭头代表数据流向,可以有多个input,中间的queue负责将数据分发到不通的pipline中,每个pipline由batcher,filter和output构成。batcher的作用是批量从queue中取数据,这个值可以通过配置进行设置。

1.3、Logstash处理流程

Logstash处理流程大致可分为3个阶段,Input---->Filter---->Output(数据采集----->数据分析/解析---->数据输出),具体的处理流程可以查看下图:

解释一下codec的概念,Codec 是 logstash 从 1.3.0 版开始新引入的概念(Codec 来自 Coder/decoder 两个单词的首字母缩写)。在此之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入处理不同类型的数据,这全是因为有了 codec 设置。所以,Logstash不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流,codec 就是用来decode、encode事件的。

1.4、Queue

使用Queue以后,我们首先要关注的是怎么确保数据是正常被消费掉了,这里到达output以后会有发送一个ACK给Queue,来告诉Queue这些Logstash Event已经处理完了,这样就能确保一个数据正常被消费掉,这也是在消息队列中我们常用的一种手段。

1) Queue分类:

a) In Memory

在内存中,固定大小,无法处理进程crash、机器宕机等情况,会导致数据丢失。

b) Persistent Queue In Disk

可处理进程crash情况,保证数据不丢失,充当缓冲区,可代替kafka等消息队列作用。

2) Persistent Queue(PQ)处理流程:

a) 一条数据经由input进入PQ,PQ将数据备份在disk,然后PQ响应input表示已收到数据;

b) 数据从PQ到达filter/output,其处理到事件后返回ACK到PQ;

c) PQ收到ACK后删除磁盘的备份数据。

3) 性能对比:

PQ性能要低于内存中的queue的,但是差别不是很大。如果不是特殊需求,建议打开PQ。

4) PQ配置:

代码语言:txt
复制
queue.type: persisted  #默认是memory,表示使用内存队列

queue.max\_bytes: 4gb  #队列存储的最大数据量,默认1gb,这个值大一点队列可以存储多一点数据

1.5、Logstash配置文件

配置文件一般在config目录录下面,logstash.yml和jvm.options,另外6.0以上的版本会有pipelines.yml,这个文件是为了在同一个进程中运行多个管道。

1) 管道配置文件

在定义Logstash处理管道的各个阶段时,需要创建管道配置文件,Logstash尝试在/etc/logstash/conf.d目录中只加载扩展名为.conf的文件并忽略所有其他文件。

logstash.conf管道配置文件模板:

代码语言:txt
复制
input {

 elasticsearch {

 hosts => "http://xxxxxxxxx:9200"

 user => "elastic"

 index => "\*"

 password => "xxxxxx"

 docinfo => true

 docinfo\_fields => "\_index", "\_id", "\_type", "\_routing"

 }

 }

output {

 elasticsearch {

 hosts => "http://xxxxxxx:9200"

 user => "elastic"

 password => "xxxxxx"

 index => "%{@metadata}"

 document\_type => "%{@metadata}"

 document\_id => "%{@metadata}"

 routing => "%{@metadata}"

 }

}

参数说明:

参数

说明

hosts

ES服务的访问地址。input中为http://<阿里云ES公网地址>:<端口>:output中为http://腾讯云ES实例

user

访问ES服务的用户名

password

访问ES服务的密码

index

指定同步索引名,通配符*代表所有索引

docinfo

设置为true,将会提取ES文档的元信息,例如index、type和id

docinfo_fields

指定文档元信息,默认不带routing元信息

注意:客户在实际的生产环境中,如果用到_routing这个字段,就需要迁移_routing,需要在logstash的output里指定routing字段,值是"%{@metadata}"(意思是保持跟来源索引一致),但同时也要在input中指定docinfo_fields并增加_routing,因为默认元信息只有_index,_type,_id,以下是官方说明截图:

其他参数详情请参见input插件说明:https://www.elastic.co/guide/en/logstash/6.7/plugins-inputs-elasticsearch.html?spm=a2c4e.10696291.0.0.2ffa19a4VAMGwN

代码语言:txt
复制
input {

 elasticsearch {

 hosts => "http://xxxxxxxx:9200"

 user => "elastic"

 index => "test"

 password => "xxxxxxx"

 query => '{ "query":{ "query\_string": { "query": "\*" } } }'

 docinfo => true

 }

Elasticsearch input插件是根据配置的query语句,从ES集群读取文档数据。适用于批量导入测试日志、重索引等操作。默认一次query抽取完数据后,worker会自动关闭。如果想要实现定时触发同步动作,可以使用cron语法,配合schedule属性的配置实现,详情请参见Logstash官网Scheduling介绍:https://www.elastic.co/guide/en/logstash/6.7/plugins-inputs-elasticsearch.html?spm=a2c4e.10696291.0.0.34dd19a4GGsCSh#_scheduling

代码语言:txt
复制
schedule => "* * * * *"

以上示例会每分钟触发抽取。如果没有指定schedule参数,那么抽取仅执行一次。

2) logstash.yml

logstash.yml用于配置logstash的启动和执行相关配置,可以在这个文件中设置标志,而不是在命令行中传递标志,在命令行中设置的任何标志都覆盖logstash.yml文件中的相应设置。

logstash.yml配置模板:

代码语言:txt
复制
#使用分层表单来设置管道的批处理大小和批处理延迟
   pipeline:
     batch:
       size: 125        #管道批处理大小
       delay: 5             #管道批处理延迟

#若要表示与平面键相同的值:
   pipeline.batch.size: 125
   pipeline.batch.delay: 5

#节点名称,在集群中具备唯一性,默认为logstash主机的主机名
node.name: logstast-node1

#logstash及其插件所使用的数据路径,默认路径为logstash家目录下的data目录
path.data: /usr/local/logstash-7.0.0/data/

#管道的ID,默认为main
pipeline.id: main

#输入、输出及过滤器的总工作数量,也就是logstash的工作进程,此工作进程默认为主机的cpu核心数量
pipeline.workers: 16 

#在输入阶段,单个工作线程将从输入中收集的最大事件数,此事件数堆内存开销较大,内存开销可在jvm.options中设置堆内存大小来优化此选项
pipeline.batch.size: 125

#在将一个较小的批发送到filters+output之前,轮询下一个事件时等待的时间(以毫秒为单位)
pipeline.batch.delay: 50

#设置为true时,在强制关闭logstash期间,即使内存中还有事件,那么为true将会强制关闭,导致数据丢失;默认为false,false在强制关闭logstash期间,将拒绝退出,直到所有在管道中的事件被安全输出,再关闭。
pipeline.unsafe_shutdown: false

#指定管道配置的目录,在此目录下的所有管道配置文件都将被logstash读取,除管道配置外,不要放任何文件
path.config: /usr/local/logstash-7.0.0/conf.d/

#在启动时,测试配置是否有效并退出,检测配置文件是否正确,包括检测管道配置文件,默认为false
config.test_and_exit: true

#定期检查配置是否更改并重新加载管道,默认为false
config.reload.automatic: true

#logstash间隔多久检查一次配置中的更改,默认为3秒
config.reload.interval: 600s

#设置为true时,将完全编译的配置显示为调试日志消息
config.debug: false

#用于事件缓冲的内部排队模型;可以指定内存memory或者磁盘persisted,内存处理速度相对磁盘来说效率要高,默认为内存
queue.type: memory

#启用持久队列时将存储数据文件的目录路径,默认为logstash路径下的queue
path.queue: /usr/local/logstash-7.0.0/queue/

#启用持久队列时使用的页面数据文件的大小(queue.type: persisted)队列数据由分成页面的仅附加数据文件组成
queue.page_capacity: 64mb

#启用持久队列时队列中未读事件的最大数量(queue.type: persisted),默认为0,0为无限制
queue.max_events: 0

#队列的总容量,以字节数表示,默认为1G,根据业务需求而定
queue.max_bytes: 1024mb

#启用持久队列时强制检查点之前最大的ACK事件数量(queue.type: persisted),设置为0,表示无限制,默认为1024
queue.checkpoint.acks: 1024

#启用持久队列时强制检查点之前写入事件的最大数量(queue,type: persisted),设置为0,表示无限制,默认为1024
queue.checkpoint.writes: 1024

#启用持久队列(queue,type: persisted),强制在头部页面上设置检查点的间隔(以毫秒为单位),有周期性检查点的默认值是1000毫秒
queue.checkpoint.interval: 1000

#用于指示logstast启用插件支持DLQ功能的标志,默认为false
dead_letter_queue.enable: false

#每个死信队列的最大大小,如果条目超过此设置会增加死信队列的大小,则会删除条目,默认为1024mb
dead_letter_queue.max_bytes: 1024mb

#为死信队列存储数据文件的目录路径
path.dead_letter_queue: /usr/local/logstash-7.0.0/letter-queue

#度量标准REST端点的绑定地址,默认为127.0.0.1
http.host: "127.0.0.1"

#度量标准REST端点的绑定端口,默认为9600
http.port: 9600

#日志级别,可以设置为以下几种级别,默认为info
log.level: info
           fatal
           error
           warn
           info (default)
           debug
           trace

#logstash日志目录位置,默认为logstash路径下的logs
path.logs: /usr/local/logstash-7.0.0/logs

#logstash插件路径
path.plugins: []

logstash.yml参数详细说明:

设置

描述

默认值

node.name

节点的描述性名称

机器的主机名

path.data

Logstash及其插件用于任何持久需求的目录

LOGSTASH_HOME/data

pipeline.id

管道的ID

main

pipeline.workers

将并行执行管道的过滤和输出阶段的工人数量,如果您发现事件正在备份,或者CPU没有饱和,请考虑增加这个数字,以更好地利用机器处理能力

主机CPU核心的数量

pipeline.batch.size

在尝试执行过滤器和输出之前,单个工作线程将从输入中收集的最大事件数,更大的批处理大小通常更高效,但代价是增加内存开销,您可能需要增加jvm.options配置文件中的JVM堆空间

125

pipeline.batch.delay

当创建管道事件批处理时,在向管道工作人员发送一个较小的批处理之前,等待每个事件的时间为多少毫秒

50

pipeline.unsafe_shutdown

当设置为true时,即使内存中仍然存在游离事件,也会在关闭期间强制Logstash退出,默认情况下,Logstash将拒绝退出,直到所有接收到的事件都被推送到输出,启用此选项可能导致关闭期间的数据丢失

false

path.config

主管道的Logstash配置路径,如果指定目录或通配符,配置文件将按字母顺序从目录中读取

config.string

包含要用于主管道的管道配置的字符串,使用与配置文件相同的语法

None

config.test_and_exit

当设置为true时,检查配置是否有效,然后退出,注意,在此设置中没有检查grok模式的正确性,Logstash可以从一个目录中读取多个配置文件,如果您把这个设置和log.level: debug结合起来,Logstash将对合并后的配置文件进行日志记录,并用它来自的源文件注解每个配置块

false

config.reload.automatic

当设置为true时,定期检查配置是否已更改,并在更改配置时重新加载配置,这也可以通过SIGHUP信号手动触发

false

config.reload.interval

Logstash多久检查一次配置文件以查看更改

3s

config.debug

当设置为true时,将完整编译的配置显示为debug日志消息,您还必须设置log.level: debug,警告:日志消息将包含传递给插件配置的任意密码选项,可能会导致明文密码出现在日志中!

false

config.support_escapes

当设置为true时,引号中的字符串将处理以下转义序列:\n变成文字换行符(ASCII 10),\r变成文字回车(ASCII 13),\t变成文字制表符(ASCII 9),\变成字面反斜杠\,\"变成一个文字双引号,\'变成文字引号

false

modules

当配置时,modules必须位于上表中描述的嵌套YAML结构中

None

queue.type

用于事件缓冲的内部队列模型,为基于内存中的遗留队列指定memory,或者persisted基于磁盘的ACKed队列

memory

path.queue

启用持久队列时存储数据文件的目录路径(queue.type: persisted)

path.data/queue

queue.page_capacity

启用持久队列时使用的页面数据文件的大小(queue.type: persisted),队列数据由分隔成页面的仅追加的数据文件组成

64mb

queue.max_events

启用持久队列时队列中未读事件的最大数量(queue.type: persisted)

0(无限)

queue.max_bytes

队列的总容量(字节数),确保磁盘驱动器的容量大于这里指定的值,如果queue.max_events和queue.max_bytes都指定,Logstash使用最先达到的任何标准

1024mb(1g)

queue.checkpoint.acks

当启用持久队列时,在强制执行检查点之前的最大ACKed事件数(queue.type: persisted),指定queue.checkpoint.acks: 0设置此值为无限制

1024

queue.checkpoint.writes

启用持久队列时强制执行检查点之前的最大写入事件数(queue.type: persisted),指定queue.checkpoint.writes: 0设置此值为无限制

1024

queue.drain

启用时,Logstash会一直等到持久队列耗尽后才关闭

false

dead_letter_queue.enable

标记指示Logstash以插件支持的DLQ特性

false

dead_letter_queue.max_bytes

每个dead letter队列的最大大小,如果条目将增加dead letter队列的大小,超过此设置,则删除条目

1024mb

path.dead_letter_queue

存储dead letter队列数据文件的目录路径

path.data/dead_letter_queue

http.host

指标REST端点的绑定地址

"127.0.0.1"

http.port

指标REST端点的绑定端口

9600

log.level

日志级别,有效的选项是:fatal、error、warn、info、debug、trace

info

log.format

日志格式,设置为json日志以JSON格式,或plain使用Object#.inspect

plain

path.logs

Logstash将其日志写到的目录

LOGSTASH_HOME/logs

path.plugins

哪里可以找到自定义插件,您可以多次指定此设置以包含多个路径,插件应该在特定的目录层次结构中:PATH/logstash/TYPE/NAME.rb,TYPE是inputs、filters、outputs或codecs,NAME是插件的名称

3) pipelines.yml

如果需要在同一进程中运行多个管道,可以用pipeline.yml配置流水线数据处理,该文件在YAML中格式化并包含一个字典列表,其中每个字典描述一个管道,每个键/值对指定该管道的设置:

代码语言:txt
复制
- pipeline.id: my-pipeline_1
   path.config: "/etc/path/to/p1.config"
   pipeline.workers: 3
- pipeline.id: my-other-pipeline
   path.config: "/etc/different/path/p2.cfg"
   queue.type: persisted

以上示例展示了两个不同的管道,它们由ID和配置路径描述,第一个管道,pipeline.workers的值被设置为3,而在另一个管道则启用持久队列特性,在pipelines.yml文件中未显式设置的设置值将使用logstash.yml中指定的默认值。

在没有参数的情况下启动Logstash时,会读取pipelines.yml文件并实例化文件中指定的所有管道,当使用-e或-f时,Logstash会忽略pipelines.yml文件。

4) jvm.options

定义JVM的总堆空间的最小值和最大值,也就是内存使用量,默认是1G。

5) log4j2.properties

log4j2库的相关设置。

6) startup.options(Linux)

定义logstash启动时的相关配置,包含在/usr/share/logstash/bin中使用的system-install脚本选项,可以构建适当的服务启动脚本。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Logstash
    • 1.1、适合场景
      • 1.2、logstash架构简介
        • 1.3、Logstash处理流程
          • 1.4、Queue
            • 1.5、Logstash配置文件
            相关产品与服务
            Elasticsearch Service
            腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档