文档中心>Elasticsearch Service>实践教程>应用场景构建>基于流计算 Oceanus 的实时监控解决方案

基于流计算 Oceanus 的实时监控解决方案

最近更新时间:2024-05-14 11:46:31

我的收藏
本方案结合腾讯云 Ckafka、流计算 Oceanus、腾讯云数据库 Elasticsearch、腾讯云可观测平台等,通过 Filebeat 实时监控系统日志和应用日志,将监控数据传输到腾讯云 Ckafka,再将 Kafka 中数据接入流计算 Oceanus,经过简单的业务逻辑处理输出到云数据库 Elasticsearch,利用云 Promethus 监控系统指标,利用云 Grafana 实现对 Oceanus 作业的个性化业务数据监控。



方案架构





前置准备

在使用前,请确保已购买并创建相应的大数据组件。

创建私有网络 VPC

私有网络是一块您在腾讯云上自定义的逻辑隔离网络空间。需要使用对等连接、NAT 网关等方式打通网络。具体创建步骤可参考 创建私有网络
注意
在构建 Ckafka、Oceanus、Elasticsearch 集群等服务时选择的网络必须保持一致,网络才能互通。

创建 Ckafka 实例

进入 消息队列 CKafka 控制台,在实例列表中单击新建,创建 Ckafka 实例详情请参见 创建实例。购买完成后,再创建 Kafka topic(topic-app-info),详细操作请参见 创建 Topic
说明
私有网络和子网选择上一步创建的网络和子网
Kafka 建议选择最新的版本,和 Filebeat 采集工具兼容性较好。




创建 Oceanus 集群

流计算 Oceanus 服务兼容原生的 Flink 任务。在 流计算 Oceanus 控制台计算资源 > + 新建创建集群,选择地域、可用区、VPC、日志、存储、设置初始密码等。VPC 及子网选择刚创建好的网络,具体创建步骤可参考 创建独享集群。创建完后 Flink 的集群如下:




创建 Elasticsearch 实例

进入 Elasticsearch Service 控制台,在ES集群管理中单击新建集群需选择之前创建好的私有网络和子网,并设置账户和密码,具体操作可参考 创建集群




创建腾讯云可观测平台服务

为了展示自定义系统指标,需购买 Promethus 服务。若只需要自定义业务指标,可以省略此步骤。 进入腾讯云可观测平台页面,单击左侧 Prometheus 监控,单击新建选择之前的私有网络和子网,并设置实例名称和 Grafana 密码,具体操作可参考 创建实例




创建独立 Grafana 资源

Grafana 管理页面 进行单独购买实现业务监控指标的展示。单击新建进入 Grafana 可视化服务购买页,详情请参见 创建实例,购买时仍需选择与其他资源相同的 VPC 网络。

安装配置 Filebeat

Filebeat 是一款轻量级日志数据采集的工具,通过监控指定位置的文件收集信息。在该 VPC 下给需要监控主机信息和应用信息的 CVM 上安装 Filebeat(配置一般会在filebeat.yml中)。
方式一:下载 Filebeat 并安装 Filebeat 下载地址
方式二:采用 Elasticsearch 管理页面 > beats 管理中提供的 Filebeat。本示例中采用了方式一。 下载到 CVM 中并配置 Filebeat,在 filebeat.yml 文件中添加如下配置项:```shell。

监控日志文件配置

type: log enabled: true paths:
/tmp/test.log #- c:\\programdata\\elasticsearch\\logs*
```shell
# 监控数据输出项配置
output.kafka:
version: 2.0.0 # kafka版本号
hosts: ["xx.xx.xx.xx:xxxx"] # 请填写实际的IP地址+端口
topic: 'topic-app-info' # 请填写实际的topic
请根据实际业务需求配置相对应的 filebeat.yml 文件,参考 Filebeat 官方文档
注意
示例选用2.4.1的 Ckafka 版本,这里配置 version: 2.0.0。版本对应不上可能出现ERROR [kafka] kafka/client.go:341 Kafka (topic=topic-app-info): dropping invalid message错误。

方案实现

接下来通过案例介绍如何通过流计算 Oceanus 实现个性化监控。

Filebeat 数据传输

