完整教程请订阅专栏教程《rabbitmq/kafka实战教程》https://blog.csdn.net/zpcandzhj/category_10152842.html
Kafka 最初是由Linkedin 即领英公司基于Scala和 Java语言开发的分布式消息发布-订阅系统,现已捐献给Apache软件基金会。Kafka 最被广为人知的是作为一个 消息队列(mq)系统存在,而事实上kafka已然成为一个流行的分布式流处理平台。其具有高吞吐、低延迟的特性,许多大数据处理系统比如storm、spark、flink等都能很好地与之集成。按照Wikipedia上的说法,kafka的核心数据结构本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。总的来讲,kafka通常具有3重角色:
一句话概括:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),在业界主要应用于大数据实时处理领域。
如图所示,kafka的体系结构中通常包含多个Producer(生产者)、多个Consumer(消费者)、多个Broker(Kafka服务器)以及一个zookeeper集群。
体系结构中几个角色
Topic (主题)和Partition(分区)是Kafka 中的两个核心概念。在Kafka 中,消息以topic为单位进行归类。生产者必须将消息发送到指定的topic,即发送到Kafka 集群的每一条消息都必须指定一个主题;消费者消费消息也要指定主题,即消费者负责订阅主题并进行消费。
试想如果一个Topic在Kafka中只对应一个存储文件,那么海量数据场景下这个文件所在机器的I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在Kafka中一个topic可以分为多个分区(partition),每个分区通常以分布式的方式存储在不同的机器上。一个特定的partition只属于一个topic。kafka通常用来处理超大规模数据,因此创建主题时可以立即指定多个分区以提高处理性能。当然也可以创建完成后再修改分区数。同一个主题的不同分区包含的消息是不同的。底层存储上,每一个分区对应一个可追加写的Log文件,消息在被追加到分区Log文件时会分配一个特定的offset (偏移量),offset 是消息在分区中的唯一标识, Kafka 通过offset 来保证消息在分区内的有序性。offset 并不跨越分区,即Kafka 保证的是分区有序而不是全局有序。 下图展示了消息的追加写入:
Kafka中的分区可以分布在不同的服务器( broker )上,因此主题可以通过分区的方式跨越多个broker ,相比单个broker 、单个分区而言并行度增加了,性能提升不少。
在分区之下,Kafka又引入了副本(Replica)的概念。如果说增加分区数实现了水平扩展,增加副本数则是实现了纵向扩展,并提升了容灾能力。同一分区的不同副本中保存的消息是相同的。需要注意的是,在同一时刻,副本之间并非完全一样,因为同步存在延迟。副本之间是一主多从的关系。leader 副本负责处理读写请求, follower 副本只负责与leader 副本进行消息同步。副本存在不同的broker中,当leader 副本出现故障时,从follower 副本中重新选举新的leader 副本对外提供读写服务。Kafka 通过多副本机制实现了故障的自动转移,当Kafka 集群中某个broker 挂掉时,副本机制保证该节点上的partition数据不丢失,仍然能保证kafka服务可用。
下图展示了一个多副本的架构。本例中kafka集群中有4台broker,主题分区数为3,且副本因子也为3。生产者和消费者客户端只和leader副本进行交互,follower副本只负责和leader进行消息同步。每个分区都存在不同的broker中,如果每个broker单独部署一台机器的话,那么不同的Partition及其副本在物理上便是隔离的。
可以认为topic 是逻辑上的概念,partition是物理上的概念,因为每个partition 都对应于一个.log文件存储在kafka 的log目录下,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset(偏移位)。消费者组中的每个消费者,每次消费完数据都会向kafka服务器提交offset,以便出错恢复时从上次的位置继续消费。
消费组(Consumer Group)是Kafka的消费理念中一种特有的概念,每个消费者都属于一个消费组。生产者的消息发布到主题后,只会被投递给订阅该主题的每个消费组中的一个消费者。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;所有的消费者都有一个与之对应的消费者组,即消费者组是逻辑上的一个订阅者。消费者组之间互不影响,多个不同的消费者组可以同时订阅一个Topic,此时消息会同时被每个消费者组中一个消费者消费。
理解上述概念有助于在实际应用中规划topic分区数,消费者数、生产者数。实际生产中,一般分区数和消费者数保持相等,如果这个主题的消费者数大于主题的分区数,那么多出来的消费者将消费不到数据,只能浪费系统资源。
如前文所述,生产者生产的消息会不断追加到log 文件末尾,为防止log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个partition 分为多个segment。每个segment都有对应的.index文件和.log文件以及.timeindex文件。这些文件位于kafka的配置文件server.properties中配置项log.dirs所指定目录下的一个文件夹中,该文件夹的命名规则为:topic名称+分区序号。例如test这个topic 设置了三个分区,则会创建对应的文件夹test-0,test-1,test-2。每个文件下的索引和日志格式文件如下,index和log文件以当前segment 的第一条消息的offset 命名。
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex 00000000000000130610.index 00000000000000130610.log 00000000000000121343.timeindex
日志分段文件对应的两个索引文件主要是用来提高查找消息的速度。偏移量索引建立了消息位移(offset)和物理地址之间的映射关系;时间戳索引则方便根据指定时间戳查找偏移量信息。每写入一定量(kafka配置文件参数log.index.interval.bytes 指定,默认值为4096 ,即4KB )的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。可以通过配置log.index.interval.bytes 的值,增加或减小索引项的密度。
kafka高效读写的原因之一在于使用了顺序写磁盘技术和零拷贝技术。
1)顺序写盘
传统的消息中间件比如RabbitMQ使用内存作为默认的存储介质,而将磁盘作为备选介质,以此实现高吞吐、低延迟的特性。事实上有研究表明,同样的磁盘,顺序写速度能到600MB/s,而随机写只有100K/s。这与磁盘的机械特性有关,顺序写省去了大量磁头寻址时间。因此顺序写磁盘的速度甚至快于随机写内存。因此Kafka 在设计时采用了文件追加的方式来顺序写入消息到磁盘中。此外kafka还充分利用了磁盘页缓存来减少磁盘IO。
2)零拷贝
零拷贝是指将数据直接从磁盘复制到网卡设备中而无需经过应用程序。零拷贝大大提高了应用程序的性能,减少了系统在内核模式和用户模式之间的上下文切换。在netty等框架中也使用了Zero-Copy技术来提升IO性能。在Linux中,零拷贝依赖操作系统底层的sendfile()函数,其实JDK中的FileChannel.transferTo()方法底层实现即依赖sendfile()函数。
举个栗子,比如服务端要将本地文件传递给客户端,两种不同的技术流程分别如下:
传统非零拷贝技术
首先要调用read()系统函数将磁盘中的文件复制到内核态的Read Buffer中,在CPU的控制下,再将内核态数据复制到用户态下。然后调用系统函数write()将用户模式下的数据复制到内核模式下的Socket Buffer中。最后将内核态的Socket Buffer中的数据复制到硬件网卡设备中传输。上述过程中,数据白白地从内核态到用户态“浪”了一圈,即2次复制操作,4次内核态、用户态上下文切换。再来看看零拷贝是如何处理的。
零拷贝技术
零拷贝技术使用操作系统支持的DMA ( Direct Memory Access )技术将文件内容复制到内核态下的ReadBuffer 中。数据没有被复制到Socket Buffer。只有包含数据的位置和长度的信息的文件描述符传送到Socket Buffer中。数据直接从内核态传输到网卡外设中,操作系统的内核、用户态上下文切换只有2次,数据复制也减少了。
约定:本文所有的软件都安装在/usr/local/myapp/下,所有对linux的操作建议使用root用户直接干,避免不必要的麻烦!
官网下载linux系统jdk压缩包,本文使用jdk-8u261-linux-x64.tar.gz,下载到本地后上传到linux服务器。
zoo.cfg
文件如下:
#数据目录
dataDir=/usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir
#单独指定事务日志目录
dataDir=/usr/local/myapp/zookeeper/zookeeper-3.6.1/dataLogDir
#zk对外服务端口
clientPort=2181
#基本时间单位,和时间相关的配置都是该值的倍数;即zk服务器单次心跳间隔时间,单位毫秒
tickTime=2000
#投票选举leader时zk服务器连上leader的时间限制,initLimit*tickTime为总的超时时间
initLimit=10
#zk正常工作时leader和follower之间最多心跳数限制,syncLimit*tickTime为总超时容忍时间,超过此时间,follower将从zk集群中被剔除
syncLimit=5
注意:上述配置中的数据和日志文件路径先要在系统中创建
[root@vm1 conf]# mkdir -p /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir
[root@vm1 conf]# mkdir -p /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataLogDir
其它配置文件参数含义参见官网说明
机器规划:3台服务器,1台leader,2台follower,每台机器都在/etc/hosts中配置好域名ip映射
机器主机名 | 机器IP | 机器角色 |
---|---|---|
vm1 | 192.168.174.129 | leader(master) |
vm2 | 192.168.174.131 | follower(slave) |
vm3 | 192.168.174.130 | follower(slave) |
下列所有的安装都可以先在第一台机器进行,然后再拷贝到集群其它机器中修改。
在vm1上安装完jdk后拷贝到vm2、vm3中,vm1上的安装参照单机jdk安装。
[root@vm1 ~]# scp -r /usr/local/myapp/jdk/jdk1.8.0_261/ root@vm2:/usr/local/myapp/jdk/jdk1.8.0_261/ [root@vm1 ~]# scp -r /usr/local/myapp/jdk/jdk1.8.0_261/ root@vm3:/usr/local/myapp/jdk/jdk1.8.0_261/ [root@vm1 ~]# scp /etc/profile root@vm2:/etc/profile [root@vm1 ~]# scp /etc/profile root@vm3:/etc/profile [root@vm2 ~]# source /etc/profile [root@vm3 ~]# source /etc/profile
在每台机器 上执行java -version 查看jdk版本,确保jdk已正确安装
Zookeeper集群原则上需要2n+1个实例才能保证集群有效性,所以集群规模至少是3台。vm1上的安装参照单机zookeeper安装。
在单机安装的基础上增加下列配置:
/conf/zoo.cfg
文件中增加节点信息
#server.A=B:C:D,A表示服务器编号;B是IP;C是该服务器与leader通信端口;D是leader挂掉后重新选举所用通信端口
server.1=192.168.174.129:2888:3888
server.2=192.168.174.131:2888:3888
server.3=192.168.174.130:2888:3888
至此zookeeper集群模式搭建成功,执行 zkServer.sh stop 命令停掉leader可以发现又会重新选举出leader
按照单机kafka安装配置的步骤先在第一台机器vm1上解压、配置kafka。
使用web管理页面或仪表盘管理kafka更加方便日常维护。
下载可能很慢,耐心等待。
解压到本地指定目录
[root@vm1 ~]# tar -zxvf kafka-eagle-bin-2.0.1.tar.gz
[root@vm1 ~]# cd kafka-eagle-bin-2.0.1/ [root@vm1 kafka-eagle-bin-2.0.1]# tar -zxvf kafka-eagle-web-2.0.1-bin.tar.gz -C /usr/local/myapp/ [root@vm1 ~]# mv /usr/local/myapp/kafka-eagle-web-2.0.1/ /usr/local/myapp/kafka-eagle
环境变量配置
export KE_HOME=/usr/local/myapp/kafka-eagle export PATH= P A T H : PATH: PATH:KE_HOME/bin
source /etc/profile
修改Kafka-Eagle配置文件system-config.properties
# zookeeper和kafka集群配置
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.174.129:2181,192.168.174.130:2181,192.168.174.131:2181
######################################
# kafka eagle webui port
# web页面访问端口号
######################################
kafka.eagle.webui.port=8048
######################################
# kafka jdbc driver address
# kafka默认使用Centos自带的sqlite数据库,配置下数据库文件存放路径即可
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/usr/local/myapp/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
启动kafka-eagle
[root@vm1 bin]# cd /usr/local/myapp/kafka-eagle/bin
[root@vm1 bin]# ./ke.sh start
[root@vm1 bin]# ./ke.sh status
如果出现错误,请查看日志日志是否出问题
/usr/local/myapp/kafka-eagle/logs
如果出现内存不足导致的错误,可以调小ke.sh中设置的JVM内存占用后再次启动
vim kafka-eagle/bin/ke.sh
export KE_JAVA_OPTS=”-server -Xmx256M -Xms256M -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80″
如果没问题,则直接登录
http://192.168.174.129:8048
默认用户名:admin
默认密码:123456
第二种方式是在kafka的bin/kafka-run-class.sh 脚本中加入:JMX_PORT=9966,每个节点依次添加完配置再启动kafka。
WARN [Consumer clientId=consumer-console-consumer-32492-1, groupId=console-consumer-32492] 1 partitions have leader brokers without a matching listener, including [testtopic02-0] (org.apache.kafka.clients.NetworkClient)
重新启动kafka服务又会恢复!
1.brokerId
kafka集群中broker的唯一标识。一般可以从 1开始递增。broker 在启动时会在ZooKeeper 中的/brokers/ids 路径下创建一个以当前brokerId 为名称的虚节点,broker 的健康状态检查就依赖于此虚节点。当broker 下线时,该虚节点会自动删除,其他broker 节点或客户端通过判断/brokers/ids 路径下是否有此broker的brokerld 节点来确定该broker 的健康状态。broker启动后在日志目录log.dirs下会生成meta.properties 文件。如果server.properties和meta.properties配置的broker.id不一致,将启动失败,抛出异常: InconsistentBrokerldException
2.log.dirs
设置Kafka 日志文件存放的根目录,可以以逗号分隔的方式配置多个目录,如果不配置,默认值为:/tmp/kafka-logs
3.listeners
broker对外提供服务的地址,即客户端连接kafka broker的地址。比如listeners=PLAINTEXT://192.168.174.129:9092,客户端就可以使用192.168.174.129:9092这个地址连接kafka,发送或接受消息。还有一个advertised.listeners配置项,用于IaaS环境,比如云服务器通常配备有多块网卡,即包含私网网卡和公网网卡,此时可以设置advertised.listeners 配置项绑定公网IP 供外部客户端使用,而配置listeners 参数来绑定内网IP 地址供broker间通信使用。
4.zookeeper.connect
设置kafka broker 连接的ZooKeeper 集群的服务地址,ZooKeeper 集群中有多个节点时可以使用逗号分隔。
5.message.max.bytes
指定broker 所能接收消息体的最大大小(如果启用压缩的话就是压缩后的最大大小),单位字节。
6.log.retention.hours
日志文件保存时间,默认7天。
更多参数含义参见官方文档:http://kafka.apache.org/documentation/
Kafka Producer发送消息可以采用同步或者异步的方式。
引入Java客户端依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
核心类
KafkaProducer:生产者对象,用来发送数据
ProducerConfig:设置生产者的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord 对象
Producer客户端代码
不带回调函数:KafkaProducer#send(ProducerRecord<K,V>)
public class MyProducer1 {
public static void main(String[] args) throws Exception{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 32);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 20; i++) {
producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" + i));
}
producer.close();
}
}
带回调函数:KafkaProducer#send(ProducerRecord<K,V>, Callback)
public class MyProducer2 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 32);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (null == exception) {
System.out.println("send success->" + metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
回调函数会在producer收到ack时异步调用,该方法有两个参数:RecordMetadata、Exception。这两个参数是互斥的关系,即如果Exception为null,则消息发送成功,此时RecordMetadata必定不为null。消息发送异常时,RecordMetadata为null,而exception不为null。消息发送失败会自动重试,不需在回调函数中手动重试。重试次数由参数retries设定。
KafkaProducer设定参数retries,如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数。
同步是指,一条消息发送后会阻塞当前线程,直至返回ack消息。
有上面异步的例子可以看出,producer的send方法返回对象是Future类型,因此可以通过调用Future对象的get()方法触发同步等待。
因此与异步代码区别只有一处:
producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" + i)).get();
使用分区提高了topic的处理性能,提高了topic的并发性。我们可以将kafka 生产者发送的数据封装成ProducerRecord
对象。ProducerRecord
对象中有partition
属性。每条ProducerRecord
会被发送到特定的partition
中。
在这里插入图片描述
生产者发送消息时的分区原则如下:
(1)构造器中指明 partition 时,直接将指定值作为 partiton值; (2)没有指明partition值但有key的情况下,将key的hash值与topic 的 partition 数进行取余得到 partition 值; (3)既没有指定 partition 值又没有传入 key 值时,第一次调用时随机生成1个整数(后面每次调用在这个整数上自增),并与这个topic的partition数取模得到partition值,即采用round-robin 算法。
kafka采用ack机制保证数据可靠性。topic的每个partition所在的broker收到producer发送的数据后会向producer发送ack消息。如果producer没有收到ack,将会触发重试机制。那么kafka何时给生产者返回ack响应呢?在partition的副本中,有leader和follower之分,如果需要所有的follower都同步完成才发ack,这时因为某种故障,某个follower迟迟不能与leader进行同步,那么就会一直等下去,直至其同步完成才会发送ack,这样势必影响kafka性能。kafka是如何优化这个问题的呢?
在kafka中,一个主题的分区中所有的副本集合称之为AR(Assigned Replicas)。leader节点维护了一个动态的集合,即in-sync replica set (ISR),指的是和leader保持一定程度同步的follower集合(包括leader节点在内)。与leader副本同步滞后过多的副本组成OSR(Out-of-Sync Replicas)集合,因此AR=ISR+OSR。正常情况下所有的follower都应该和leader保持一定程度的同步,即AR=ISR。
kafka的做法是根据用户配置的ack级别来确认何时返回客户端ack消息。
Acks参数用来指定分区中有多少副本收到这条消息之后生产者才能确认这条消息是写入成功的,有3种级别的配置。
acks=1,即默认配置。生产者发送消息后,只要分区的leader副本成功写入消息,就会收到服务器的ack成功响应。如果消息写入leader失败,比如leader已经挂了,正在重新选举,此时生产者客户端会收到错误响应,可以进行重发。如果写入leader成功、follower同步完成之前leader挂了,将会出现消息丢失。所以acks=1是Kafka消息可靠性和吞吐量之间的折中方案,一般也是默认的配置。
acks=0,生产者发送消息之后无需等待任何服务端的响应。如果在消息从发送到写入Kafka 的过程中出现某些异常,导致Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks=0 可以使Kafka达到最大的吞吐量。
acks=-1,和acks=all效果一样。生产者在消息发送之后,需要等待ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的ack响应。在其他配置环境相同的情况下,acks 设置为-1 可以达到最高的数据可靠性。但是如果在follower 同步完成后,broker给生产者客户端发送ack之前,leader发生故障,将会造成数据重复。
将acks级别设置为-1,可以保证producer和kafka server之间不丢失数据,即保证At least once 语义。
对应的,acks级别设置为0,可以保证每条消息只会发送1次,不会重复,即At most once语义。
有些场景下,数据的消费者要求数据既不丢失也不重复,即Exactly Once语义。0.11版本的 Kafka,引入了一项重大特性:幂等性。即不论producer向server发送多少次重复数据,server端只会持久化1条。将producer中的enable.idompotence参数设为true即可开启幂等性。开启幂等性的Producer在初始化的时候会分配一个pid,发往同一个Partition的消息会附带sequence number。Kafka服务端,即broker端会对<pid,partition,seqnumber>做缓存,当具有相同键的消息提交时,broker只会持久化一条。但是PID重启就会变化,并且不同的Partition具有不同的键,所以kafka的幂等性无法保证跨分区、跨会话的Exactly Once。
注意,Kafka的幂等性只是Kafka自身的一种机制,无法保证业务层的幂等。通常我们需要自行实现业务侧的幂等控制。
生产者客户端通过实现接口org.apache.kafka.clients.producer.ProducerInterceptor生成一个生产者拦截器。
Kafka Producer会在消息序列化和计算分区之前调用拦截器的onSend()方法,用户可以在此方法中进行消息发送前的业务定制。一般不修改ProducerRecord的topic、key、partition等信息。Kafka Producer会在消息被应答(ack)之前或者消息发送失败时调用拦截器的 onAcknowledgement()方法,此方法在用户设置的异步CallBack()方法之前执行。onAcknowledgement方法的业务逻辑越简单越好,否则会影响发送性能,因为该方法运行在Producer的I/O线程中。
拦截器案例:定义2个拦截器,并使用它们
/** * 拦截器案例-第1个拦截器在消息体前增加时间戳信息 */
public class ProducerInterceptor1 implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 在消息体前加上时间戳
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "_" + record.value(), record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
/** * 拦截器案例-第2个拦截器统计发送成功和失败的消息 */
public class ProducerInterceptor2 implements ProducerInterceptor<String, String> {
private AtomicInteger successCounter = new AtomicInteger();
private AtomicInteger failCounter = new AtomicInteger();
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter.getAndIncrement();
} else {
failCounter.getAndIncrement();
}
}
@Override
public void close() {
System.out.println("successCounter:" + successCounter.get());
System.out.println("failCounter:" + failCounter.get());
System.out.println("发送成功率:" + successCounter.get() / (failCounter.get() + successCounter.get()));
}
@Override
public void configure(Map<String, ?> configs) {
}
}
/** * 使用自定义生产者拦截器 * Kafka Producer会在消息序列化和计算分区之前调用拦截器的onSend方法,可以在此方法中进行业务定制。一般不修改ProducerRecord的topic、key、partition等信息 * Kafka Producer会在消息被应答(ack)之前或者消息发送失败时调用拦截器的 onAcknowledgement 方法,此方法在用户设置的异步CallBack方法之前执行。 * onAcknowledgement方法的业务逻辑越简单越好,否则会影响发送性能,因为该方法运行在Producer的I/O线程中 */
public class ProducerWithInterceptor {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//构建拦截器链
List<String> interceptors = new ArrayList<>();
interceptors.add(ProducerInterceptor1.class.getName());
interceptors.add(ProducerInterceptor2.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("topic_test", "hello " + i);
kafkaProducer.send(record);
}
//关闭producer,才能触发Interceptor的close方法
kafkaProducer.close();
}
}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
核心类:
KafkaConsumer:消费者对象,用来消费数据
ConsumerConfig:设置消费者的一系列配置参数
KafkaConsumer#poll(Duration):消费者客户端核心方法,即从服务端拉取(poll)消息。后面章节将展示不同位移(offset)提交方式下如何编写消费者客户端API代码。
消费者客户端可以指定消费某个主题的特定分区,KafkaConsumer
中的assign(Collection<TopicPartition> partitions)
方法可以指定需要订阅的分区集合。也可以在配置文件中配置partition.assignment.strategy配置项指定自定义分区策略。
默认不指定的情况下,kafka使用内置的分配策略。即Range(默认)和Robin。当然还有一种更为复杂的StickyAssignor分配策略。
自动提交offset
自动位移提交(commit)的动作是在poll()方法里完成的,每次向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。自动提交消费位移的方式非常简便,免去了复杂的位移提交逻辑,使得应用层代码非常简洁。如果在下一次自动提交消费位移之前,消费者宕机了,那么又得从上一次位移提交的地方重新开始消费,这将导致重复消费。可以减小位移提交的时间间隔来减小消息重复的时间窗口,但是这会使移提交更加频繁。
代码:
/** * 自动提交offset */
public class MyConsumer1 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交offset,每1s提交一次
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"));
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
}
}
}
手动提交offset
/** * 手动同步提交offset */
public class MyConsumer2 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//关闭自动提交offset,手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"));
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
//同步提交,线程会阻塞,直到当前批次offset提交成功
kafkaConsumer.commitSync();
}
}
}
/** * 手动异步提交offset */
public class MyConsumer3 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//关闭自动提交offset,手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"));
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
//异步提交,可以带回调函数,线程不会阻塞
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.println("提交失败:" + offsets);
}
}
});
}
}
}
自定义offset
在Kafka中,offset默认存储在broker的内置Topic中,我们可以自定义存储位置。比如为了保证消费和提交偏移量同时成功或失败,我们可以利用数据库事务来实现,把offset存储在Mysql即可。下面的例子仅为示例代码,其中getOffset和commitOffset方法可以根据所选的offset存储系统(比如mysql)自行实现。
/** * 自定义offset提交 * 在Kafka中,offset默认存储在broker的内置Topic中,我们可以自定义存储位置 * 比如为了保证消费和提交偏移量同时成功或失败,我们可以利用数据库事务来实现,把offset存储在Mysql即可 * 下面的例子仅为示例代码,其中getOffset和commitOffset方法可以根据所选的offset存储系统(比如mysql)自行实现 */
public class MyConsumer4 {
public static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//关闭自动提交offset,手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"), new ConsumerRebalanceListener() {
//该方法会在Rebalanced之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
//该方法会在Rebalanced之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
//定位到每个分区最近提交的offset位置继续消费
kafkaConsumer.seek(partition, getOffset(partition));
}
}
});
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
});
//提交offset
commitOffset(currentOffset);
}
}
/** * 获取某分区最新的offset * * @param partition * @return */
private static long getOffset(TopicPartition partition) {
return 0;
}
/** * 提交该消费者所有分区的offset * * @param currentOffset */
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
}
}
Kafka consumer在poll()方法返回之前会先调用拦截器的onConsume()方法,可以在此方法里预先对消息进行定制化操作。Kafka consumer在提交完消费位移之后会调用拦截器的onCommit()方法。
拦截器案例:定义1个消费者拦截器并使用
/** * 自定义消费者拦截器 * Kafka consumer在poll()方法返回之前会先调用拦截器的onConsume方法,可以在此方法里预先对消息进行定制化操作 * Kafka consumer在提交完消费位移之后会调用拦截器的onCommit方法, * 本拦截器功能: * 对消息的时间戳进行判断,过滤掉不满足时效(过期)的消息 * 消费完成后打印位移信息 */
public class ConsumerInterceptor1 implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
HashMap<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>(64);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> recordsInPartition = records.records(partition);
List<ConsumerRecord<String, String>> filteredRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : recordsInPartition) {
if (System.currentTimeMillis() - record.timestamp() < EXPIRE_INTERVAL) {
filteredRecords.add(record);
}
}
if (!filteredRecords.isEmpty()) {
newRecords.put(partition, filteredRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> {
System.out.println(tp + ":" + offset.offset());
});
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
/** * 使用自定义消费者拦截器 */
public class ConsumerWithInterceptor1 {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092");
//group.id相同的属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交offset,每1s提交一次
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//使用自定义消费者拦截器
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor1.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("topic_test"));
//消费者启动死循环不断消费
while (true) {
//一旦拉取到数据就返回,否则最多等待duration设定的时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, key = %d, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.timestamp(), record.value());
});
}
}
}
/** * 测试ConsumerWithInterceptor中时间戳过滤是否生效 * 可以在发送消息时手动修改Producer Record的timestamp */
public class Producer4ConsumerWithInterceptor1 {
private static final long EXPIRE_INTERVAL = 10000;
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record;
if (i % 2 == 0) {
record = new ProducerRecord<>("topic_test", "hello " + i);
} else {
//奇数序号消息时间戳向前调整,测试消费者拦截器是否能过滤掉
record = new ProducerRecord<>("topic_test", null, System.currentTimeMillis() - EXPIRE_INTERVAL, null, "hello " + i);
}
kafkaProducer.send(record);
}
//关闭producer,才能触发Interceptor的close方法
kafkaProducer.close();
}
}
详细内容请订阅专栏教程《rabbitmq/kafka实战教程》https://blog.csdn.net/zpcandzhj/category_10152842.html
详细内容请订阅专栏教程《rabbitmq/kafka实战教程》https://blog.csdn.net/zpcandzhj/category_10152842.html
完整内容请订阅专栏教程《rabbitmq/kafka实战教程》https://blog.csdn.net/zpcandzhj/category_10152842.html
欢迎关注公众号【程猿薇茑】获取各种教程。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/181192.html原文链接:https://javaforall.cn