SCF:云产品业务告警功能

在使用云产品的时候,我们可能会需要一些业务告警,虽然腾讯云已经提供了监控告警功能,但是毕竟是针对一个产品通用的,并不是“定制化”的,那么我们如何做一个定制化的告警系统呢?本文将会通过腾讯云云API对Kafka消息积压数量进行监控(在云监控部分是不提供这个指标的告警),当超过阈值,通过Email以及企业微信和短信等进行业务告警。

云API对数据进行获取

说到云API数据获取部分,这里就非常推荐大家一定要用Explorer,这个产品可以帮我我们节省很多力气,本文也是通过Explorer来进行鉴权和监控数据获取的工作:

鉴权部分(已经去掉了我的SecretId和Key,如果使用请自行添加,但是注意不要泄漏):

API 2.0签名地址:https://cloud.tencent.com/document/product/215/1693

def GetSignature(param):
    # 公共参数
    param["SecretId"] = ""
    param["Timestamp"] = int(time.time())
    param["Nonce"] = random.randint(1, sys.maxsize)
    param["Region"] = "ap-guangzhou"
    # param["SignatureMethod"] = "HmacSHA256"

    # 生成待签名字符串
    sign_str = "GETckafka.api.qcloud.com/v2/index.php?"
    sign_str += "&".join("%s=%s" % (k, param[k]) for k in sorted(param))

    # 生成签名
    secret_key = ""
    if sys.version_info[0] > 2:
        sign_str = bytes(sign_str, "utf-8")
        secret_key = bytes(secret_key, "utf-8")
    hashed = hmac.new(secret_key, sign_str, hashlib.sha1)
    signature = binascii.b2a_base64(hashed.digest())[:-1]
    if sys.version_info[0] > 2:
        signature = signature.decode()

    # 签名串编码
    signature = urllib.parse.quote(signature)
    return signature

获取Kafka数据积压量

Kafka地址文档:https://cloud.tencent.com/product/ckafka

获取积压数据的API:https://cloud.tencent.com/document/product/597/30030

def GetGroupOffsets(max_lag, phoneList):
    param = {}
    param["Action"] = "GetGroupOffsets"
    param["instanceId"] = ""
    param["group"] = ""
    signature = GetSignature(param)

    # 生成请求地址
    param["Signature"] = signature
    url = "https://ckafka.api.qcloud.com/v2/index.php?Action=GetGroupOffsets&"
    url += "&".join("%s=%s" % (k, param[k]) for k in sorted(param))

    req_attr = urllib.request.urlopen(url)
    res_data = req_attr.read().decode("utf-8")
    json_data = json.loads(res_data)

    for eve_topic in json_data['data']['topicList']:
        temp_lag = 0
        result_list = []
        for eve_partition in eve_topic["partitions"]:
            lag = eve_partition["lag"]
            temp_lag = temp_lag + lag

        if temp_lag > max_lag:
            result_list.append(
                {
                    "topic": eve_topic["topic"],
                    "lag": lag
                }
            )
        
        print(result_list)
        if len(result_list)>0:
            KafkaLagRobot(result_list)
            KafkaLagSMS(result_list,phoneList)

接入企业微信

这里先贴一个企业微信的机器人地址:https://work.weixin.qq.com/api/doc#search

通过企业微信机器人配置,可以获得一个Webhook,编写告警代码:

(我已经删除掉了企业微信的webhook,请自行添加到url中)

def KafkaLagRobot(content):

    url = ""
    data = {
        "msgtype": "markdown",
        "markdown": {
            "content": content,
        }
    }
    data = str(json.dumps(data)).encode("utf-8")
    print(urllib.request.urlopen(urllib.request.Request(url, data)).read().decode("utf-8"))

接入腾讯云短信服务

(已经删掉部分敏感信息)

短信页面地址:https://cloud.tencent.com/product/sms

def KafkaLagSMS(infor, phone_list):

    random_data = random.randint(1, sys.maxsize)
    url = ""
    strMobile = phone_list
    strAppKey = ""
    strRand = str(random_data)
    strTime = int(time.time())
    sig = hashlib.sha256()
    sig.update(
        ("appkey=%s&random=%s&time=%s&mobile=%s" % (strAppKey, random_data, strTime, ",".join(strMobile))).encode(
            "utf-8"))

    phone_dict = []
    for eve_phone in phone_list:
        phone_dict.append(
            {
                "mobile": eve_phone,
                "nationcode": "86"
            }
        )

    data = {
        "ext": "",
        "extend": "",
        "params": [
            infor,
        ],
        "sig": sig.hexdigest(),
        "sign": "你的sign",
        "tel": phone_dict,
        "time": strTime,
        "tpl_id": 你的模板id
    }
    data = str(json.dumps(data)).encode("utf-8")
    print(urllib.request.urlopen(urllib.request.Request(url=url, data=data)).read().decode("utf-8"))

