The content of this page has been automatically translated by AI. If you encounter any problems while reading, you can view the corresponding content in Chinese.

Real-time monitoring solution based on SCS

Last updated: 2024-10-25 09:28:53

This solution combines Tencent Cloud CKafka, SCS (Stream Compute Service, also known as Oceanus), Tencent Cloud Elasticsearch, and TCOP. Filebeat monitors system logs and application logs in real time and ships the monitoring data to Tencent Cloud CKafka. The data in CKafka is then ingested into SCS, processed with simple business logic, and written to Tencent Cloud Elasticsearch. Managed Prometheus is used to monitor system metrics, and managed Grafana is used for customized monitoring of business data from Oceanus jobs.


Scheme Architecture



Preparation

Before using, please ensure that the corresponding big data components have been purchased and created.

Creating VPC

A VPC is a logically isolated network space that you define on Tencent Cloud. To connect networks, you can use Peering Connection, NAT Gateway, and similar services. For specific creation steps, see Creating VPC.
Note
When creating the CKafka, Oceanus, and Elasticsearch clusters, select the same network so that they can communicate with each other.

Create CKafka instance

Enter the Message Queue CKafka console and click Create in the instance list. For details on creating a CKafka instance, see Creating Instance. After purchasing, create a Kafka topic (topic-app-info); for detailed operations, see Creating Topic.
Note:
VPC and subnet: select the network and subnet created earlier.
It is recommended to choose the latest version of Kafka to ensure better compatibility with the Filebeat collection tool.


Creating Oceanus cluster

SCS is compatible with native Flink jobs. In the SCS console, go to Computing Resources > Create to create a cluster: select the region, availability zone, VPC, logging, and storage options, and set the initial password. For VPC and subnet, select the newly created network. For specific creation steps, see Creating Dedicated Cluster. After creation, the Flink cluster is displayed in the console.


Create Elasticsearch Instance

Go to the ES console and click Create Cluster in ES Cluster Management. Select the previously created VPC and subnet, and set the account and password. For detailed operations, see Creating Cluster.


Create TCOP Service

To display system metrics of Oceanus jobs, you need to purchase the Prometheus service. If you only need custom business metrics, you can skip this step. Go to the TCOP page, click Prometheus Monitoring on the left, then click Create. Select the previously created VPC and subnet, and set the instance name and Grafana password. For detailed operations, see Creating Instance.


Create Independent Grafana Resources

In the Grafana management page, purchase a separate Grafana instance to display business monitoring metrics. Click Create to enter the Grafana visualization service purchase page; for details, see Creating Instance. When purchasing, select the same VPC as the other resources.

Install and Configure Filebeat

Filebeat is a lightweight log data collection tool that collects information by monitoring designated files. Install Filebeat on the CVMs requiring monitoring of host and application information under this VPC (the configuration is generally in filebeat.yml).
Method 1: Download and install Filebeat (Filebeat download link).
Method 2: Use the Filebeat provided in the Elasticsearch Management Page > Beats Management.
This example uses Method 1. Download Filebeat to the CVM and add the following configuration items to the filebeat.yml file:

```yaml
# Log file monitoring configuration
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /tmp/test.log
    #- c:\programdata\elasticsearch\logs\*

# Monitoring data output configuration
output.kafka:
  version: 2.0.0                # Kafka version
  hosts: ["xx.xx.xx.xx:xxxx"]   # Please fill in the actual IP address and port
  topic: 'topic-app-info'       # Please fill in the actual topic
```

Configure the filebeat.yml file according to your actual business needs; refer to the Filebeat official documentation.
Note
The example uses CKafka version 2.4.1, and the configuration here is version: 2.0.0. If the versions do not match, an error such as ERROR [kafka] kafka/client.go:341 Kafka (topic=topic-app-info): dropping invalid message may occur.

Solution Implementation

Next, we use a case study to show how to implement customized monitoring with SCS.

Filebeat Data Transmission

1. Go to the Filebeat root directory and start Filebeat to begin data collection. In this example, data such as the CPU and memory usage shown by the top command is collected; logs of jar applications, JVM usage, listening ports, and more can also be collected. See the Filebeat official website for details.

```shell
# Start Filebeat
./filebeat -e -c filebeat.yml

# Write monitored system information to the test.log file
top -d 10 >> /tmp/test.log
```
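Each line that top appends to test.log is shipped by Filebeat as the message field of one Kafka record. If you later need to pick fields out of such a line downstream, a hypothetical Python sketch might look like this (the column order is assumed from the default top layout and may differ on your system):

```python
# Hypothetical sketch: split a default-format `top` process row into named columns.
# Column order (PID, USER, PR, NI, VIRT, RES, SHR, S, %CPU, %MEM, TIME+, COMMAND)
# is an assumption based on the default `top` output.
TOP_COLUMNS = ["pid", "user", "pr", "ni", "virt", "res", "shr",
               "state", "cpu_pct", "mem_pct", "time", "command"]

def parse_top_row(line: str) -> dict:
    """Split one `top` process line into a column-name -> value dict."""
    # Split at most len-1 times so the command keeps any embedded spaces
    values = line.split(None, len(TOP_COLUMNS) - 1)
    return dict(zip(TOP_COLUMNS, values))

row = parse_top_row("16 root 0 -20 0 0 0 S 0.0 0.0 0:00.00 kworker/1:0H")
print(row["pid"], row["cpu_pct"], row["command"])
```

This is demonstration parsing only; in this solution the raw line is kept as-is in the message field.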
2. Go to the CKafka page and click Message Query to query the messages of the corresponding topic and verify that the data has been collected.

Data format collected by Filebeat:
```json
{
  "@timestamp": "2021-08-30T10:22:52.888Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.14.0"
  },
  "input": {
    "type": "log"
  },
  "host": {
    "ip": ["xx.xx.xx.xx", "xx::xx:xx:xx:xx"],
    "mac": ["xx:xx:xx:xx:xx:xx"],
    "hostname": "xx.xx.xx.xx",
    "architecture": "x86_64",
    "os": {
      "type": "linux",
      "platform": "centos",
      "version": "7(Core)",
      "family": "redhat",
      "name": "CentOSLinux",
      "kernel": "3.10.0-1062.9.1.el7.x86_64",
      "codename": "Core"
    },
    "id": "0ea734564f9a4e2881b866b82d679dfc",
    "name": "xx.xx.xx.xx",
    "containerized": false
  },
  "agent": {
    "name": "xx.xx.xx.xx",
    "type": "filebeat",
    "version": "7.14.0",
    "hostname": "xx.xx.xx.xx",
    "ephemeral_id": "6c0922a6-17af-4474-9e88-1fc3b1c3b1a9",
    "id": "6b23463c-0654-4f8b-83a9-84ec75721311"
  },
  "ecs": {
    "version": "1.10.0"
  },
  "log": {
    "offset": 2449931,
    "file": {
      "path": "/tmp/test.log"
    }
  },
  "message": "(B16root0-20000S0.00.00:00.00kworker/1:0H(B"
}
```

Writing the SQL Job

In SCS, process the data accessed from Kafka and store it in Elasticsearch.

1. Define the source

Construct the Flink source according to the JSON message format produced by Filebeat. Note that `@timestamp` and `offset` are escaped with backticks, as one contains a special character and the other is a reserved keyword.

```sql
CREATE TABLE DataInput (
    `@timestamp` VARCHAR,
    host ROW<id VARCHAR, ip ARRAY<VARCHAR>>,
    log ROW<`offset` INTEGER, file ROW<path VARCHAR>>,
    message VARCHAR
) WITH (
    'connector' = 'kafka',                      -- Optional: 'kafka', 'kafka-0.11'; select the corresponding built-in connector
    'topic' = 'topic-app-info',                 -- Replace with the topic you want to consume
    'scan.startup.mode' = 'earliest-offset',    -- Any of latest-offset / earliest-offset / specific-offsets / group-offsets
    'properties.bootstrap.servers' = '10.0.0.29:9092',  -- Replace with your Kafka connection address
    'properties.group.id' = 'oceanus_group2',   -- Required parameter; make sure to specify a group ID
    -- Define the data format (JSON)
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',        -- Ignore JSON parsing exceptions
    'json.fail-on-missing-field' = 'false'      -- true: throw an error when a field is missing; false: set missing fields to null
);
```

2. Define the sink

```sql
CREATE TABLE es_output (
    id VARCHAR,
    ip ARRAY<VARCHAR>,
    path VARCHAR,
    num INTEGER,
    message VARCHAR,
    createTime VARCHAR
) WITH (
    'connector.type' = 'elasticsearch',        -- Output to Elasticsearch
    'connector.version' = '6',                 -- Elasticsearch version, e.g. '6', '7'
    'connector.hosts' = 'http://10.0.0.175:9200',  -- Elasticsearch connection address
    'connector.index' = 'oceanus_test2',       -- Name of the Elasticsearch index
    'connector.document-type' = '_doc',        -- Elasticsearch document type
    'connector.username' = 'elastic',
    'connector.password' = 'yourpassword',
    'update-mode' = 'upsert',                  -- 'append' (no primary key) or 'upsert' (with primary key)
    'connector.key-delimiter' = '$',           -- Optional: delimiter for composite primary keys (defaults to '_', e.g. key1_key2_key3)
    'connector.key-null-literal' = 'n/a',      -- Substitute string when a primary key is null, defaults to 'null'
    'connector.failure-handler' = 'retry-rejected',  -- Optional error handling: 'fail' (throw exception), 'ignore' (ignore any error), 'retry-rejected' (retry)
    'connector.flush-on-checkpoint' = 'true',  -- Optional: flush pending batch writes at checkpoints, defaults to true
    'connector.bulk-flush.max-actions' = '42', -- Optional: maximum number of actions per batch
    'connector.bulk-flush.max-size' = '42 mb', -- Optional: maximum accumulated size per batch (supports only mb)
    'connector.bulk-flush.interval' = '60000', -- Optional: interval for batch writes (ms)
    'connector.connection-max-retry-timeout' = '1000',  -- Maximum timeout per request (ms)
    --'connector.connection-path-prefix' = '/v1',  -- Optional: path prefix appended to each request
    'format.type' = 'json'                     -- Output data format; currently only 'json' is supported
);
```

3. Configure the business logic

```sql
INSERT INTO es_output
SELECT
    host.id        AS id,
    host.ip        AS ip,
    log.file.path  AS path,
    log.`offset`   AS num,
    message,
    `@timestamp`   AS createTime
FROM DataInput;
```

4. ES Data Query

Query data on the Kibana page in the ES console, or enter a CVM in the same subnet and use the following command to query:
```shell
# Query the index. Replace username:password with the actual account and password
curl -XGET -u username:password http://xx.xx.xx.xx:xxxx/oceanus_test2/_search -H 'Content-Type: application/json' -d'
{
  "query": { "match_all": {} },
  "size": 10
}
'
```
For more access methods, please refer to Accessing the ES Cluster.
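The same query can be issued from Python. Here is a minimal sketch using only the standard library; the host, index name, and credentials are placeholders exactly as in the curl example, and the network call is commented out so the snippet runs without a live cluster:

```python
import base64
import json
import urllib.request

ES_URL = "http://xx.xx.xx.xx:9200/oceanus_test2/_search"  # placeholder host/index

# Same match_all query body as the curl example above
body = json.dumps({"query": {"match_all": {}}, "size": 10}).encode("utf-8")

# Basic-auth header, equivalent to curl's -u username:password
token = base64.b64encode(b"username:password").decode("ascii")
req = urllib.request.Request(
    ES_URL,
    data=body,  # a request with a body defaults to POST
    headers={"Content-Type": "application/json",
             "Authorization": "Basic " + token},
)

# With a reachable cluster you would run:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["hits"])
print(req.get_full_url())
```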

System Metrics Monitoring

This section implements monitoring and alerting on system information and the running status of Flink jobs. Prometheus is a highly flexible time-series database, commonly used for storing monitoring data, computation, and alerting. SCS recommends using the Prometheus service provided by TCOP, which avoids deployment and operations overhead. It also supports Tencent Cloud notification templates, so alerts can easily be sent to different recipients via SMS, phone, email, WeCom bot, and more.

Configuring Monitoring (Oceanus Job Monitoring)

In addition to the monitoring information provided by the Oceanus Console, you can configure task-level fine-grained monitoring, job-level monitoring, and cluster Flink job list monitoring.
1. On the Oceanus Job Details Page, click Job Parameters, and add the following configuration under Advanced Parameters:
```yaml
pipeline.max-parallelism: 2048
metrics.reporters: promgateway
metrics.reporter.promgateway.host: xx.xx.xx.xx      # Prometheus instance address
metrics.reporter.promgateway.port: 9090             # Prometheus instance port
metrics.reporter.promgateway.needBasicAuth: true
metrics.reporter.promgateway.password: xxxxxxxxxxx  # Prometheus instance password
metrics.reporter.promgateway.interval: 10 SECONDS
```
2. In any Oceanus job, click Cloud Monitor to enter the managed Prometheus instance, then click the link to enter Grafana (the independently purchased grayscale-release Grafana cannot be entered from this link) and import the JSON file. For more details, see Accessing Prometheus Custom Monitoring.

3. The displayed Flink task monitoring effect is as follows; users can also click Edit to set different panels to optimize the display effect.


Alarm Configuration

1. Enter the TCOP interface, click Prometheus Monitoring, click on the purchased instance to go to the service management page, then select Alert Policy > New, and configure the relevant information. For detailed operations, refer to Accessing Prometheus Custom Monitoring.

2. Set alarm notification. Choose Select Template or New, and set the notification template.

3. SMS notification message.


Business Metric Monitoring

The application business data collected by Filebeat and processed by Oceanus has been stored in ES. Business data monitoring can be implemented with ES and Grafana.
1. Configure the ES data source in Grafana. Enter the Grafana console (currently in grayscale release), go to the newly created TCMG instance, and find and enable the public network address. The Grafana account is admin. After logging in, select Configuration > Add Source, search for Elasticsearch, fill in the relevant ES instance information, and add the data source.

2. Select Dashboards > Manage on the left side, click New Dashboard on the top right corner, and create a new panel.

Display effect as follows:
Total Data Volume Write Real-time Monitoring: Monitoring the total data volume written to the data source.
Data Source Real-time Monitoring: Monitoring the data write volume from a specific log.
Field Average Value Monitoring: Monitoring the average value of a specific field.
num Field Maximum Value Monitoring: Monitoring the maximum value of the num field.

Note:
This is for demonstration only and has no actual business purposes.
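As an illustration, the "num field maximum value" panel above boils down to an Elasticsearch max aggregation over the index written by the job. A minimal Python sketch of the aggregation body (index and field names follow the earlier example; this shows only the request shape, not Grafana's exact internals):

```python
import json

# Max aggregation over the `num` field of the oceanus_test2 index,
# the same computation behind the "num field maximum value" panel.
agg_body = {
    "size": 0,  # no document hits needed, only the aggregation result
    "aggs": {
        "max_num": {"max": {"field": "num"}}
    },
}

print(json.dumps(agg_body, indent=2))
```

It could be POSTed to the cluster's /oceanus_test2/_search endpoint with the same credentials as the curl example above.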

Summary

This solution covers both system monitoring metrics and business monitoring metrics. If you only need to monitor business metrics, you can skip the Prometheus-related operations. Additionally, please note:
1. The CKafka version does not strictly correspond to the open-source Kafka version. In this solution, CKafka 2.4.1 and open-source Filebeat 7.14.0 were debugged together successfully.
2. The Prometheus service in TCOP is already integrated with the Grafana monitoring service, but that embedded Grafana does not support custom data sources and can only connect to Prometheus. To bring ES data into Grafana, you need to use the independently purchased grayscale-release Grafana.