1. 进入到 Filebeat 根目录下,并启动 Filebeat 进行数据采集。示例中采集了 top 命令中显示的 CPU、内存等信息,也可以采集 jar 应用的日志、JVM 使用情况、监听端口等,详情可参考 Filebeat 官网
# filebeat启动
./filebeat -e -c filebeat.yml

# 监控系统信息写入test.log文件
top -d 10 >>/tmp/test.log
2. 进入 Ckafka 页面,单击消息查询,查询对应 topic 消息,验证是否采集到数据。

filebeat 采集到的数据格式:
{
"@timestamp": "2021-08-30T10:22:52.888Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "7.14.0"
},
"input": {
"type": "log"
},
"host": {
"ip": ["xx.xx.xx.xx", "xx::xx:xx:xx:xx"],
"mac": ["xx:xx:xx:xx:xx:xx"],
"hostname": "xx.xx.xx.xx",
"architecture": "x86_64",
"os": {
"type": "linux",
"platform": "centos",
"version": "7(Core)",
"family": "redhat",
"name": "CentOSLinux",
"kernel": "3.10.0-1062.9.1.el7.x86_64",
"codename": "Core"
},
"id": "0ea734564f9a4e2881b866b82d679dfc",
"name": "xx.xx.xx.xx",
"containerized": false
},
"agent": {
"name": "xx.xx.xx.xx",
"type": "filebeat",
"version": "7.14.0",
"hostname": "xx.xx.xx.xx",
"ephemeral_id": "6c0922a6-17af-4474-9e88-1fc3b1c3b1a9",
"id": "6b23463c-0654-4f8b-83a9-84ec75721311"
},
"ecs": {
"version": "1.10.0"
},
"log": {
"offset": 2449931,
"file": {
"path": "/tmp/test.log"
}
},
"message": "(B16root0-20000S0.00.00:00.00kworker/1:0H(B"
}

SQL 作业编写

在流计算 Oceanus 中,对 Kafka 接入的数据进行加工处理,并存入 Elasticsearch 中。

1. 定义 source

按照 Filebeat 中 json 消息的格式,构造 Flink Source。
CREATE TABLE DataInput (
`@timestamp` VARCHAR,
`host` ROW<id VARCHAR,ip ARRAY<VARCHAR>>,
`log` ROW<`offset` INTEGER,file ROW<path VARCHAR>>,
`message` VARCHAR
) WITH (
'connector' = 'kafka', -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector
'topic' = 'topic-app-info', -- 替换为您要消费的 Topic
'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替换为您的 Kafka 连接地址
'properties.group.id' = 'oceanus_group2', -- 必选参数, 一定要指定 Group ID
-- 定义数据格式 (JSON 格式)
'format' = 'json',
'json.ignore-parse-errors' = 'true', -- 忽略 JSON 结构解析异常
'json.fail-on-missing-field' = 'false' -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
);

2. 定义 sink

CREATE TABLE es_output (
`id` VARCHAR,
`ip` ARRAY<VARCHAR>,
`path` VARCHAR,
`num` INTEGER,
`message` VARCHAR,
`createTime` VARCHAR
) WITH (
'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch
'connector.version' = '6', -- 指定 Elasticsearch 的版本, 例如 '6', '7'.
'connector.hosts' = 'http://10.0.0.175:9200', -- Elasticsearch 的连接地址
'connector.index' = 'oceanus_test2', -- Elasticsearch 的 Index 名
'connector.document-type' = '_doc', -- Elasticsearch 的 Document 类型
'connector.username' = 'elastic',
'connector.password' = 'yourpassword',
'update-mode' = 'upsert', -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式
'connector.key-delimiter' = '$', -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
'connector.key-null-literal' = 'n/a', -- 主键为 null 时的替代字符串,默认是 'null'
'connector.failure-handler' = 'retry-rejected', -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)

'connector.flush-on-checkpoint' = 'true', -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
'connector.bulk-flush.max-actions' = '42', -- 可选参数, 每批次最多的条数
'connector.bulk-flush.max-size' = '42 mb', -- 可选参数, 每批次的累计最大大小 (只支持 mb)
'connector.bulk-flush.interval' = '60000', -- 可选参数, 批量写入的间隔 (ms)
'connector.connection-max-retry-timeout' = '1000', -- 每次请求的最大超时时间 (ms)
--'connector.connection-path-prefix' = '/v1' -- 可选字段, 每次请求时附加的路径前缀
'format.type' = 'json' -- 输出数据格式, 目前只支持 'json'
);

