专栏首页k8s_istio​云函数实践(含代码):将日志服务的日志投递到自建 Kafka 的 3 个步骤
原创

​云函数实践(含代码):将日志服务的日志投递到自建 Kafka 的 3 个步骤

上文提到 将K8S日志采集到日志服务,这次介绍将采集的日志投递到自建 Kafka 中,用于 Spark 计算。

核心流程

容器日志 -> 日志服务 -> 使用函数处理,将日志投递至自建 Kafka

本文介绍如何创建云函数,将日志投递至 Kafka 中。

1. 创建云函数 SCF

打开 函数服务列表,基于模板 CLS 数据转存到 Ckafka 创建函数。

虽然模板是投递 Ckafka,不过 Ckafka和 Kafka 兼容性好,所以投递 Kafka 也没问题。

基于模板创建SCF

1.1 填写基础配置

启用私有网络,函数服务使用的 VPC 和 Kafka 所在 VPC 相同。

如果不同,可以使用 对等连接 解决。

启用 VPC

1.2 函数代码

默认模板会把日志原始数据当成字符串,把每个字符当成一行 message 进行输出(for record in records:),需要调整代码。

不知道是因为我的日志服务原始数据设置的是 JSON 格式,还是当前 CLS 数据转存到 Ckafka 模板过时了

SCF 函数代码

有 3 处代码修改,详见注释,完整代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import time
import logging
import os
import base64
import json
import gzip
import urllib
from kafka import KafkaProducer
from kafka.errors import KafkaError
from StringIO import StringIO

logger = logging.getLogger('kafka')
logger.setLevel(logging.INFO)
 
class ClsToKafka(object):
    """
    CLS 消息投递 kafka
    """

    def __init__(self, host, **kwargs):
        self.host = host

        self.producer = KafkaProducer(bootstrap_servers = [self.host], 
            retries = 10,
            max_in_flight_requests_per_connection = 1, 
            request_timeout_ms = 30000,
            max_block_ms = 60000,
            **kwargs
        )def send(self, topic, records):
        """
        异步生产 kafka 消息
        """

        global count
        count = 0
        def on_send_success(record_metadata):
            global count
            count = count +1

        def on_send_error(excp):
            logger.error('failed to send message', exc_info = excp)

        s_time = time.time()
        
        try:
            ## 修改 1: 原始消息是 JSON 格式(日志服务采集容器服务输出的日志格式是 JSON),每条消息位于 .records(type: List)
            ## for record in records:
            for record in records['records']:
                key = ""# 当 key 为""或者为"None" 时,要传入 key=None,这样 python kafka 库会随机选取一个 partition 写入消息
                if key == "" or key =="None":
                    key = None
                ## 修改 2:record 是 dict,因为原始数据就是 JSON,需要转成 str,否则调用 self.producer.send 会报错 "assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))"
                value = json.dumps(record)

                # 也可以对消息进行处理后再转存
                #value = deal_message(data)

                self.producer.send(topic, key = key, value = value).add_callback(on_send_success).add_errback(on_send_error)

            # block until all async messages are sent
            self.producer.flush()
        except KafkaError as e:
            return e
        finally:
            if self.producer is not None:
                self.producer.close()e_time = time.time()

        return "{} messages delivered in {}s".format(count, e_time - s_time)

# 这里可以对消息进行处理后返回
def deal_message(message):
    return message

def main_handler(event, context):
    kafka_address = os.getenv("kafka_address")
    kafka_topic_name = os.getenv("kafka_topic_name")
  
    kafka_to_kafka = ClsToKafka(
        kafka_address
        #security_protocol = "SASL_PLAINTEXT",
        #sasl_mechanism = "PLAIN",
        #sasl_plain_username = "ckafka-80o10xxx#lkoxx",
        #sasl_plain_password = "ccllxxxx",
        #api_version=(0, 10, 2)
    )event = json.loads(gzip.GzipFile(fileobj=StringIO(event['clslogs']['data'].decode('base64'))).read())# print("type of event: %s" % type(event))
  
    ## 修改 3:直接使用 event 这个字典,便于从字典中获取每条消息的内容
    ## data = json.dumps(event, indent=4, sort_keys=True)
    ## ret = kafka_to_kafka.send(kafka_topic_name,data) 
    ret = kafka_to_kafka.send(kafka_topic_name,event) 
    logger.info(ret)
    return ret

1.4 高级设置:环境设置

云函数需要使用 kafka_addresskafka_topic_name 这 2 个变量,在 环境配置 中配好。

设置环境变量

点击创建后,部署成功。

创建SCF成功

