前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >​云函数实践(含代码):将日志服务的日志投递到自建 Kafka 的 3 个步骤

​云函数实践(含代码):将日志服务的日志投递到自建 Kafka 的 3 个步骤

原创
作者头像
SRE扫地僧
修改2021-04-18 14:47:00
9260
修改2021-04-18 14:47:00
举报
文章被收录于专栏:k8s_istiok8s_istio

上文提到 将K8S日志采集到日志服务,这次介绍将采集的日志投递到自建 Kafka 中,用于 Spark 计算。

核心流程

容器日志 -> 日志服务 -> 使用函数处理,将日志投递至自建 Kafka

本文介绍如何创建云函数,将日志投递至 Kafka 中。

1. 创建云函数 SCF

打开 函数服务列表,基于模板 CLS 数据转存到 Ckafka 创建函数。

虽然模板是投递 Ckafka,不过 Ckafka和 Kafka 兼容性好,所以投递 Kafka 也没问题。

基于模板创建SCF
基于模板创建SCF

1.1 填写基础配置

启用私有网络,函数服务使用的 VPC 和 Kafka 所在 VPC 相同。

如果不同,可以使用 对等连接 解决。

启用 VPC
启用 VPC

1.2 函数代码

默认模板会把日志原始数据当成字符串,把每个字符当成一行 message 进行输出(for record in records:),需要调整代码。

不知道是因为我的日志服务原始数据设置的是 JSON 格式,还是当前 CLS 数据转存到 Ckafka 模板过时了

SCF 函数代码
SCF 函数代码

有 3 处代码修改,详见注释,完整代码如下:

代码语言:txt
复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import time
import logging
import os
import base64
import json
import gzip
import urllib
from kafka import KafkaProducer
from kafka.errors import KafkaError
from StringIO import StringIO

logger = logging.getLogger('kafka')
logger.setLevel(logging.INFO)
 
class ClsToKafka(object):
    """
    CLS 消息投递 kafka
    """

    def __init__(self, host, **kwargs):
        self.host = host

        self.producer = KafkaProducer(bootstrap_servers = [self.host], 
            retries = 10,
            max_in_flight_requests_per_connection = 1, 
            request_timeout_ms = 30000,
            max_block_ms = 60000,
            **kwargs
        )def send(self, topic, records):
        """
        异步生产 kafka 消息
        """

        global count
        count = 0
        def on_send_success(record_metadata):
            global count
            count = count +1

        def on_send_error(excp):
            logger.error('failed to send message', exc_info = excp)

        s_time = time.time()
        
        try:
            ## 修改 1: 原始消息是 JSON 格式(日志服务采集容器服务输出的日志格式是 JSON),每条消息位于 .records(type: List)
            ## for record in records:
            for record in records['records']:
                key = ""# 当 key 为""或者为"None" 时,要传入 key=None,这样 python kafka 库会随机选取一个 partition 写入消息
                if key == "" or key =="None":
                    key = None
                ## 修改 2:record 是 dict,因为原始数据就是 JSON,需要转成 str,否则调用 self.producer.send 会报错 "assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))"
                value = json.dumps(record)

                # 也可以对消息进行处理后再转存
                #value = deal_message(data)

                self.producer.send(topic, key = key, value = value).add_callback(on_send_success).add_errback(on_send_error)

            # block until all async messages are sent
            self.producer.flush()
        except KafkaError as e:
            return e
        finally:
            if self.producer is not None:
                self.producer.close()e_time = time.time()

        return "{} messages delivered in {}s".format(count, e_time - s_time)

# 这里可以对消息进行处理后返回
def deal_message(message):
    return message