3. 业务逻辑

INSERT INTO es_output
SELECT
host.id as `id`,
host.ip as `ip`,
log.file.path as `path`,
log.`offset` as `num`,
message,
`@timestamp` as `createTime`
from DataInput;

4. ES 数据查询

在 ES 控制台的 Kibana 页面查询数据,或者进入某台相同子网的 CVM 下,使用以下命令进行查询:
# 查询索引 username:password请替换为实际账号密码
curl -XGET -u username:password http://xx.xx.xx.xx:xxxx/oceanus_test2/_search -H 'Content-Type: application/json' -d'
{
"query": { "match_all": {}},
"size": 10
}
'
更多访问方式请参考 访问 ES 集群

系统指标监控

本章节主要实现系统信息监控,对 Flink 作业运行状况进行监控告警。 Prometheus 是一个非常灵活的时序数据库,通常用于监控数据的存储、计算和告警。流计算 Oceanus 建议用户使用腾讯云可观测平台提供的 Prometheus 服务,以免去部署、运维开销;同时它还支持腾讯云的通知模板,可以通过短信、电话、邮件、企业微信机器人等方式,将告警信息轻松触达不同的接收方。

监控配置(Oceanus 作业监控)

除了 Oceanus 控制台 自带的监控信息,还可以配置目前已经支持了任务级细粒度监控、作业级监控和集群 Flink 作业列表监控。
1. Oceanus 作业详情页面,单击作业参数,在高级参数处添加如下配置:
pipeline.max-parallelism: 2048
metrics.reporters: promgateway
metrics.reporter.promgateway.host: xx.xx.xx.xx # Prometheus实例地址
metrics.reporter.promgateway.port: 9090 # Prometheus实例端口
metrics.reporter.promgateway.needBasicAuth: true
metrics.reporter.promgateway.password: xxxxxxxxxxx # Prometheus实例密码
metrics.reporter.promgateway.interval: 10 SECONDS
2. 在任一 Oceanus 作业中,单击云监控进入云 Prometheus 实例,点击链接进入 Grafana(灰度中的 Grafana 不能由此进入),导入 json 文件,详情请参见 接入 Prometheus 自定义监控


3. 展现出来的 flink 任务监控效果如下,用户也可以单击 Edit 设置不同 Panel 来优化展现效果。



告警配置

1. 进入腾讯云可观测平台界面,单击 Prometheus 监控,点击已购买的实例进入服务管理页面,然后选择告警策略 > 新建,配置相关信息。具体操作参考 接入 Prometheus 自定义监控


2. 设置告警通知。选择选择模板新建,设置通知模板。


3. 短信通知消息。



业务指标监控

通过 Filebeat 采集到应用业务数据,经过 Oceanus 服务的加工处理已经被存入 ES,可以通过 ES + Grafana 来实现业务数据的监控。
1. Grafana 配置 ES 数据源。进入灰度发布中的 Grafana 控制台,进入刚创建的 Grafana 服务,找到外网地址并打开。Grafana 账号为 admin,登录后选择 Configuration > Add Source,搜索elasticsearch,填写相关 ES 实例信息,添加数据源。


2. 选择左侧 Dashboards > Manage,单击右上角 New Dashboard,新建面板。

展现效果如下:
总数据量写入实时监控:对写入数据源的总数据量进行监控。
数据来源实时监控:对来源于某个特定 log 的数据写入量进行监控。
字段平均值监控:对某个字段的平均值进行监控。
num字段最大值监控:对 num 字段的最大值进行监控。

说明
此处仅作为示例,无实际业务。

总结

本方案中对系统监控指标和业务监控指标2种方式都进行尝试。若只需要对业务指标进行监控,可省略 Promethus 相关操作。此外,需要注意的是:
1. Ckafka 的版本和开源版本 Kafka 并没有严格对应,方案中 Ckafka2.4.1 和开源 Filebeat-1.14.1 版本能够调试成功。
2. 腾讯云可观测平台中的 Promethus 服务已经嵌入了 Grafana 监控服务。但不支持自定义数据源,该嵌入的 Grafana 只能接入 Promethus,需使用独立灰度发布的 Grafana 才能完成 ES 数据接入 Grafana。