文档中心>流计算 Oceanus>实践教程>使用流计算 Oceanus 和 ES 构建日志分析系统

使用流计算 Oceanus 和 ES 构建日志分析系统

最近更新时间:2026-03-26 17:59:49

我的收藏
本文主要介绍了从 MySQL 数据库采集数据到流计算服务 Oceanus 进行分析,最后输出到 Elasticsearch 服务的实践。本方案可作为日志搜索场景解决方案使用。使用了云数据库 MySQL、流计算 Oceanus、Elasticsearch、Kibana 和私有网络 VPC。

环境搭建

创建 Oceanus 集群

流计算 Oceanus 控制台计算资源 > 新建中新建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
说明
若之前未曾使用过私有网络 VPC、日志 CLS、对象存储 COS 等组件,需要先进行创建。
新建集群时,私有网络 VPC 及子网选择需和下文的 MySQL、ES 集群相同,否则需要手动打通(如对等连接)。
创建完后的集群如下:




创建 MySQL 集群

云数据库 TencentDB 控制台 中,单击新建,创建 MySQL 集群。然后在数据库管理 > 参数设置中修改如下参数。
binlog_row_image=FULL




在 MySQL 数据库中创建表

执行如下 SQL,或通过可视化页面创建数据库,创建表。
-- 创建数据库
create database test;
-- 以学生成绩表为例
CREATE TABLE `cdc_source4es` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '学号',
`score` int(11) NOT NULL COMMENT '分数',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='create for student score'
并在表中插入几条数据。
insert into cdc_source4es values(1, 99);
insert into cdc_source4es values(2, 88);
insert into cdc_source4es values(3, 77);

创建 Elasticsearch 集群

Elasticsearch Service 控制台 中,进入 ES 集群管理页面单击新建集群,创建 Elasticsearch Service 集群。建议选择与 Oceanus 相同地域、可用区和网络。文本以 Elasticsearch 7.5.1 版本的集群为例。

集群创建完成后,可通过 Kibana 查看集群信息。如在 Dev Tools 面板上执行如下命令。
说明
Elasticsearch Service 中无需提前创建类似表的实体。
# 查看集群节点
GET _cat/nodes
# 返回节点信息则为正常
172.28.1.1 43 99 1 0.06 0.06 0.12 dilm - 1627027760001130832
172.28.1.2 65 99 3 0.03 0.12 0.13 dilm - 1627027760001130732
172.28.1.3 29 99 3 0.08 0.08 0.12 dilm * 1627027760001130632

作业创建

创建 SQL 作业

流计算 Oceanus 控制台 中选择作业管理 > 新建> 新建作业,新建 SQL 作业,选择在新建的集群中新建作业。单击确定后即可在作业列表中看到新建的作业。


创建 SQL 作业后,在作业管理中单击要进行开发的作业名称,然后单击开发调试,即可在草稿状态下进行作业开发。版本管理(草稿)后的“(草稿)”,即表示当前正处于可编辑的草稿状态下。具体 SQL 作业开发操作可以参考开发 SQL 作业


然后在作业的开发调试 > 作业参数中添加必要的 connector,如 mysql-cdc connector、elasticsearch6/7 connector。程序包的上传和版本管理方式可参考 依赖管理
注意
ES connector 版本要与购买的 ES 组件版本一致。




创建 Source 端

选择 MySQL 作为数据源,并将后续的数据持续更新到 ES 中。
-- mysql-cdc connector
CREATE TABLE `mysql_source` (
`id` int,
`score` int,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'mysql-cdc', -- 必须为 'mysql-cdc'
'hostname' = '172.28.28.213', -- 数据库的 IP
'port' = '3306', -- 数据库的访问端口
'username' = 'youruser', -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)
'password' = 'yourpassword', -- 数据库访问的密码
'database-name' = 'test', -- 需要同步的数据库
'table-name' = 'cdc_source4es' -- 需要同步的数据表名
);

创建 Sink 端

Sink 无需在 ES 集群中提前做初始化,可直接写入数据。
-- 具体Elasticsearch Connector的使用可以参见 https://cloud.tencent.com/document/product/849/48313

CREATE TABLE es_sink (
`id` INT,
`score` INT
) WITH (
'connector' = 'elasticsearch-7', -- 输出到 Elasticsearch 7
'username' = '$username', -- 选填 用户名
'password' = '$password', -- 选填 密码
'hosts' = 'http://10.28.28.94:9200', -- Elasticsearch 的连接地址
'index' = 'my-index', -- Elasticsearch 的 Index 名
'sink.bulk-flush.max-actions' = '1000', -- 数据刷新频率
'sink.bulk-flush.interval' = '1s' -- 数据刷新周期
'format' = 'json' -- 输出数据格式,目前只支持 'json'
);

算子操作

下面语法中只做了简单的数据插入,没有进行复杂计算。
-- Flink SQL 进行运算
INSERT INTO es_sink select id, score from mysql_source;

验证总结

在 Kibana 的 Dev Tools 中查询 ES 中的数据是否插入成功。
# 查询该索引下所有的数据
GET connector-test-index/_search