专栏首页猿天地一次机房停电引发的思考

一次机房停电引发的思考

一次机房停电引发的思考

今天早上到公司的时候,接到开发反馈 DEV 环境所有接口都卡,耗时都在一分钟以上,严重影响开发正常工作,然后通过网关的日志定位到原因是因为 kafka 集群不可用(总共 3 个 broker,前一天晚上机房停电导致 leader 节点挂了),导致网关的反爬过滤器里面发送 kafka 消息的代码 kafkaTemplat.send 阻塞了 60s,当时在想这个 send 方法不是异步的吗,为什么会阻塞 60s?于是查阅了一些资料,大致搞清楚了原因,这里稍作整理,分享给可能踩坑或者以及踩过坑的同学。

版本信息

  • spring-boot:2.0.6.RELEASE
  • spring-kafka:2.1.2.RELEASE
  • kafka-clients:1.0.2

为什么阻塞了 60s?

首先我们知道 kafkaTemplat.send 底层是调用 KafkaProducer 的 send 方法

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {}

根据文档的说明[1]它是一个异步的发送方法,按道理不管如何它都不应该阻塞主线程,但实际中某些情况下会出现阻塞线程,比如 broker 未正确运行,topic 未创建等情况。具体得源码分析参考 https://www.cnblogs.com/felixzh/p/11849296.html[2]

查询官方文档http://kafka.apache.org/10/documentation.html[3]得知,具体阻塞多久是由 max.block.ms 参数决定的,由于我们的业务场景是高容忍消息丢失,低容忍阻塞请求,所以需要进行优化,下面简单介绍一下 2 种优化方案。

!!!注意,以下方案只适用于高容忍消息丢失,低容忍阻塞请求业务场景

优化方案

方案 1:参数调优

  • max.block.ms 调整到 100ms,这个参数有以下 2 个作用
    1. 用于配置 send 数据或 partitionFor 函数得到对应的 leader 时,最大的等待时间,默认值为 60 秒
    2. 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽 buffer.memory 这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms 设定, 之后它将抛出一个 TimeoutException。
  • 关闭自动重试 retries=0 默认就是 0
  • 其他
  • acks=0,acks 有 4 个选项[all, -1, 0, 1] 。这里不确定会不会阻塞 send 方法,但是高容忍消息丢失,低容忍阻塞请求的业务场景配置成 0 就好了
    • 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个 server 失败的情况下,有点像 TCP
    • 1:发送消息,并会等待 leader 收到确认后,一定的可靠性
    • -1 或 all:发送消息,等待 leader 收到确认,并进行复制操作后,才返回,最高的可靠性
  • 其他参数参考 http://kafka.apache.org/10/documentation.html[4]

虽然调整一些参数,但是 kafka 集群不可用或请求量过大时,还是对主流程有短暂的阻塞

方案 2:真异步

kafkaTemplat.send 方法其实是个假异步方法,所以需要自己实现真异步,这里构造一个公用的线程池来处理就可以了,下面为参考代码

package com.qiaofang.tortoise.gateway.component;

import com.qiaofang.tortoise.gateway.config.KafkaAsyncProperties;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * kafka异步操作工具类
 *
 * @author chenhao
 * @version 1.0
 * @date 2020/7/2 3:47 下午
 */
public class KafkaAsyncUtil {

    private final KafkaTemplate kafkaTemplate;

    private final KafkaAsyncProperties kafkaAsyncProperties;

    public KafkaAsyncUtil(KafkaTemplate kafkaTemplate, KafkaAsyncProperties kafkaAsyncProperties) {
        this.kafkaTemplate = kafkaTemplate;
        this.kafkaAsyncProperties = kafkaAsyncProperties;
        init();
    }

    private ThreadPoolTaskExecutor executor;

    private void init() {
        executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(kafkaAsyncProperties.getThreadPoolCoreThreads());
        executor.setMaxPoolSize(kafkaAsyncProperties.getThreadPoolMaxThreads());
        executor.setQueueCapacity(kafkaAsyncProperties.getThreadPoolQueueSize());
        executor.setThreadNamePrefix("kafka-async-util-pool-");
        //高容忍消息丢失场景,工作队列满了之后直接丢弃
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.initialize();
    }