2. 为日志服务的日志主题设置函数处理

在日志服务的 日志主题 页面找到需要投递消息的主题,在 函数处理 TAB 中 选择刚创建的函数即可。

为日志主题设置函数处理

函数处理创建成功。

函数处理设置成功

3. 查看投递到自建 Kafka 的效果

等待 1 分钟后,查看函数每次调用的日志,可以看到调用已成功。

查看SCF的调用日志

同时可以了解整体调用监控数据。

查看SCF的调用监控

自建的 Kakfa 是使用 Cloudera Management 创建的,在 CM 中看到 Topic 已有数据写入。

查看SCF的调用监控

使用命令行也可以看到数据持续写入。

# ./kafka-console-consumer.sh --bootstrap-server 10.0.0.29:9092 --topic scf_topic --offset latest --partition 0
{"content": "{\"Accept\":\"*/*\",\"Body\":\"\",\"Host\":\"header.dev.xxx.cn\",\"Method\":\"GET\",\"Protocol\":\"HTTP/1.1\",\"Referer\":\"\",\"RemoteAddr\":\"172.16.7.71:37468\",\"RequestURI\":\"/\",\"Type\":\"web\",\"UserAgent\":\"Qcloud-boce\",\"X-Forwarded-For\":\"58.87.66.69\"}", "timestamp": 1618716491203428}
{"content": "{\"Accept\":\"*/*\",\"Body\":\"\",\"Host\":\"header.dev.xxx.cn\",\"Method\":\"GET\",\"Protocol\":\"HTTP/1.1\",\"Referer\":\"\",\"RemoteAddr\":\"172.16.7.71:36864\",\"RequestURI\":\"/\",\"Type\":\"curl\",\"UserAgent\":\"Qcloud-curl\",\"X-Forwarded-For\":\"180.163.9.66\"}", "timestamp": 1618716494178403}
...
Processed a total of 9 messages

FAQ

自建 Kafka 对外提供服务

如果函数调用有日志有如下报错,则证明 Kafka broker 未设置对外可访问的地址,参照 Won’t Connect to My Apache Kafka Cluster 修改 advertised.listeners 配置即可。

DNS lookup failed for hadoop-29.com:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?

DNS lookup failed for hadoop-29.com:9092 (0)

reference

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 微保 Serverless 实践之架构演进

    微保的前端架构,随着业务发展在不断进化调整。本文将具体介绍其架构演进,以及最终选择腾讯云Serverless的原因。

    腾讯云serverless团队
  • 【日志服务CLS】应用工作流ASW接入CLS实践分享

    附业务场景需求分析WIKI:https://iwiki.woa.com/pages/viewpage.action?pageId=572745651

    用户5963776
  • 腾讯云消息队列Ckafka和TDMQ选型问题

    目前随着技术架构不断演进,特别是微服务分布式技术兴起,很多大型网站逐步采用分布式的消息队列,用于面对流量高峰和异步处理,基于云上的消息队列逐步成为主流,接下来给...

    邓愉悦
  • IM开发基础知识补课(五):通俗易懂,正确理解并用好MQ消息队列

    消息是互联网信息的一种表现形式,是人利用计算机进行信息传递的有效载体,比如即时通讯网坛友最熟悉的即时通讯消息就是其具体的表现形式之一。

    JackJiang
  • 【玩转腾讯云】记一次容器服务日志处理过程

    在目前小程序为主的大背景下,有客户大部分业务在腾讯云,使用的大部分为容器服务,在大规模的使用容器下,需要对容器内业务的日志采集及分析,在腾讯云对应容器服务的日志...

    KaliArch
  • 小程序、容器、SCF、直播加速…最全面的云端架构技术揭秘(上)

    在刚刚闭幕不久的2017腾讯全球合作伙伴大会上,腾讯首次发布其AI开放全景图,并围绕AI主线进行腾讯全产品线开放布局。无论在AI方面的战略计划,还是机器学习、计...

    云加社区
  • 有赞 Flink 实时任务资源优化探索与实践

    随着 Flink k8s 化以及实时集群迁移完成,有赞越来越多的 Flink 实时任务运行在 K8s 集群上,Flink k8s 化提升了实时集群在大促时弹性扩...

    有赞coder
  • 顶级程序员必读的13本Java书籍

    公众号 IT老哥
  • TKE集群日志解决方案之日志采集

    当前技术领域容器盛行,已然是一个云原生的时代, 在技术领域都或多或少跟云计算、容器、Kubernetes、云原生应用有着不同的渊源。云原生的技术变更带来了革命性...

    朱瑞卿

扫码关注云+社区

领取腾讯云代金券