发送邮件告警

可以参考之前的Demo:https://cloud.tencent.com/developer/article/1419135

def sendEmail(content, to_user):
    sender = 'service@anycodes.cn'
    receivers = [to_user]

    mail_msg = content
    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = Header("监控", 'utf-8')
    message['To'] = Header("站长", 'utf-8')

    subject = "告警"
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
        smtpObj.login('service@anycodes.cn', '密码')
        smtpObj.sendmail(sender, receivers, message.as_string())
    except smtplib.SMTPException:
        pass

整合代码

此时我们只需要将所有的代码,通过一些逻辑进行整合即可:

# -*- coding: utf8 -*-
import json
import binascii
import hashlib
import hmac
import random
import sys
import ssl
import time
import urllib.parse
import urllib.request
import smtplib
from email.mime.text import MIMEText
from email.header import Header
ssl._create_default_https_context = ssl._create_unverified_context

def sendEmail(infor):

    temp_str = 'Topic:%s,积压数据量:%d;'
    content = ""
    for eve_infor in infor:
        content = content + temp_str % (eve_infor["topic"], eve_infor["lag"])

    sender = 'service@anycodes.cn'
    receivers = ["service@anycodes.cn"]

    mail_msg = content
    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = Header("监控", 'utf-8')
    message['To'] = Header("站长", 'utf-8')

    subject = "告警"
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
        smtpObj.login('service@anycodes.cn', '密码')
        smtpObj.sendmail(sender, receivers, message.as_string())
    except smtplib.SMTPException:
        pass

def KafkaLagRobot(infor):
    base_str = "Kafka消费者监控提醒:\n"
    temp_str = '>Topic:<font color="comment">%s</font>,积压数据量:<font color="warning">%d</font>条;\n'
    content = ""
    for eve_infor in infor:
        content = content + temp_str % (eve_infor["topic"], eve_infor["lag"])

    content = base_str + content

    url = ""
    data = {
        "msgtype": "markdown",
        "markdown": {
            "content": content,
        }
    }
    data = str(json.dumps(data)).encode("utf-8")
    print(urllib.request.urlopen(urllib.request.Request(url, data)).read().decode("utf-8"))


def KafkaLagSMS(infor, phone_list):

    temp_str = 'Topic:%s,积压数据量:%d;'
    content = ""
    for eve_infor in infor:
        content = content + temp_str % (eve_infor["topic"], eve_infor["lag"])

    random_data = random.randint(1, sys.maxsize)
    url = ""
    strMobile = phone_list
    strAppKey = ""
    strRand = str(random_data)
    strTime = int(time.time())
    sig = hashlib.sha256()
    sig.update(
        ("appkey=%s&random=%s&time=%s&mobile=%s" % (strAppKey, random_data, strTime, ",".join(strMobile))).encode(
            "utf-8"))

    phone_dict = []
    for eve_phone in phone_list:
        phone_dict.append(
            {
                "mobile": eve_phone,
                "nationcode": "86"
            }
        )

    data = {
        "ext": "",
        "extend": "",
        "params": [
            content,
        ],
        "sig": sig.hexdigest(),
        "sign": "",
        "tel": phone_dict,
        "time": strTime,
        "tpl_id": 
    }
    data = str(json.dumps(data)).encode("utf-8")
    print(urllib.request.urlopen(urllib.request.Request(url=url, data=data)).read().decode("utf-8"))


def GetSignature(param):
    # 公共参数
    param["SecretId"] = ""
    param["Timestamp"] = int(time.time())
    param["Nonce"] = random.randint(1, sys.maxsize)
    param["Region"] = "ap-guangzhou"
    # param["SignatureMethod"] = "HmacSHA256"

    # 生成待签名字符串
    sign_str = "GETckafka.api.qcloud.com/v2/index.php?"
    sign_str += "&".join("%s=%s" % (k, param[k]) for k in sorted(param))

    # 生成签名
    secret_key = ""
    if sys.version_info[0] > 2:
        sign_str = bytes(sign_str, "utf-8")
        secret_key = bytes(secret_key, "utf-8")
    hashed = hmac.new(secret_key, sign_str, hashlib.sha1)
    signature = binascii.b2a_base64(hashed.digest())[:-1]
    if sys.version_info[0] > 2:
        signature = signature.decode()

    # 签名串编码
    signature = urllib.parse.quote(signature)
    return signature


