前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >何测试kafka

何测试kafka

作者头像
赵云龙龙
发布2024-07-24 14:15:12
520
发布2024-07-24 14:15:12
举报
文章被收录于专栏:python爱好部落

最近项目的消息中间件从nsq切换至kafka,说是为了避免消息丢失的问题。 没有项目管理,让我去推进,大家吭呲吭呲切换了,结果测试的时候发现性能跟不上,功能上没有问题。 kafka的基础组件是由工程院提供,是将官方的sdk包了一层,供各个业务方调用。是一个应届生写的,真搞不懂这么重要的东西,交给应届生去弄。 然后问题来了,业务方怎么用都是性能有问题,很卡顿,性能很差。工程院死活不承认,也不测试,拒不接受问题,反复让业务方提供证据。当业务方一份又一份报告给出的时候,他们就是不认可,极限拉扯。

也难怪,他们也没有几个人懂,就一个人弄这个东西,其它的事情特别多,也懒得看。 反复拉扯了个把月, 天天让我这样测,那样测,问题摆在那,就是没人去分析和解决,推进这个都搞得好烦躁,向上反馈也没啥用,上面也不懂,提供不了任何帮助,锅一直挂我头上,压力山大。

突然有一天,也许是良心发现,也许是受到某种压力,工程院去看问题了,马上就改了个参数就解决问题了。美丽的话语就不问候了。只能说这公司的管理水平和技术水平就这么奇葩。

先不吐槽了,现在来讨论如何测试kafka如何测试。

Kafka 的特性(设计原则)
  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发:支持数千个客户端同时读写
Kafka 的使用场景
  • 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常会去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
  • 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
  • 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 流式处理:流式处理是有一个能够提供多种应用程序的领域。
  • 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。
Kafka 的基本术语

消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

生产者:向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。

副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。

重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

可以理解为,以前取快递,需要快递员送到手中,mq作为快递站,可以存和收快递,快递就很方便了。 这样就解耦了必须本人签字收快递。 但是有个原则,快递不能错,不能丢失,不能超期,不能没人领取。 其它的快递站因为快递都丢一起,快递有丢失风险。所以用kafka, kafka可以理解为顺丰的丰巢。

生产者可以理解为快递小哥。 消息可以理解为快递。 消费者可以理解为收快递的。 消费者群组可以理解为一个小区收快递的。 broker可以理解为快递柜。 偏移量可以理解为快递柜上的小仓格。

测试kafka, 要保证消息没堆积,如果堆积消息要能消费完,如果发生故障消息不丢失, 而且要保证错误消息要重发,重消费,能消费正确。

如果要测试性能,需要用脚本压一下,以下是示例:

代码语言:javascript
复制
from confluent_kafka import Producer
import json
import time

import uuid
from datetime import datetime



# 生成随机UUID


kafka_conf = {
    'bootstrap.servers': 'kafka-headless.resource.svc.cluster.local:9097',
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.username': 'bot',
    'sasl.password': 'com123'
}

producer = Producer(kafka_conf)

# with open('test.json', 'r') as file:
#     body_data = json.load(file)


topic = 'as.audit_log.log_login'


def sent_message(body_data):
    producer.produce(topic, value=json.dumps(body_data))

    producer.flush()
    time.sleep(0.007)


if __name__ == "__main__":
    data_count = 500000  # 总数据量
    sleep_interval = 1000  # 每5万条数据后暂停
    sleep_duration = 1  # 暂停1秒
    # 获取当前时间
    now = datetime.now()

    # 按照指定格式输出
    start_time = now.strftime('%Y-%m-%d %H:%M:%S')
    print("start: {}".format(start_time))
    for i in range(data_count):
        uuid4 = uuid.uuid4()
        body_data = {
            "user_id": "c8d8a054-3a9d-11ef-a871-0ebc48156fd7",
            "user_name": "anna",
            "user_type": "authenticated_user",
            "level": 1,
            "op_type": 2,
            "date": 1720061706335227,
            "ip": "10.4.36.215",
            "mac": "",
            "msg": "上传文件“归档库使用明细表.2024-06-12.csv”成功",
            "ex_msg": "文档唯一标识:59FBDF50B015411C9B1DD093C2D226F6,父路径: AnyShare://darren; 文件大小:129 字节(Bytes); 文件密级:非密 UserAgent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
            "obj_id": "59FBDF50B015411C9B1DD093C2D226F6",
            "additional_info": "{\"user_account\":\"admin\"}",
            "out_biz_id": str(uuid4),
            "dept_paths": "组织结构, 组织结构/一个新的部门001"
        }
        sent_message(body_data)
        # if (i+1) % sleep_interval == 0:
        #     time.sleep(1)

    now = datetime.now()

    # 按照指定格式输出
    end_time = now.strftime('%Y-%m-%d %H:%M:%S')
    print("end: {}".format(end_time))

测试场景:

  1. 将服务scale的副本数降为0,发100W数据,然后将scale的副本数恢复。看100W数据多久消费完。
  2. 边发送边消费,3小时发送和消费100W数据,没有堆积,计算发送和消费吞吐。
  3. 边发送边消费,1小时发送和消费100W数据,有堆积,计算发送和消费吞吐。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-07-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 python粉丝团 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka 的特性(设计原则)
  • Kafka 的使用场景
  • Kafka 的基本术语
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档