前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何不写一行代码把 Mysql json 字符串解析为 Elasticsearch 的独立字段

如何不写一行代码把 Mysql json 字符串解析为 Elasticsearch 的独立字段

作者头像
铭毅天下
发布2021-03-22 12:37:56
2.7K0
发布2021-03-22 12:37:56
举报
文章被收录于专栏:铭毅天下铭毅天下

1、事出有因

实战问题:有数百万数据需要导入 Elasticsearch 做性能对比测试,但当前数据存储在 Mysql 中,且核心字段以 Json 字符串形式存储。Mysql 存储如下所示:

有没有又快又好的方法?接收同事是非开发人员,如果不写一行代码(脚本)就更好了!

2、方案探讨

2.1 前置认知

比较成熟同步方案选型。

  • Mysql 到 Elasticsearch 同步选定:logstash。

2.2 Json 字段的处理方案

2.2.1 方案一:遍历 Mysql,解析Json。

逐行遍历 Mysql,把 Json 字符串字段解析为单个字段,更新到Mysql中。

然后,logstash 同步到 Elasticsearch。

  • 优点:很好理解,切实可行。
  • 缺点:需要写解析代码,且涉及 Mysql 的逐行更新操作,慢且效率低。
2.2.2 方案二:logstash 中间环节用 json filter 插件过滤搞定 Json 串解析。

在 logstash 中间 filter 环节,加上 json 串的过滤。

举例如下(类似):

代码语言:javascript
复制
filter {
      json {
        source => "message",
        target => "doc"

      }
    }

实战参考:

https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html

  • 优点:少了代码解析环节,借助插件实现。
  • 缺点:需要修改 logstash 同步脚本,有一点学习成本。
2.2.3 方案三:Ingest 数据预处理搞定 json 解析。

既然 logstash json filter 插件能做数据解析,那么,与之对标的 Ingest 管道预处理中的 json processor 等 processor 组合肯定也能搞定。

  • 优点1:少了代码解析环节,借助 Ingest processor 组合实现复杂数据预处理功能。
  • 优点2:相比 logstash filter 更通俗易懂,小白也能快速上手。
  • 缺点:占无。

3、实战一把

如前分析,方案一、二 也能搞定。

但是,方案三更方便,更适合技术小白人员甚至非技术人员。

我们就以方案三实战一把。

3.1 创建预处理管道

代码语言:javascript
复制
PUT _ingest/pipeline/text2json_pipeline
{
  "description": "describe pipeline",
  "processors": [
    {
      "json": {
        "field": "wb_detail",
        "target_field": "wb_json"
      }
    },
    {
      "script": {
        "source": """
            ctx.loc = ctx.wb_json.loc;
            ctx.cont = ctx.wb_json.wc;
            ctx.author = ctx.wb_json.usn;
            ctx.area = ctx.wb_json.uloc;
            ctx.url = ctx.wb_json.sr;
          """
      }
    },
    {
      "remove": {
        "field": "wb_json"
      }
    }
  ]
}

如上所示,应用了三个 process。

  • processor 1:json 处理。

将 wb_detail 源字符串 变成 wb_json json串。

wb_json 属于中间过度字段。

  • processor 2:script 处理。

将 wb_json json 串中的字段逐个字段切分。

  • processor 3:remove 删除字段处理。

删除中间过度字段 wb_json。

3.2 创建索引,并指定 default_pipeline

代码语言:javascript
复制
PUT test-003
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "refresh_interval": "30s",
    "index.default_pipeline":"text2json_pipeline"
  },
  "mappings": {
    "properties": {
      "area": {
        "type": "text",
        "analyzer": "ik_smart",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "author": {
        "type": "keyword"
      },
      "cont": {
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "id": {
        "type": "long"
      },
      "loc": {
        "type": "keyword"
      },
      "publish_time": {
        "type": "date"
      },
      "publish_timestamp": {
        "type": "keyword"
      },
      "update_time": {
        "type": "date"
      },
      "url": {
        "type": "keyword"
      },
      "wb_detail": {
        "type": "keyword"
      },
      "wb_id": {
        "type": "keyword"
      }
    }
  }
}

通过 default_pipeline 提前指定预处理管道的方式非常巧妙,避免了一次 reindex 操作。

相当于在写入环节同时做了数据的处理。

3.3 logstash 数据同步

之前同步讲的很多了,这里就不做具体字段含义的讲解,基本见名释义,很好理解。不明白的读者,留言讨论或者加 wx:elastic 6 讨论。

代码语言:javascript
复制
input {
  stdin {
  }
  jdbc {
  # mysql jdbc connection string to our backup databse  
  jdbc_connection_string => "jdbc:mysql://172.21.0.x:3306/weibo_base"
  # the user we wish to excute our statement as
  jdbc_user => "root"
  jdbc_password => "XXXXX"
  record_last_run => "true"
  use_column_value => "true"
  tracking_column => "id"
  last_run_metadata_path => "/home/elasticsearch/logstash-7.6.0/sync/test_info"
  clean_run => "false"
  # the path to our downloaded jdbc driver
  jdbc_driver_library => "/home/elasticsearch/mysql-connector-java-5.1.47.jar"
  # the name of the driver class for mysql
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "500"
# 以下对应着要执行的sql的绝对路径
  statement_filepath => "/home/elasticsearch/logstash-7.6.0/sync/jdbc_test.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
  }
}

filter {
}

output {
  elasticsearch {
#ESIP地址与端口
  hosts => "172.21.0.x:9200"
#ES索引名称(自己定义的)
  index => "test-003"
  user => "elastic"
  password => "XXXXXX"
#自增ID编号
  document_id => "%{id}"
  }
  stdout {
#以JSON格式输出
  codec => json_lines
  }
}

以上三步,搞定。

4、看效果

有图有真相。

数据源 json 字符串已经拆分为独立字段:area、loc、author 等。

拆分结果达到预期,就加了管道预处理一下,没有写一行脚本。

5、小结

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-03-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 铭毅天下Elasticsearch 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、事出有因
  • 2、方案探讨
    • 2.1 前置认知
      • 2.2 Json 字段的处理方案
        • 2.2.1 方案一:遍历 Mysql,解析Json。
        • 2.2.2 方案二:logstash 中间环节用 json filter 插件过滤搞定 Json 串解析。
        • 2.2.3 方案三:Ingest 数据预处理搞定 json 解析。
    • 3、实战一把
      • 3.1 创建预处理管道
        • 3.2 创建索引,并指定 default_pipeline
          • 3.3 logstash 数据同步
          • 4、看效果
          • 5、小结
          相关产品与服务
          云数据库 SQL Server
          腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档