def GetGroupOffsets(max_lag, phoneList):
    param = {}
    param["Action"] = "GetGroupOffsets"
    param["instanceId"] = ""
    param["group"] = ""
    signature = GetSignature(param)

    # 生成请求地址
    param["Signature"] = signature
    url = "https://ckafka.api.qcloud.com/v2/index.php?Action=GetGroupOffsets&"
    url += "&".join("%s=%s" % (k, param[k]) for k in sorted(param))

    req_attr = urllib.request.urlopen(url)
    res_data = req_attr.read().decode("utf-8")
    json_data = json.loads(res_data)

    for eve_topic in json_data['data']['topicList']:
        temp_lag = 0
        result_list = []
        for eve_partition in eve_topic["partitions"]:
            lag = eve_partition["lag"]
            temp_lag = temp_lag + lag

        if temp_lag > max_lag:
            result_list.append(
                {
                    "topic": eve_topic["topic"],
                    "lag": lag
                }
            )
        
        print(result_list)
        if len(result_list)>0:
            KafkaLagRobot(result_list)
            KafkaLagSMS(result_list,phoneList)
            sendEmail(result_list)

def main_handler(event, context):
    # 发送短信的列表
    phone_list = ["PhoneNumber"]
    GetGroupOffsets(2000, phone_list)
    return True

总结

腾讯云SCF是一个非常有趣,且非常有价值的产品,我之前做了一个项目,由于临时需要有一个活动,需要增加一个活动模块,但是不想修改源代码,就通过腾讯云的SCF对数据库进行增删改查,增加了点逻辑代码,与API网关结合,很快上线,开发过程非常愉快。其实在生活中,灵活运用一个产品或者几个产品结合使用,是非常有趣的,同时正确应用一款产品,也会让你的工作事半功倍,本文主要通过云API对云监控数据进行获取,获取到Kafka数据积压量,进行一个逻辑处理,然后调用了发送邮件的方法、发送短信的方法以及发送企业微信的方法,实现了监控告警功能,经过使用时间触发器:

效果良好,也成功实现了基本告警功能。写本文的目的,也是希望大家,可以通过这样的一个Demo,将其应用到自己的项目中,发挥更大的价值,定制更有趣,更有价值,更加灵活多变的告警策略,服务项目。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏智慧教育

腾讯教育牵手江苏省教育厅 正式启动“名师空中课堂”

? 6月28日,由江苏省教育厅组织建设的省“名师空中课堂”网络端正式上线。该平台以微信和企业微信的用户连接能力为依托,以腾讯智慧校园平台为入口,支撑江苏省“名...

1.9K30
来自专栏大龄程序员的人工智能之路

AI.未来

轰轰烈烈的双十一购物狂欢终于过去了,你的荷包是不是轻了好几吨?不知道你有没有被各种优惠、满减整的头晕脑胀,反正我是晕了。想想现在都已经是人工智能时代了,一二十年...

13030
来自专栏乐百川的学习频道

使用腾讯企业邮箱免费版

版权声明:本文为博主原创文章,转载请注明出处。 ...

1.5K50
来自专栏SAP梦心的SAP分享

警惕SAP项目被“中间商赚差价”

前段时间某买卖二手车的广告特别火,里面有一句话叫“没有中间商赚差价”特别有说服力。同样在做SAP项目的过程中也是要警惕各种“中间商”赚差价。

16120
来自专栏数商云贸

探索制造业供应链的嬗变之道,加快布局柔性供应链基础设施

当经济增长由高增速到中增速,经济结构粗放转变优化升级,以及复杂的国际环境,都注定了实施并不断优化供应链战略成为中国经济发展的必然选择。

9330
来自专栏罗超频道

OYO与OTA相爱还是相杀?

携程和美团不约而同地与OYO握手言和,体现出的趋势是,OYO不是OTA的对手,两者合作的空间远远大于未来竞争的可能。

11420
来自专栏SAP梦心的SAP分享

写在Logg SAP项目上线之际

根据大环境大行业的惯用做法,公司建立Logg品牌是在意料之中。毫无意外的,Logg也要上到SAP系统中。

10520
来自专栏Java学习资料

新手Java程序员找工作更看重Java项目经验?

动力节点IT培训,全真项目实操实训,贯穿八大行业,彻底帮助学员摆脱纸上谈兵的尴尬,一技成,天下行。

21620
来自专栏python前行者

企查查api接口操作

企查查api接口中心:http://openapi.qichacha.com/DataCenter

1.1K20
来自专栏用户1068756的专栏

小程序风口之下商家该如何推广拉新?

小程序风口之下商家该如何推广拉新?是不是让很多商家人摸不着头脑?其实这都有现成的方法直接使用。而且相当简单高效!

12330

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励