前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >logstash sql 数据采集

logstash sql 数据采集

作者头像
以谁为师
发布2019-05-30 20:35:57
1.4K0
发布2019-05-30 20:35:57
举报

Elasticsearch 6.3 发布SQL模块作为C-Pack的一部分使用

kabana官方工具查询

Dev Tools - console 查看

代码语言:javascript
复制
POST /_xpack/sql?format=txt
{
    "query": "SHOW tables"
}
# 查询有哪些表

POST /_xpack/sql?format=txt
{
    "query": "show functions "
}
# 查询支持的函数

POST /_xpack/sql?format=txt
{
    "query": "  select count(1) as \"统计行\" from  \"browser_req-*\"    "
}
# 转义特殊字符

postman 工具

利用logstash抽取sql数据

logstash 插件

代码语言:javascript
复制
/usr/share/logstash/bin/logstash-plugin  install logstash-input-jdbc
/usr/share/logstash/bin/logstash-plugin install logstash-output-elasticsearch
代码语言:javascript
复制
wget -P /usr/local/src/  https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
unzip /usr/local/src/mysql-connector-java-5.1.46.zip  -d /usr/local/
# 下载解压

配置文件

/etc/logstash/conf.d/sql.indexer.conf

代码语言:javascript
复制
input {
  jdbc {
    jdbc_driver_library => "/usr/local/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_default_timezone =>"Asia/Shanghai"

    # 分页 limit
    jdbc_paging_enabled => true
    jdbc_page_size => "5000"

    jdbc_connection_string => "jdbc:mysql://172.16.140.xxx:3306/gateway"
    jdbc_user => "root"
    jdbc_password => "xxxxxxx"


    # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年) 5分钟频率: */5  * * * *
    schedule => "* * * * *"

    statement => "SELECT * FROM browser_req_log  WHERE gmt_create  >= :sql_last_value"

    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "gmt_create"

    # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来
    last_run_metadata_path => "/etc/logstash/conf.d/syncpoint_table"
    record_last_run => true

    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => false
    #是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }
}


filter {
     mutate {
      add_field => {"temp_ts" => "%{gmt_create}"}
    }
    #添加数据库字段作为 timestamp

    # timestamp
    date {
      match => ["temp_ts","ISO8601"]
      remove_field => ["temp_ts"]
      timezone => "Asia/Shanghai"
    }

output {
  elasticsearch {
    hosts => ["172.16.140.120", "172.16.140.121", "172.16.140.122"]
    index => "browser_req-log"
    document_id => "%{id}"
  }
}

cat /etc/logstash/conf.d/syncpoint_table

代码语言:javascript
复制
2019-01-07

  • jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已经下载
  • jdbc_driver_class: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了
  • jdbc_connection_string: mysql 地址
  • jdbc_user: mysql 用户
  • jdbc_password: mysql 密码
  • schedule: 执行 sql 时机,类似 crontab 的调度
  • statement: 要执行的 sql,以 ":" 开头是定义的变量,可以通过 parameters 来设置变量,这里的 sql_last_value 是内置的变量,表示上一次 sql 执行中 update_time 的值,这里 update_time 条件是 >= 因为时间有可能相等,没有等号可能会漏掉一些增量
  • use_column_value: 使用递增列的值
  • tracking_column_type: 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
  • tracking_column: 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
  • last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改

  • hosts: es 集群地址
  • user: es 用户名
  • password: es 密码
  • index: 导入到 es 中的 index 名,这里我直接设置成了 mysql 表的名字
  • document_id: 导入到 es 中的文档 id,这个需要设置成主键,否则同一条记录更新后在 es 中会出现两条记录,%{id} 表示引用 mysql 表中 id 字段的值

多表同步

config/pipelines.yml

代码语言:javascript
复制
- pipeline.id: table1
  path.config: "config/sync_table1.cfg"
- pipeline.id: table2
  path.config: "config/sync_table2.cfg"
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年4月4日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • kabana官方工具查询
  • postman 工具
  • 利用logstash抽取sql数据
    • logstash 插件
      • 配置文件
        • 多表同步
        相关产品与服务
        云数据库 SQL Server
        腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档