前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实时监控:基于流计算 Oceanus(Flink) 实现系统和应用级实时监控

实时监控:基于流计算 Oceanus(Flink) 实现系统和应用级实时监控

原创
作者头像
吴云涛
修改2021-10-26 20:15:02
6.1K3
修改2021-10-26 20:15:02
举报

本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU和内存等资源消耗数据,以短信、电话、微信消息等方式实时反馈监控告警信息,高效地保障系统稳健运行。运用云化的 Kafka、Flink、ES 等组件,大大减少了开发运维人员的投入。

1 解决方案描述

1.1 概述

本方案结合腾讯云 CKafka、流计算 Oceanus (Flink)、 Elasticsearch、Prometheus 等,通过 Filebeat 实时采集系统和应用监控数据,并传输到 CKafka,再将 CKafka 数据接入流计算 Oceanus (Flink),经过简单的业务逻辑处理输出到 Elasticsearch,最后通过 Kibana 页面查询结果。方案中利用 Promethus 监控系统指标,如流计算 Oceanus 作业运行状况,利用云 Grafana 监控 CVM 或业务应用指标。

实时监控场景
实时监控场景

1.2 方案架构

架构图
架构图

2 前置准备

在使用前,请确保已购买并创建相应的大数据组件。

2.1 创建私有网络 VPC

私有网络是一块您在腾讯云上自定义的逻辑隔离网络空间,在构建 Ckafka、流计算 Oceanus,Elasticsearch集群等服务时选择的网络必须保持一致,网络才能互通。需要使用对等连接、NAT网关等方式打通网络。具体创建步骤请参考 帮助文档

2.2 创建 Ckafka 实例

注意私有网络和子网选择之前创建的网络和子网。Kafka建议选择最新的2.4.1版本,和Filebeat采集工具兼容性较好。

Kafka集群
Kafka集群

购买完成后,再创建Kafka topic: topic-app-info

2.3 创建 Oceanus 集群

流计算 Oceanus 服务兼容原生的 Apache Flink 任务。在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。VPC及子网选择刚刚创建好的网络,具体创建步骤请参考 帮助文档。创建完后 Oceanus 的集群如下:

Oceanus 集群
Oceanus 集群

2.4. 创建 Elasticsearch 实例

进入 Elasticsearch Service 控制台,点击左上角【新建】,注意选择之前创建好的私有网络和子网,并设置账户和密码,具体操作请参考帮助文档

创建 Elasticsearch 集群
创建 Elasticsearch 集群

2.5 创建云监控 Prometheus 实例

为了展示自定义系统指标,需购买Promethus服务。只需要自定业务指标的同学可以省略此步骤。

进入腾讯云监控页面,点击左侧 Prometheus 监控,点击【新建】,选择之前的私有网络和子网,并设置实例名称和Grafana密码,具体操作请参考帮助文档

创建Prometheus
创建Prometheus

2.6 创建独立 Grafana 资源

独立的Grafana在灰度发布中,需在Grafana管理页面进行单独购买实现业务监控指标的展示。购买时仍需选择与其他资源同一VPC网络。

2.7 安装配置 Filebeat

Filebeat 是一款轻量级日志数据采集的工具,通过监控指定位置的文件收集信息。在该VPC下给需要监控主机信息和应用信息的CVM上安装 Filebeat。

安装方式一:下载Filebeat并安装Filebeat下载地址 ,本示例中采用了方式一;

方式二:采用【Elasticsearch管理页面】-->【beats管理】中提供的 Filebeat。

下载到 CVM 中并配置 Filebeat,在 filebeat.yml 文件中提添加如下配置项:

代码语言:txt
复制
# 监控日志文件配置 
- type: log
  enabled: true
  paths:
    - /tmp/test.log
    #- c:\programdata\elasticsearch\logs\*
代码语言:txt
复制
# 监控数据输出项配置 
output.kafka:
  version: 2.0.0                         # kafka版本号
  hosts: ["xx.xx.xx.xx:xxxx"]            # 请填写实际的IP地址+端口
  topic: 'topic-app-info'                  # 请填写实际的topic

请根据实际业务需求配置相对应的filebeat.yml文件,参考 Filebeat官方文档

! 注:示例选用2.4.1的Ckafka版本,这里配置version: 2.0.0。版本对应不上可能出现“ERROR kafka kafka/client.go:341 Kafka (topic=topic-app-info): dropping invalid message”错误

3 方案实现

接下来通过案例介绍如何通过流计算 Oceanus 实现个性化监控。

3.1 Filebeat数据传输

  1. 进入到 Filebeat 根目录下,并启动Filebeat进行数据采集。示例中采集了top命令中显示的CPU、内存等信息,也可以采集jar应用的日志、JVM使用情况、监听端口等,详情参考 Filebeat官网
  2. 进入 Ckafka 页面,点击左侧【消息查询】,查询对应topic消息,验证是否采集到数据。 filebeat采集到的数据格式:
代码语言:javascript
复制
{
	"@timestamp": "2021-08-30T10:22:52.888Z",
	"@metadata": {
		"beat": "filebeat",
		"type": "_doc",
		"version": "7.14.0"
	},
	"input": {
		"type": "log"
	},
	"host": {
		"ip": ["xx.xx.xx.xx", "xx::xx:xx:xx:xx"],
		"mac": ["xx:xx:xx:xx:xx:xx"],
		"hostname": "xx.xx.xx.xx",
		"architecture": "x86_64",
		"os": {
			"type": "linux",
			"platform": "centos",
			"version": "7(Core)",
			"family": "redhat",
			"name": "CentOSLinux",
			"kernel": "3.10.0-1062.9.1.el7.x86_64",
			"codename": "Core"
		},
		"id": "0ea734564f9a4e2881b866b82d679dfc",
		"name": "xx.xx.xx.xx",
		"containerized": false
	},
	"agent": {
		"name": "xx.xx.xx.xx",
		"type": "filebeat",
		"version": "7.14.0",
		"hostname": "xx.xx.xx.xx",
		"ephemeral_id": "6c0922a6-17af-4474-9e88-1fc3b1c3b1a9",
		"id": "6b23463c-0654-4f8b-83a9-84ec75721311"
	},
	"ecs": {
		"version": "1.10.0"
	},
	"log": {
		"offset": 2449931,
		"file": {
			"path": "/tmp/test.log"
		}
	},
	"message": "(B16root0-20000S0.00.00:00.00kworker/1:0H(B"
}

3.2 SQL作业编写

在流计算 Oceanus 中,对 Kafka 接入的数据进行加工处理,并存入 Elasticsearch 中。

1. 定义source

按照Filebeat中json消息的格式,构造Flink Source。

代码语言:txt
复制
 CREATE TABLE DataInput (
     `@timestamp` VARCHAR,
     `host`       ROW<id VARCHAR,ip ARRAY<VARCHAR>>,
     `log`        ROW<`offset` INTEGER,file ROW<path VARCHAR>>,
     `message`    VARCHAR
 ) WITH (
     'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector
     'topic' = 'topic-app-info',  -- 替换为您要消费的 Topic
     'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
     'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址
     'properties.group.id' = 'oceanus_group2',  -- 必选参数, 一定要指定 Group ID
     -- 定义数据格式 (JSON 格式)
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );

2. 定义sink

代码语言:txt
复制
CREATE TABLE es_output (
    `id` VARCHAR,
    `ip` ARRAY<VARCHAR>,
    `path` VARCHAR,
    `num` INTEGER,
    `message` VARCHAR,
    `createTime` VARCHAR
) WITH (
    'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch
    'connector.version' = '6',          -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 
    'connector.hosts' = 'http://10.0.0.175:9200',  -- Elasticsearch 的连接地址
    'connector.index' = 'oceanus_test2',       -- Elasticsearch 的 Index 名
    'connector.document-type' = '_doc',  -- Elasticsearch 的 Document 类型
    'connector.username' = 'elastic',  
    'connector.password' = 'yourpassword', 
    'update-mode' = 'upsert',            -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式     
    'connector.key-delimiter' = '$',     -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
    'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'
    'connector.failure-handler' = 'retry-rejected',   -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)

    'connector.flush-on-checkpoint' = 'true',   -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
    'connector.bulk-flush.max-actions' = '42',  -- 可选参数, 每批次最多的条数
    'connector.bulk-flush.max-size' = '42 mb',  -- 可选参数, 每批次的累计最大大小 (只支持 mb)
    'connector.bulk-flush.interval' = '60000',  -- 可选参数, 批量写入的间隔 (ms)
    'connector.connection-max-retry-timeout' = '1000',     -- 每次请求的最大超时时间 (ms)
    --'connector.connection-path-prefix' = '/v1'          -- 可选字段, 每次请求时附加的路径前缀                                                        
    'format.type' = 'json'        -- 输出数据格式, 目前只支持 'json'
);

3. 业务逻辑

代码语言:txt
复制
INSERT INTO es_output
SELECT 
    host.id as `id`,
    host.ip as `ip`,
    log.file.path as `path`,
    log.`offset` as `num`,
    message,
    `@timestamp` as `createTime`
from DataInput;

4. 作业参数

【内置connector】选择flink-connector-elasticsearch6flink-connector-kafka

注: 根据实际版本选择

5. ES数据查询

进入某台同子网的CVM下,使用以下命令或者在ES控制台的Kibana页面查询ES数据:

代码语言:txt
复制
# 查询索引  username:password请替换为实际账号密码
curl -XGET -u username:password http://xx.xx.xx.xx:xxxx/oceanus_test2/_search -H 'Content-Type: application/json' -d'
{
    "query": { "match_all": {}},
    "size":  10
}'

更多访问方式请参考 访问ES集群

