前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >SpringBoot 整合 Kafka 实现千万级数据异步处理,实战介绍!

SpringBoot 整合 Kafka 实现千万级数据异步处理,实战介绍!

作者头像
Java极客技术
发布于 2023-02-23 09:54:37
发布于 2023-02-23 09:54:37
8.8K00
代码可运行
举报
文章被收录于专栏:Java极客技术Java极客技术
运行总次数:0
代码可运行

一、介绍

在之前的文章中,我们详细的介绍了 kafka 的架构模型,在集群环境中,kafka 可以通过设置分区数来加快数据的消费速度。

光知道理论可不行,我们得真真切切的实践起来才行!

下面,我将结合生产环境的真实案例,以SpringBoot技术框架为基础,向大家介绍 kafka 的使用以及如何实现数据高吞吐!

二、代码实践

最近,公司大数据团队每天凌晨会将客户的订单数据进行统计计算,然后把业绩数据推送给我们,以便销售人员每天能看到昨天的业绩数据,数据的体量大约在 1000 多万条,以下是我对接的过程!

2.1、添加 kafka 依赖包

本次项目的SpringBoot版本为2.1.5.RELEASE,依赖的 kafka 的版本为2.2.6.RELEASE

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
2.2、添加 kafka 配置变量

当添加完了依赖包之后,我们只需要在application.properties中添加 kafka 配置变量,基本上就可以正常使用了。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=197.168.25.196:9092
#重试次数
spring.kafka.producer.retries=3
#批量发送的消息数量
spring.kafka.producer.batch-size=1000
#32MB的批处理缓冲区
spring.kafka.producer.buffer-memory=33554432
#默认消费者组
spring.kafka.consumer.group-id=crm-user-service
#最早未被消费的offset
spring.kafka.consumer.auto-offset-reset=earliest
#批量一次最大拉取数据量
spring.kafka.consumer.max-poll-records=4000
#是否自动提交
spring.kafka.consumer.enable-auto-commit=true
#自动提交时间间隔,单位ms
spring.kafka.consumer.auto-commit-interval=1000
2.3、创建一个消费者
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class BigDataTopicListener {

    private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);

    /**
     * 监听kafka数据
     * @param consumerRecords
     * @param ack
     */
    @KafkaListener(topics = {"big_data_topic"})
    public void consumer(ConsumerRecord<?, ?> consumerRecord) {
        log.info("收到bigData推送的数据'{}'", consumerRecord.toString());
        //...
        //db.save(consumerRecord);//插入或者更新数据
    }

}
2.4、模拟对方推送数据测试
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void testSend(){
        for (int i = 0; i < 5000; i++) {
            Map<String, Object> map = new LinkedHashMap<>();
            map.put("datekey", 20210610);
            map.put("userid", i);
            map.put("salaryAmount", i);
            //向kafka的big_data_topic主题推送数据
            kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
        }
    }
}

起初,通过这种单条数据消费方式,进行测试程序没太大毛病!

但是,当上到生产之后,发现一个很大的问题,就是消费 1000 万条数据,至少需要3个小时,结果导致数据看板一直没数据。

第二天痛定思痛,决定改成批量消费模型,怎么操作呢,请看下面!

2.5、将 kafka 的消费模式改成批量消费

