专栏首页腾讯云流计算 Oceanus基于流计算 Oceanus 和 Elasticsearch 构建日志分析系统

基于流计算 Oceanus 和 Elasticsearch 构建日志分析系统

实时即未来,最近在腾讯云流计算 Oceanus(Flink)进行实时计算服务,以下为MySQL 到 Flink 进行处理分析,再存储到ES的实践。分享给大家~

1 方案概述

本方案可作为日志搜索场景解决方案使用。方案中使用了云数据库 MySQL、流计算 Oceanus(Flink)、Elasticsearch、Kibana 和私有网络 VPC。

日志搜索场景

2 前置准备

2.1 创建流计算 Oceanus 集群

在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。

若之前未使用过VPC,日志,存储这些组件,需要先进行创建。

这里 VPC及子网和下面的 MySQL、ES 集群使用了同一个。

创建完后的集群如下:

oceanus集群

2.2 创建Mysql集群

在腾讯云主页【产品】->【数据库】->【云数据库 MySQL】页面购买 MySQL 集群。

MySQL 控制台找到创建的 MySQL 集群,在【数据库管理】->【参数设置】页面修改如下参数:

   binlog_row_image=FULL
MySQL 参数配置

在 MySQL 数据库中创建表。执行如下 SQL,或通过可视化页面创建表。

-- 以学生成绩表为例
CREATE TABLE `cdc_source4es` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '学号',
`score` int(11) NOT NULL COMMENT '分数',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='create for student score'

2.3 创建 Elasticsearch Service 集群

在腾讯云主页【产品】->【大数据】->【ElasticSearch】页面购买ES集群,这里为了简单,选择了与流计算 Oceanus 同一个地域,同可用区。网络选择也选择与上面同样的 VPC。

本次创建了1个ES6版本的集群,通过 ES 控制台查看,创建完后的集群如下:

ES 集群

创建之后可通过 Kibana 查看 ES 集群信息。如在 Dev Tools 面板上执行如下命令:

# 查看集群节点
   GET _cat/nodes
# 返回节点信息则为正常
   172.28.1.1 43 99 1 0.06 0.06 0.12 dilm - 1627027760001130832
   172.28.1.2  65 99 3 0.03 0.12 0.13 dilm - 1627027760001130732
   172.28.1.3 29 99 3 0.08 0.08 0.12 dilm * 1627027760001130632

注:ES中无需提前创建类似表的实体。

至此,环境准备完毕。

3 作业创建

3.1 创建 SQL 作业

在 Oceanus 控制台【作业管理】>【新建】新建作业,选择【SQL 作业】,选择刚刚新建的集群创建作业。然后进入【开发调试】页面。

3.2 创建 Source 端

此处选择 MySQL 作为数据源,并将后续的数据持续更新到 ES 中。

-- mysql-cdc connector
CREATE TABLE `mysql_source` (
    `id` int,
    `score` int,
    PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
    'connector' = 'mysql-cdc',      -- 必须为 'mysql-cdc'
    'hostname' = '172.28.28.213',   -- 数据库的 IP
    'port' = '3306',                -- 数据库的访问端口
    'username' = 'youruser',        -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)
    'password' = 'yourpassword',    -- 数据库访问的密码
    'database-name' = 'test',   -- 需要同步的数据库
    'table-name' = 'cdc_source4es'      -- 需要同步的数据表名
);

3.3 创建 Sink 端

此处 Sink 无需在 ES 集群中提前做初始化,可直接写入数据。

-- 注意! 如果您启用了 Elasticsearch 的用户名密码鉴权功能, 目前只能使用 Flink 1.10 的旧语法。若无需鉴权, 则可以使用 Flink 1.11 的新语法。
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

CREATE TABLE es_old_sink (
    `id` INT,
    `score` INT
) WITH (
    'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch
    'connector.version' = '6',          -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 注意务必要和所选的内置 Connector 版本一致
    'connector.hosts' = 'http://172.28.1.175:9200',  -- Elasticsearch 的连接地址
    'connector.index' = 'connector-test-index',       -- Elasticsearch 的 Index 名
    'connector.document-type' = '_doc',  -- Elasticsearch 的 Document 类型
    'connector.username' = 'elastic',  -- 可选参数: Elasticsearch 用户名
    'connector.password' = 'yourpassword',  -- 可选参数: Elasticsearch 密码

    'update-mode' = 'upsert',            -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式     
    'connector.key-delimiter' = '$',     -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
    'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'
    'connector.failure-handler' = 'retry-rejected',   -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)

    'connector.flush-on-checkpoint' = 'true',   -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
    'connector.bulk-flush.max-actions' = '42',  -- 可选参数, 每批次最多的条数
    'connector.bulk-flush.max-size' = '42 mb',  -- 可选参数, 每批次的累计最大大小 (只支持 mb)
    'connector.bulk-flush.interval' = '60000',  -- 可选参数, 批量写入的间隔 (ms)
    'connector.connection-max-retry-timeout' = '1000',     -- 每次请求的最大超时时间 (ms)
    --'connector.connection-path-prefix' = '/v1'          -- 可选字段, 每次请求时附加的路径前缀
                                                        
    'format.type' = 'json'        -- 输出数据格式, 目前只支持 'json'
);