3.3 系统指标监控

本章节主要实现系统信息监控,对Flink作业运行状况进行监控告警。

Prometheus 是一个非常灵活的时序数据库,通常用于监控数据的存储、计算和告警。 流计算 Oceanus 建议用户使用腾讯云监控提供的 Prometheus 服务,以免去部署、运维开销;同时它还支持腾讯云的通知模板,可以通过短信、电话、邮件、企业微信机器人等方式,将告警信息轻松触达不同的接收方。

监控配置

Oceanus 作业监控

除了 Oceanus 控制台自带的监控信息,还可以配置目前已经支持了任务级细粒度监控、作业级监控和集群Flink作业列表监控。

1. 配置作业高级参数。

Oceanus作业详情页面,点击【作业参数】,在【高级参数】处添加如下配置:

代码语言:txt
复制
pipeline.max-parallelism: 2048
metrics.reporters: promgateway
metrics.reporter.promgateway.host: xx.xx.xx.xx              # Prometheus实例地址 
metrics.reporter.promgateway.port: 9090                     # Prometheus实例端口
metrics.reporter.promgateway.needBasicAuth: true
metrics.reporter.promgateway.password: xxxxxxxxxxx          # Prometheus实例密码
metrics.reporter.promgateway.interval: 10 SECONDS
2. 导入自定义监控。

在任一Oceanus作业中,点击【云监控】进入云Prometheus实例,点击链接进入Grafana(灰度中的Grafana不能由此进入),导入json文件,详情请参见 接入Prometheus自定义监控

Grafana页面
Grafana页面
2. 展现出来的flink任务监控效果如下,用户也可以点击【Edit】设置不同Panel来优化展现效果。
实时监控效果图
实时监控效果图

告警配置

1. 新建告警。

进入腾讯云监控界面,点击左侧【Prometheus监控】,点击已购买的实例进入服务管理页面,点击左侧【告警策略】,点击【新建】,配置相关信息。具体操作参考 接入Prometheus自定义监控

告警配置
告警配置
2. 设置告警通知。

选择【选择模版】或【新建】,设置通知模版。

通知模版
通知模版
3. 短信通知消息
短信告警信息
短信告警信息

3.4 业务指标监控

通过 Filebeat 采集到应用业务数据,经过 Oceanus 服务的加工处理已经被存入ES,可以通过 ES + Grafana 来实现业务数据的监控。

1. Grafana配置ES数据源。

进入灰度发布中的 Grafana控制台,进入刚刚创建的Grafana服务,找到外网地址并打开。Grafana账号为admin,登录后点击【Configuration】,点击【Add Source】,搜索elasticsearch,填写相关ES实例信息,添加数据源。

添加数据源
添加数据源
2. 设置面板

点击左侧【Dashboards】,点击【Manage】,点击右上角【New Dashboard】,新建面板,编辑面板。

编辑Dashboard
编辑Dashboard
  1. 展现效果如下:
  2. 总数据量写入实时监控:对写入数据源的总数据量进行监控;
  3. 数据来源实时监控:对来源于某个特定log的数据写入量进行监控;
  4. 字段平均值监控:对某个字段的平均值进行监控;
  5. num字段最大值监控:对num字段的最大值进行监控;
ES-个性化监控
ES-个性化监控

注:本处只做示例,无实际业务

4 总结

本方案中对系统监控指标和业务监控指标2种方式都进行尝试。若只需要对业务指标进行监控,可省略Promethus相关操作。

此外,需要注意的是:

1. Ckafka的版本和开源版本Kafka并没有严格对应,方案中Ckafka2.4.1和开源Filebeat-1.14.1版本能够调试成功。

2. 云监控中的Promethus服务已经嵌入了Grafana监控服务。但不支持自定义数据源,该嵌入的Grafana只能接入Promethus,需使用独立灰度发布的Grafana才能完成ES数据接入Grafana。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 解决方案描述
    • 1.1 概述
      • 1.2 方案架构
      • 2 前置准备
        • 2.1 创建私有网络 VPC
          • 2.2 创建 Ckafka 实例
            • 2.3 创建 Oceanus 集群
              • 2.4. 创建 Elasticsearch 实例
                • 2.5 创建云监控 Prometheus 实例
                  • 2.6 创建独立 Grafana 资源
                    • 2.7 安装配置 Filebeat
                    • 3 方案实现
                      • 3.1 Filebeat数据传输
                        • 3.2 SQL作业编写
                          • 1. 定义source
                          • 2. 定义sink
                          • 3. 业务逻辑
                          • 4. 作业参数
                          • 5. ES数据查询
                        • 3.3 系统指标监控
                          • 监控配置
                          • Oceanus 作业监控
                          • 告警配置
                        • 3.4 业务指标监控
                          • 1. Grafana配置ES数据源。
                      • 4 总结
                      相关产品与服务
                      流计算 Oceanus
                      流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档