首先,创建一个KafkaConfiguration配置类,内容如下!

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.batch.concurrency}")
    private Integer batchConcurrency;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;


    /**
     *  生产者配置信息
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     *  生产者工厂
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     *  生产者模板
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    /**
     *  消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     *  消费者批量工厂
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //设置并发量,小于或等于Topic的分区数
        factory.setConcurrency(batchConcurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

}

同时,新增一个spring.kafka.consumer.batch.concurrency变量,用来设置并发数,通过这个参数我们可以指定几个线程来实现消费。

application.properties配置文件中,添加如下变量

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#批消费并发量,小于或等于Topic的分区数
spring.kafka.consumer.batch.concurrency = 3

#设置每次批量拉取的最大数量为4000
spring.kafka.consumer.max-poll-records=4000

#设置自动提交改成false
spring.kafka.consumer.enable-auto-commit=false

最后,将单个消费方法改成批量消费方法模式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class BigDataTopicListener {

    private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);

    /**
     * 监听kafka数据(批量消费)
     * @param consumerRecords
     * @param ack
     */
    @KafkaListener(topics = {"big_data_topic"}, containerFactory = "batchFactory")
    public void batchConsumer(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {
        long start = System.currentTimeMillis();

        //...
        //db.batchSave(consumerRecords);//批量插入或者批量更新数据

        //手动提交
        ack.acknowledge();
        log.info("收到bigData推送的数据,拉取数据量:{},消费时间:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start));
    }

}

此时,消费性能大大的提升,数据处理的非常快,500万条数据,最多 30 分钟就全部消费完毕了。

本例中的消费微服务,生产环境部署了3台服务器,同时big_data_topic主题的分区数为3,因此并发数设置为3比较合适。

随着推送的数据量不断增加,如果你觉得消费速度还不够,你可以重新设置每次批量拉取的最大数量,活着横向扩展微服务的集群实例数量和 topic 的分区数,以此来加快数据的消费速度。

但是,如果在单台机器中,每次批量拉取的最大数量过大,大对象也会很大,会造成频繁的 gc 告警!

因此,在实际的使用过程中,每次批量拉取的最大数量并不是越大越好,根据当前服务器的硬件配置,调节到合适的阀值,才是最优的选择!

三、小结

本文主要以SpringBoot技术框架为背景,结合实际业务需求,采用 kafka 进行数据消费,实现数据量的高吞吐,在下篇文章中,我们会介绍消费失败的处理流程。

由于笔者才疏学浅,难免会有理解不到位的地方,欢迎网友批评指出!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java极客技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
在同一台电脑上同时安装Python2和Python3
目前Python的两个版本Python2和Python3同时存在,且这两个版本同时在更新与维护。
全栈程序员站长
2022/07/07
1.2K0
在同一台电脑上同时安装Python2和Python3
python2.x和python3.x共
   下载完成之后,解压到python3所在的安装目录,用CMD控制台进入解压目录,输入:
py3study
2020/01/08
5200
怎么同时 安装python3和python2
Windows 下python3和python2 我们该怎么同时安装python3跟python2呢
程序员小新
2021/12/23
7130
“Python2与Python3共存”,会带来一些什么实质性的改变?
最近被一个问题困扰了很久,就是Python2和Python3共存pip的使用问题,本来是一个很简单的问题。但是我的Python2的pip就是用不了。困扰了我两天的时间。最后发现是我在Python3中换源导致的。如果大家在电脑中也安装了Python2和Python3两个版本的话。不建议在Python3中设置永久换源的配置文件,因为这样可能导致Python2的pip用不了。
汤贤
2020/05/18
9930
“Python2与Python3共存”,会带来一些什么实质性的改变?
Python双版本Windows安装攻略
本文将详细介绍 Python 2.x 和 Python 3.x 在 Windows 系统下的安装过程,并对环境变量的配置进行说明。
程序员NEO
2025/04/09
1070
Python双版本Windows安装攻略
Windows下同时安装python2、python3和pip2、pip3设置
打开,控制面板\系统和安全\系统,选择高级系统设置,环境变量,选择Path,点击编辑,新建,分别添加D:\Python\python27和D:\Python\python27\Scripts到环境变量。
周小董
2019/03/25
3.7K0
Windows下同时安装python2、python3和pip2、pip3设置
Win10下python3和python2多版本同时安装并解决pip共存问题
特别说明,本文是在Windows64位系统下进行的,32位系统请下载相应版本的安装包,安装方法类似。
拓荒者
2019/03/16
7.3K0
python3和python2共存
特别说明,本文是在Windows64位系统下进行的,32位系统请下载相应版本的安装包,安装方法类似。
似水的流年
2018/01/14
1.3K0
Python入门之Python在Win10环境下的配置(图文教程)
请在Python官网下载Python2.7和Python3.6安装包,虽然最新的是3.6版本,但是建议两个包都安装,方便后期在IDE工具切换。 Python官网:https://www.python.
Jetpropelledsnake21
2018/05/02
7540
Python入门之Python在Win10环境下的配置(图文教程)
windows下多个python版本共存
    了解python的人都知道python有2.x版本和3.x版本,而python3.x版本不向下兼容,但是根据具体的需要,有时候要2.x和3.x共存,python共存本身没有问题,只是需要设置一些环境变量和修改一些东西来让它更容易使用。本文主要说明的是windows环境
py3study
2020/01/06
5.5K0
windows下多个python版本共存
Python-Opencv的安装
Windows下安装python2和python3双版本 Python2的Opencv的安装 Python整出的向下不兼容简直是败笔,无奈,还是要处理。 一直都在使用Python2,后来爬虫为了和team统一,使用上Python3。 现在上手图像处理的一下学习,不想用C++,Opencv有Python的接口,还是Python2成熟点,为了后续的出现问题少,还是用Python2吧。
一点儿也不潇洒
2018/08/07
2.3K0
Python-Opencv的安装
python2 手动安装更新pip
现在对于python2版本,直接修改python.exe名字为python2.exe,命令python2 -m pip install –upgrade pip –force-reinstall可能不能够成功更新pip2,这时候需要检查安装文件夹下是否有Scripts文件夹和其完整性。
全栈程序员站长
2022/07/07
1.2K0
python2 手动安装更新pip
window 下python2.7与py
(1)在Path环境变量中检查以下4个变量(Path中的环境变量是以分号隔开的): 1.c:\Python27 2.c:\Python27\Scripts 3.c:\Python35 4.c:\Python35\Scripts
py3study
2020/01/13
5190
Python win10下同时安装python3,python2
安装很简单,只要打开进行下一步安装即可。 在安装的路径上我做了一下小修改,不过大家可以根据自己电脑具体情况修改即可。 我的安装路径则是如下: python2 : D:\Python27 python3 : D:\Python37-32
Devops海洋的渔夫
2019/05/31
7330
Python开发环境搭建1.下载及安装2.一台PC安装多个python版本3.python2和python3版本共存的配置4. pip安装模块时执行的命令5.给python安装第三方模块6.查看pip
现在python3是趋势,很多公司已经逐渐使用python3,但是对于爬虫来说,我们现在仍需用2.7,所以现在我们安装Python2.7.9版本
Python攻城狮
2018/08/23
8270
Python开发环境搭建1.下载及安装2.一台PC安装多个python版本3.python2和python3版本共存的配置4. pip安装模块时执行的命令5.给python安装第三方模块6.查看pip
selenium win7+selenium2.0+python环境搭建
担心最新版的支持不太好,这里我下载的是python 2.7(selenium之前不支持python3.x)
授客
2019/09/12
1.3K0
教你们如何切换Python2与Python3 草稿箱
当同时安装Python2和Python3后,如何兼容并切换使用详解(比如pip使用) 由于历史原因,[Python](https://so.csdn.net/so/search?from=pc_blo
程序员鑫港
2021/12/25
6300
robotframework 学习(1) : 环境搭建,RIDE 快捷方式创建
下载地址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Mokwing
2020/09/08
1.7K1
python2和python3的共存
1.安装过程中可以手动选择安装路径,本文中的安装路径为"D:\python2", "D:\python3"。
似水的流年
2021/03/23
3.9K0
Selenium win7+selenium2.0+python+JetBrains PyCharm环境搭建
担心最新版的支持不太好,这里我下载的是python 2.7(selenium之前不支持python3.x)
授客
2019/09/11
6190
Selenium win7+selenium2.0+python+JetBrains PyCharm环境搭建
推荐阅读
相关推荐
在同一台电脑上同时安装Python2和Python3
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档