SpringBoot开发案例之整合Kafka实现消息队列

前言

最近在做一款秒杀的案例,涉及到了同步锁、数据库锁、分布式锁、进程内队列以及分布式消息队列,这里对SpringBoot集成Kafka实现消息队列做一个简单的记录。

Kafka简介

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
  • 支持通过Kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

术语介绍

  • Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • Partition Partition是物理上的概念,每个Topic包含一个或多个Partition.
  • Producer 负责发布消息到Kafka broker
  • Consumer 消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Kafka安装

Kafka需要依赖JAVA环境运行,如何安装JDK这里不做介绍。

下载kafka:

wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

将包下载到执行目录并解压:

cd /usr/local/
tar -xcvf kafka_2.11-0.10.0.1.tgz 

修改kafka配置文件:

cd kafka_2.11-0.10.0.1/config/
#编辑配置文件
vi server.properties
broker.id=0
#端口号、记得开启端口,云服务器要开放安全组
port=9092
#服务器IP地址,修改为自己的服务器IP
host.name=127.0.0.1
#zookeeper地址和端口, Kafka支持内置的Zookeeper和引用外部的Zookeeper
zookeeper.connect=localhost:2181

分别启动 kafka 和 zookeeper:

./zookeeper-server-start.sh /usr/local/kafka_2.11-0.10.0.1/config/zookeeper.properties &
./kafka-server-start.sh /usr/local/kafka_2.11-0.10.0.1/config/server.properties &

SpringBoot集成

pom.xml引入:

<!--kafka支持-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version><!--$NO-MVN-MAN-VER$-->
</dependency>

application.properties配置:

#kafka相关配置
spring.kafka.bootstrap-servers=192.168.1.180:9092
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

生产者KafkaSender:

/**
 * 生产者
 * @author 科帮网 By https://blog.52itstyle.com
 */
@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息到kafka
     */
    public void sendChannelMess(String channel, String message){
        kafkaTemplate.send(channel,message);
    }
}

消费者:

/**
 * 消费者 spring-kafka 2.0 + 依赖JDK8
 * @author 科帮网 By https://blog.52itstyle.com
 */
@Component
public class KafkaConsumer {
    /**
     * 监听seckill主题,有消息就读取
     * @param message
     */
    @KafkaListener(topics = {"seckill"})
    public void receiveMessage(String message){
        //收到通道的消息之后执行秒杀操作
    }
}

码云下载:从0到1构建分布式秒杀系统

参考

http://kafka.apache.org/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏架构师之旅

一文读懂Hadoop、HBase、Hive、Spark分布式系统架构

机器学习、数据挖掘等各种大数据处理都离不开各种开源分布式系统,hadoop用户分布式存储和map-reduce计算,spark用于分布式机器学习,hive是分布...

71711
来自专栏BeJavaGod

RabbitMQ 一二事 - 简单队列使用

消息队列目前流行的有三种 1. RabbitMQ 2. ActiveMQ 3. Kafka 这三种都非常强大,RabbitMQ目前用的比较多,也比较流行,阿里也...

3195
来自专栏XAI

ActiveMQ介绍

1、ActiveMQ服务器工作模型       通过ActiveMQ消息服务交换消息。消息生产者将消息发送至消息服务,消息消费者则从消息服务接收这些消息。这些消...

2169
来自专栏沈唁志

百度编辑器UEditor使用教程以及Linux系统上传图片502报错的解决方法

2154
来自专栏yukong的小专栏

easyUI自定义iconeasyUI自定义icon

首先我们需要下载好自己需要的标签并且放在一个文件中,然后把这个文件夹复制到easyui所在目录下的themes目录下 如图

763
来自专栏Jerry的SAP技术分享

如何在Ubuntu里安装Helm

Helm是什么?在战网上玩过暗黑破坏神2代的程序员们应该还记得,Helm是国度的意思。

1333
来自专栏无题

消息中间件的设计与实践

也无风雨也无晴 消息中间件对应用的解耦 如登陆系统负责向消息中间件发送消息,而其他的系统则向消息中间件来订阅这个消息,然后完成自己的工作. 通过消息中间件解耦...

4026
来自专栏张善友的专栏

SQL Server 2008 Service Broker

SQL Server Service Broker 为消息和队列应用程序提供 SQL Server 数据库引擎本机支持。这使开发人员可以轻松地创建使用数据库引擎...

1997
来自专栏java一日一条

分布式系统与消息的投递

消息是一个非常有趣的概念,它是由来源发出一个离散的通信单元,被发送给一个或者一群接受者,无论是单体服务还是分布式系统中都有消息的概念,只是这两种系统中传输消息的...

1123
来自专栏向治洪

深刻理解HDFS工作原理

概述 HDFS(Hadoop Distributed File System )Hadoop分布式文件系统的简称。HDFS被设计成适合运行在通用硬件(commo...

23310

扫码关注云+社区