    /**
     * 发送消息
     *
     * @param topic
     * @param data
     */
    public void send(String topic, Object data) {
        executor.execute(() -> kafkaTemplate.send(topic, data));
    }

}

/**
 * kafka异步操作相关配置
 * @author chenhao
 * @version 1.0
 * @date 2020/7/2 3:47 下午
 */
@Data
@ConfigurationProperties(prefix = "tortoise.kafka.async")
public class KafkaAsyncProperties {

    /**
     * core
     */
    private Integer threadPoolCoreThreads = 3;
    /**
     * max
     */
    private Integer threadPoolMaxThreads = 3;
    /**
     * queue大小
     */
    private Integer threadPoolQueueSize = 10000;

}

有文章《关于高并发下 kafka producer send 异步发送耗时问题的分析[5]》说多线程高并发下 producer.send 的损耗比较严重,这个还要等到后续压测之后再更新文章吧

参考文章

站在巨人的肩膀上

  • Kafka producer 异步发送在某些情况会阻塞主线程,使用时候慎重[6]
  • HAVENT 原创 Spring Boot + Spring-Kafka 异步配置[7]
  • 关于高并发下 kafka producer send 异步发送耗时问题的分析[8]
  • http://kafka.apache.org/10/documentation.html[9]

参考资料

[1]

文档的说明: https://github.com/apache/kafka/blob/1f2d230bfdaafb34c9be12a370ab2eb4d3016039/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L853

[2]

https://www.cnblogs.com/felixzh/p/11849296.html

[3]

http://kafka.apache.org/10/documentation.html

[4]

http://kafka.apache.org/10/documentation.html

[5]

关于高并发下 kafka producer send 异步发送耗时问题的分析: https://www.cnblogs.com/dafanjoy/p/10292875.html

[6]

Kafka producer 异步发送在某些情况会阻塞主线程,使用时候慎重: https://www.cnblogs.com/felixzh/p/11849296.html

[7]

HAVENT 原创 Spring Boot + Spring-Kafka 异步配置: https://my.oschina.net/u/943746/blog/1928471

[8]

关于高并发下 kafka producer send 异步发送耗时问题的分析: https://www.cnblogs.com/dafanjoy/p/10292875.html

[9]

http://kafka.apache.org/10/documentation.html

本文分享自微信公众号 - 猿天地(cxytiandi),作者:陈浩

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-07-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 我所经历的一次Dubbo服务雪崩,这是一个漫长的故事

    这周,笔者经历了一次服务雪崩。服务雪崩,听到这个词就能想到问题的严重性。是的,整个项目,整条业务线都挂了,从该业务线延伸出来的下游业务线也跟着凉了。笔者是连续三...

    猿天地
  • nginx转发后后端怎么获取用户真实IP

    经常有需求要获取访问用户的IP,在经过nginx转发后真实IP就被隐藏起来了,我们需要在头部信息里拿真实IP,下面是拿IP的代码,考虑了各种情况。 public...

    猿天地
  • Logback配置文件这么写,TPS提高10倍

    SpringBoot工程自带logback和slf4j的依赖,所以重点放在编写配置文件上,需要引入什么依赖,日志依赖冲突统统都不需要我们管了。logback框架...

    猿天地
  • kafka管理神器-kafkamanager

    https://github.com/yahoo/kafka-manager/releases

    Spark学习技巧
  • 深入Spring Boot (十三):整合Kafka详解

    Kafka是一种高吞吐量的分布式流处理平台,它具有高可用、高吞吐量、速度快、易扩展等特性。本篇将介绍如何使用Spring Boot整合Kafka及使用Kafka...

    JavaQ
  • kafka中文文档

    之前的版本:0.7.x,0.8.0,0.8.1.X,0.8.2.X,0.9.0.X,0.10.0.X。

    gemron的空间
  • 谈谈程序员解决问题的能力

    用户1130025
  • 项目测试总结

    未来sky
  • 通过BGP EV**方式动态建立VXLAN隧道实现

    如下图的组网图所示,Router1为企业分支网关,Router2为企业总部网关,由于分支与总部之间用户的业务需求不同,则将其规划为不同网段。企业分支的PC_1与...

    刘銮奕
  • FeignClient注解及参数

    FeignClient注解被@Target(ElementType.TYPE)修饰,表示FeignClient注解的作用目标在接口上

    hbbliyong

扫码关注云+社区

领取腾讯云代金券