消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列 。来看一下下面的代码
// 1.创建一个保存字符串的队列
Queue<String> queue = new LinkedList<>();
// 2. 往消息队列中放入消息
queue.offer("hello");
// 3. 从消息队列中取出消息把那个打印
System.out.println(queue.poll());
上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的
「小结」: 「消息队列指的就是将数据放置到一个队列中, 从队列一端进入, 然后从另一端流出的过程」
消息队列在实际应用中包括如下四个场景:
「1、应用耦合:」
多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
「2、异步处理:」
多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
「3、 限流削峰:」
广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
「4、消息驱动的系统:」
系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理 下面详细介绍上述四个场景以及消息队列如何在上述四个场景中使用
「具体场景」:用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信。对这两个操作的处理方式有两种:串行及并行。1 ) 串行方式: 新注册信息生成后 , 先发送注册邮件, 再发送验证短信 注意 : 在这种方式下,需要最终发送验证短信后再返回给客户端
2) 并行处理:新注册信息写入后,由发短信和发邮件并行处理
「注意」: 在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。
假设以上三个子系统处理的时间均为 50ms ,且不考虑网络延迟,则总的处理时间:串行:50+50+50=150ms 并行:50+50 = 100ms 如果引入消息队列 , 在来看整体的执行效率 :
在写入消息队列后立即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间本身是可以很快的,基本可以忽略不计,因此总的处理时间相比串行提高了2倍,相比并行提高了一倍;
「具体场景:」 用户使用 QQ 相册上传一张图片,人脸识别系统会对该图片进行人脸识别,一般的做法是,服务器接收到图片后,图片上传系统立即调用人脸识别系统,调用完成后再返回成功,如下图所示: 如果引入消息队列 , 在来看整体的执行效率
该方法有如下缺点:
此时图片上传系统并不需要关心人脸识别系统是否对这些图片信息的处理、以及何时对这些图片信息进行处理。事实上,由于用户并不需要立即知道人脸识别结果,人脸识别系统可以选择不同的调度策略,按照闲时、忙时、正常时 间,对队列中的图片信息进行处理。
「具体场景:」 购物网站开展秒杀活动,一般由于瞬时访问量过大,服务器接收过大,会导致流量暴增,相关系统无法处理请求甚至崩溃。而加入消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲。
该方法有如下优点:请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲 , 极大地减少了业务处理系统的压力;队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;
「具体场景:」 用户新上传了一批照片, 人脸识别系统需要对这个用户的所有照片进行聚类,聚类完成后由对账系统重新生成用 户的人脸索引( 加快查询 ) 。这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理。
该方法有如下优点:避免了直接调用下一个系统导致当前系统失败;每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间 段按不同处理速度处理;
点对点模式下包括三个角色
消息发送者生产消息发送到 queue 中,然后消息接收者从 queue 中取出并且消费消息。消息被消费以后, queue 中不再有存储,所以消息接收者不可能消费到已经被消费的消息。
「点对点模式特点:」
发布 / 订阅模式下包括三个角色:
发布者将消息发送到 Topic, 系统将这些消息传递给多个订阅者。发布 / 订阅模式特点:
各种消息队列产品的对比图:
官网:「http://kafka.apache.org/」 kafka 是最初由 linkedin 公司开发的,使用 scala 语言编写, kafka 是一个分布式,分区的,多副本的,多订阅者的日 志系统(分布式MQ 系统),可以用于搜索日志,监控日志,访问日志等 Kafka is a distributed,partitioned,replicated commit logservice 。它提供了类似于 JMS 的特性,但是在设计实现上完全不同,此外它并不是JMS 规范的完整实现。kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer, 消息 接受者成为Consumer, 此外 kafka 集群有多个 kafka 实例组成,每个实例 (server) 成为 broker 。无论是 kafka 集群,还是producer和 consumer 都依赖于 zookeeper 来保证系统可用性集群保存一些 meta 信息
「kakfa的特点:」
apache kafka 是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将消息从一个 端点传递到另一个端点,kafka 适合离线和在线消息消费。kafka 消息保留在磁盘上,并在集群内复制以防止数据丢失。kafka构建在 zookeeper 同步服务之上。它与 apache 和 spark 非常好的集成,应用于实时流式数据分析。
「kafka的主要应用场景:」
「版本说明:」Kafka版本为2.4.1,是2020年3月12日发布的版本。可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。
「小结:」
「专业术语」
cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
cd /export/server/kafka_2.12-2.4.1/
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
# 指定broker的id
broker.id=0
# 指定 kafka的绑定监听的地址
listeners=PLAINTEXT://node1:9092
# 指定Kafka数据的位置
log.dirs=/export/server/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=node1:2181,node2:2181,node3:2181
cd /export/server
scp -r kafka_2.12-2.4.1/ node2:$PWD
scp -r kafka_2.12-2.4.1/ node3:$PWD
修改另外两个节点的broker.id分别为1和2
---------node2--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=1
listeners=PLAINTEXT://node2:9092
--------node3--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=2
listeners=PLAINTEXT://node3:9092
# 启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
cd /export/server/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
# 测试Kafka集群是否启动成功 :
使用 jps 查看各个节点 是否出现有kafka
为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。
cd /export/onekey
node1
node2
node3
vim start-kafka.sh
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
}&
wait
done
vim stop-kafka.sh
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
}&
wait
done
chmod u+x start-kafka.sh
chmod u+x stop-kafka.sh
./start-kafka.sh
./stop-kafka.sh
创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。
# 创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server node1:9092 --topic test
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server node1:9092
使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。
bin/kafka-console-producer.sh --broker-list node1:9092 --topic test
使用下面的命令来消费 test 主题中的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test
查看 kafka 当中存在的主题
bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181
运行 describe 查看 topic 的相关详细信息
[root@node01 bin]# ./kafka-topics.sh --describe --zookeeper node01:2181 --topic demo
Topic:demo PartitionCount:3 ReplicationFactor:1 Configs:
Topic: demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: demo Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: demo Partition: 2 Leader: 2 Replicas: 2 Isr: 2
任意 kafka 服务器执行以下命令可以增加 topic 分区数
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
目前删除 topic 在默认情况下知识打上一个删除的标记,在重新启动 kafka 后才删除。如果需要立即删除,则需要 在server.properties 中配置:
delete.topic.enable=true
然后执行以下命令进行删除 topic
bin/kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.1.0</version>
</dependency>
编写Java程序,将1-100的数字消息写入到Kafka中
生产者代码1: 使用默认异步发生数据方式, 不含回调函数
package com.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
// kafka的生产者的代码:
public class KafkaProducerTest {
public static void main(String[] args) {
//1.1: 构建生产者的配置信息:
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
props.put("acks", "all"); // 消息确认机制: all表示 必须等待kafka端所有的副本全部接受到数据 确保数据不丢失
// 说明: 在数据发送的时候, 可以发送键值对的, 此处是用来定义k v的序列化的类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//1. 创建 kafka的生产者对象: KafkaProducer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//2. 执行数据的发送
for (int i = 0; i < 100; i++) {
// producerRecord对象: 生产者的数据承载对象
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>("dsjlg", Integer.toString(i));
producer.send(producerRecord);
}
//3. 释放资源
producer.close();
}
}
「生产者的代码2: 同步发送操作」
package com.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
// kafka的生产者的代码:
public class KafkaProducerTest2 {
@SuppressWarnings("all")
public static void main(String[] args) {
//1.1: 构建生产者的配置信息:
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all"); // 消息确认机制: all表示 必须等待kafka端所有的副本全部接受到数据 确保数据不丢失
// 说明: 在数据发送的时候, 可以发送键值对的, 此处是用来定义k v的序列化的类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//1. 创建 kafka的生产者对象: KafkaProducer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//2. 执行数据的发送
for (int i = 0; i < 100; i++) {
// producerRecord对象: 生产者的数据承载对象
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>("dsjlg", Integer.toString(i));
try {
producer.send(producerRecord).get(); // get方法, 表示是同步发送数据的方式
} catch (Exception e) {
// 如果发生操作, 出现了异常, 认为, 数据发生失败了 ....
e.printStackTrace();
}
}
//3. 释放资源
producer.close();
}
}
「生产者代码3: 异步发生数据, 带有回调函数操作」
package com.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
// kafka的生产者的代码:
public class KafkaProducerTest {
public static void main(String[] args) {
//1.1: 构建生产者的配置信息:
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all"); // 消息确认机制: all表示 必须等待kafka端所有的副本全部接受到数据 确保数据不丢失
// 说明: 在数据发送的时候, 可以发送键值对的, 此处是用来定义k v的序列化的类型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//1. 创建 kafka的生产者对象: KafkaProducer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//2. 执行数据的发送
for (int i = 0; i < 100; i++) {
// producerRecord对象: 生产者的数据承载对象
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>("dsjlg", Integer.toString(i));
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
// 在参数2中, 表示发生的状态异常, 如果 异常为null 表示数据以及发送成功, 如果不为null, 表示数据没有发送成功
if(exception != null){
// 执行数据发生失败的后措施...
}
}
}); // 异步发送方式
}
//3. 释放资源
producer.close();
}
}
「消费者代码1: 自动提交偏移量数据」
package com.it.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
// kafka的消费者的代码
public class KafkaConsumerTest {
public static void main(String[] args) {
//1.1: 指定消费者的配置信息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "test"); // 消费者组的名称
props.setProperty("enable.auto.commit", "true"); // 消费者自定提交消费偏移量信息给kafka
props.setProperty("auto.commit.interval.ms", "1000"); // 每次自动提交偏移量时间间隔 1s一次
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//1. 创建kafka的消费者核心类对象: KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//2. 让当前这个消费, 去监听那个topic?
consumer.subscribe(Arrays.asList("dsjlg")); // 一个消费者 可以同时监听多个topic的操作
while (true) { // 一致监听
//3. 从topic中 获取数据操作: 参数表示意思, 如果队列中没有数据, 最长等待多长时间
// 如果超时后, topic中依然没有数据, 此时返回空的 records(空对象)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
//4. 遍历ConsumerRecords, 从中获取消息数据
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
System.out.println("接收到消息为:"+value);
}
}
}
}
「消费者代码2: 手动提交偏移量数据」
package com.it.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
// kafka的消费者的代码
public class KafkaConsumerTest2 {
public static void main(String[] args) {
//1.1 定义消费者的配置信息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "test01"); // 消费者组的名称
props.setProperty("enable.auto.commit", "false"); // 消费者自定提交消费偏移量信息给kafka
//props.setProperty("auto.commit.interval.ms", "1000"); // 每次自动提交偏移量时间间隔 1s一次
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//1. 创建消费者的核心类对象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
//2. 指定要监听的topic
consumer.subscribe(Arrays.asList("dsjlg"));
//3. 获取数据
while(true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
String value = consumerRecord.value();
// 执行消费数据操作
System.out.println("数据为:"+ value);
// 当执行完成后, 认为消息已经消费完成
consumer.commitAsync(); // 手动提交偏移量信息
}
}
}
}
主要解决了单台服务器存储容量有限的问题 当数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服 务器上的数据,叫做一个分片
副本备份机制解决了 「数据存储的高可用」 问题 当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。
分片: 解决单台节点存储容量有限的问题, 通过分片进行分布式存储方案 副本: 保证数据不丢失, 提升数据可用性
1) 消息生产分为同步模式和异步模式 2) 消息确认分为三个状态
3) 在同步模式下
4) 在异步模式下
如果broker迟迟不给ack,而Buffer又满了。开发者可以设置是否直接清空Buffer中的数据。
「broker端:」
「消费端:」 通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。 而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于offset的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
「生产者端」
「broker端」
broker端主要是通过数据的副本和 ack为-1 来保证数据不丢失操作
「消费端」
消息发送端发送消息到 broker 上以后,消息是如何持久化的?
「数据分片」 kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>
。 比如创建一个名为firstTopic的topic,其中有3个partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3 多个分区在集群中多个broker上的分配方法
「log分段:」
每个分片目录中,kafka 通过分段的方式将 数据 分为多个 LogSegment,一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index),其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。每个LogSegment 的大小可以在server.properties 中log.segment.bytes=107370 (设置分段大小,默认是1gb)选项进行设置。
segment 的 index file 和 data file 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件.命名规则:partion 全局的第一个 segment从 0 开始,后续每个 segment 文件名为上一个 segment文件最后一条消息的 offset 值进行递增。数值最大为 64 位long 大小,20 位数字字符长度,没有数字用 0 填充 第一个 log 文件的最后一个 offset 为:5376,所以下一个segment 的文件命名为: 0000000000000005376.log。对应的 index 为 00000000000000005376.index kafka 这种分片和分段策略,避免了数据量过大时,数据文件文件无限扩张带来的隐患,更有助于消息文件的维护以及被消费的消息的清理。
可以通过kafka自带这条命令可以看到 kafka 消息日志的内容
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
输出结果为:
offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376
可以看到一条消息,会包含很多的字段,如下:
offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371
各字段的意义:
offset:记录号 ;
position:偏移量;
createTime 创建时间、
keysize 和 valuesize 表示 key 和 value 的大小
compresscodec 表示压缩编码
payload:表示消息的具体内容
为了提高查找消息的性能,kafka为每一个日志文件添加 了2 个索引文件:OffsetIndex
和 TimeIndex
,分别对应*.index以及*.timeindex, *.TimeIndex 是映射时间戳和相对 offset的文件
「查 看 索 引 内 容 命令:」
sh kafka-run-class.shkafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
「索引文件和日志文件内容关系如下」
如上图所示,index 文件中存储了索引以及物理偏移量。log 文件存储了消息的内容。索引文件中保存了部分offset和偏移量position的对应关系。比如 index文件中 [4053,80899],表示在 log 文件中,对应的是第 4053 条记录,物理偏移量(position)为 80899.
比如说,我们要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490。最后查找到对应的消息以后返回。
日志的清理策略有两个
「日志压缩策略」 Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value 值。日志的压缩原理如下图:
「顺序写」 我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。
「零拷贝」 即使采用顺序写,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka还有一个性能策略:零拷贝 消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。如下:
这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有 进行变化,仅仅是从磁盘复制到网卡缓冲区。通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
Kafka生产者在分发数据时(多分区),一般是怎么发送数据呢?要想得到答案,我们不妨通过源码找到,Kafka默认使用DefaultPartitioner.class的分发策略,下面为源码的注释,让我们一起来解读一下:
/**
The default partitioning strategy:
<ul>
<li>If a partition is specified in the record, use it
<li>If no partition is specified but a key is present choose a partition based on a hash of the key
<li>If no partition or key is present choose a partition in a round-robin fashion
*/
其大致意思就是,如果消息指定了分区号,就按指定的分区号;如果没有指定分区号,就取key的哈希值对应的分区;如果既没有指定分区号,也没有key值,就采用轮询的方式,例如有两个个分区,上一条数据分发到了第一个分区,这条数据就会分发到第二个分区,下一条又分发给第一个分区…就这样重复轮询。
Kafka为我们提供了以下策略:
//可根据主题和内容发送
public ProducerRecord(String topic, V value)
//根据主题,key、内容发送
public ProducerRecord(String topic, K key, V value)
//根据主题、分区、key、内容发送
public ProducerRecord(String topic, Integer partition, K key, V value)
//根据主题、分区、时间戳、key,内容发送
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
除上述以外,我们还可以自定义分区,只要实现Partitioner接口的方法即可,使用自定义分区策略之前,需要设置partition.class属性为自定义分区策略的全路径类名。
public class MyPartition implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
Kafka消费者消费数据的速度是非常快的,但如果由于处理Kafka消息时,由于有一些外部IO、或者是产生网络拥堵,就会造成Kafka中的数据积压(或称为数据堆积)。如果数据一直积压,会导致数据出来的实时性受到较大影响。
「出现积压的原因:」
「解决方案:」
END