def main_handler(event, context):
    kafka_address = os.getenv("kafka_address")
    kafka_topic_name = os.getenv("kafka_topic_name")
  
    kafka_to_kafka = ClsToKafka(
        kafka_address
        #security_protocol = "SASL_PLAINTEXT",
        #sasl_mechanism = "PLAIN",
        #sasl_plain_username = "ckafka-80o10xxx#lkoxx",
        #sasl_plain_password = "ccllxxxx",
        #api_version=(0, 10, 2)
    )event = json.loads(gzip.GzipFile(fileobj=StringIO(event['clslogs']['data'].decode('base64'))).read())# print("type of event: %s" % type(event))
  
    ## 修改 3:直接使用 event 这个字典,便于从字典中获取每条消息的内容
    ## data = json.dumps(event, indent=4, sort_keys=True)
    ## ret = kafka_to_kafka.send(kafka_topic_name,data) 
    ret = kafka_to_kafka.send(kafka_topic_name,event) 
    logger.info(ret)
    return ret

1.4 高级设置:环境设置

云函数需要使用 kafka_addresskafka_topic_name 这 2 个变量,在 环境配置 中配好。

设置环境变量
设置环境变量

点击创建后,部署成功。

创建SCF成功
创建SCF成功

2. 为日志服务的日志主题设置函数处理

在日志服务的 日志主题 页面找到需要投递消息的主题,在 函数处理 TAB 中 选择刚创建的函数即可。

为日志主题设置函数处理
为日志主题设置函数处理

函数处理创建成功。

函数处理设置成功
函数处理设置成功

3. 查看投递到自建 Kafka 的效果

等待 1 分钟后,查看函数每次调用的日志,可以看到调用已成功。

查看SCF的调用日志
查看SCF的调用日志

同时可以了解整体调用监控数据。

查看SCF的调用监控
查看SCF的调用监控

自建的 Kakfa 是使用 Cloudera Management 创建的,在 CM 中看到 Topic 已有数据写入。

查看SCF的调用监控
查看SCF的调用监控

使用命令行也可以看到数据持续写入。

代码语言:txt
复制
# ./kafka-console-consumer.sh --bootstrap-server 10.0.0.29:9092 --topic scf_topic --offset latest --partition 0
{"content": "{\"Accept\":\"*/*\",\"Body\":\"\",\"Host\":\"header.dev.xxx.cn\",\"Method\":\"GET\",\"Protocol\":\"HTTP/1.1\",\"Referer\":\"\",\"RemoteAddr\":\"172.16.7.71:37468\",\"RequestURI\":\"/\",\"Type\":\"web\",\"UserAgent\":\"Qcloud-boce\",\"X-Forwarded-For\":\"58.87.66.69\"}", "timestamp": 1618716491203428}
{"content": "{\"Accept\":\"*/*\",\"Body\":\"\",\"Host\":\"header.dev.xxx.cn\",\"Method\":\"GET\",\"Protocol\":\"HTTP/1.1\",\"Referer\":\"\",\"RemoteAddr\":\"172.16.7.71:36864\",\"RequestURI\":\"/\",\"Type\":\"curl\",\"UserAgent\":\"Qcloud-curl\",\"X-Forwarded-For\":\"180.163.9.66\"}", "timestamp": 1618716494178403}
...
Processed a total of 9 messages

FAQ

自建 Kafka 对外提供服务

如果函数调用有日志有如下报错,则证明 Kafka broker 未设置对外可访问的地址,参照 Won’t Connect to My Apache Kafka Cluster 修改 advertised.listeners 配置即可。

代码语言:txt
复制
DNS lookup failed for hadoop-29.com:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?

DNS lookup failed for hadoop-29.com:9092 (0)

reference

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 核心流程
  • 1. 创建云函数 SCF
    • 1.1 填写基础配置
      • 1.2 函数代码
        • 1.4 高级设置:环境设置
        • 2. 为日志服务的日志主题设置函数处理
        • 3. 查看投递到自建 Kafka 的效果
        • FAQ
          • 自建 Kafka 对外提供服务
          • reference
          相关产品与服务
          日志服务
          日志服务(Cloud Log Service,CLS)是腾讯云提供的一站式日志服务平台,提供了从日志采集、日志存储到日志检索,图表分析、监控告警、日志投递等多项服务,协助用户通过日志来解决业务运维、服务监控、日志审计等场景问题。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档