3.4 算子操作

此处为了演示,只做了简单的数据插入,没有进行复杂计算。Oceanus 也可以进行条件过滤,正则匹配等操作,能够兼容 Flink SQL的所有语法。

INSERT INTO es_old_sink 
(select id, score  from mysql_source);

3.5 验证

在 Kibana 的 Dev Tools 中查询 ES 中的数据,数据是否插入成功。

# 查询该索引下所有的数据
GET connector-test-index/_search
本文转载自: https://cloud.tencent.com/developer/article/1856092复制
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • 基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统

    在后移动互联网时代,良好的用户体验是增长的基础,而稳定的使用体验则是用户体验的基础。大型的互联网公司,尤其是面向 C 端客户的公司,对业务系统稳定性的要求越来越...

    三余鱻生
  • 基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统

    The following article is from 腾讯技术工程 Author 腾讯程序员 作者:龙逸尘,腾讯 CSIG 高级工程师 为什么要构建...

    腾讯QQ大数据
  • 基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统

    为什么要构建监控系统 作者:龙逸尘,腾讯 CSIG 高级工程师 在后移动互联网时代,良好的用户体验是增长的基础,稳定的使用体验就是用户体验的基础。大型的互联网公...

    腾讯技术工程官方号
  • 基于流计算 Oceanus 和 Elasticsearch Service 实现实时监控系统

    本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU 和内存等资源消耗数据,高效地...

    于乐
  • 实时监控:基于流计算 Oceanus(Flink) 实现系统和应用级实时监控

    本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU和内存等资源消耗数据,以短信、...

    吴云涛
  • 实时监控:基于流计算 Oceanus ( Flink ) 实现系统和应用级实时监控

    ---- 作者:吴云涛,腾讯 CSIG 高级工程师 本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及...

    腾讯技术工程官方号
  • 实时监控:基于流计算 Oceanus ( Flink ) 实现系统和应用级实时监控

    ---- 作者:吴云涛,腾讯 CSIG 高级工程师 本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及...

    腾讯QQ大数据
  • 实时数仓:基于流计算 Oceanus 实现 MySQL 和 HBase 维表到 ClickHouse 的实时分析

    实时即未来,最近在腾讯云流计算 Oceanus(Flink) 进行实时计算服务分享给大家~

    吴云涛
  • 腾讯云流计算 Oceanus 最佳实践&解决方案汇总

    本页面汇总了腾讯云流计算 Oceanus (Flink 实时计算) 产品的最佳实践和解决方案文档,将持续更新。

    吴云涛
  • 最佳实践:MySQL CDC 同步数据到 ES

    作者:于乐,腾讯 CSIG 工程师 一、 方案描述 1.1 概述 在线教育是一种利用大数据、人工智能等新型互联网技术与传统教育行业相结合的新型教育方式。发展在线...

    腾讯QQ大数据
  • Flink 实践教程-进阶(1):维表关联

    作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache F...

    腾讯QQ大数据
  • Flink 实践教程-入门(4):读取 MySQL 数据写入到 ES

    作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache F...

    腾讯QQ大数据
  • Flink 实践教程:入门(2):写入 Elasticsearch

    作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache F...

    腾讯QQ大数据
  • Flink 实践教程:进阶1-维表关联

    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点...

    吴云涛
  • Flink 实践教程:入门4-读取 MySQL 数据写入 ES

    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点...

    吴云涛
  • Flink 实践教程:入门2-写入 Elasticsearch

    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点...

    吴云涛
  • 专家带你吃透 Flink 架构:一个新版 Connector 的实现

    作者:刘泽善,腾讯CSIG专家工程师 前言 Flink 可以说已经是流计算领域的事实标准,其开源社区发展迅速,提出了很多改进计划(Flink Improvem...

    腾讯QQ大数据
  • Flink 实践教程:入门(3):读取 MySQL 数据

    作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache F...

    腾讯QQ大数据

扫码关注腾讯云开发者

领取腾讯云代金券