Kafka的Producer、Broker和Consumer之间采用的是一套自行设计的基于TCP层的协议,Kafka的这套协议完全是为了Kafka自身的业务需求而定制的;
Kafka系统采用的是Reactor多线程模型,即通过一个Acceptor线程处理所有的新连接,通过多个Processor线程对请求进行处理(比如解析协议、封装请求、、转发等);
kafka老的版本中,以NIO作为网络通信的基础,通过将多个Socket连接注册到一个Selector上进行监听,只用一个线程就能管理多个连接,这极大的节省了多线程的资源开销;
在Kafka之后的新版本中,依然以NIO作为网络通信的基础,也使用了Reactor多线程模型,不同的是,新版本将具体的业务处理模块(Handler模块)独立出去了,并用单独的线程池进行控制;
新增Handler模块优点在于:
Kafka中两个角色之间通信的基本单位是Request/Response:
RequestOrResponse => MessageSize (RequestMessage | ResponseMessage) | ||
---|---|---|
名称 | 类型 | 描述 |
MessageSize | int32 | 表示RequestMessage或者ResponseMessage的长度; |
RequestMessage/ResponseMessage | - | 表示Request或者Response的内容,在下面将会介绍其具体格式; |
定义了通信双方交换数据的基本结构,通信的过程能够简单地表示为:客户端打开与服务器端的Socket,而后往Socket写入一个int32的数字表示此次发送的Request有多少字节,而后继续往Socket中写入对应字节数的数据。服务器端先读出一个int32的整数从而获取此次Request的大小,而后读取对应字节数的数据从而获得Request的具体内容。服务器端处理了请求后,也用一样的方式来发送响应; |
RequestMessage => ApiKey ApiVersion CorrelationId ClientId Request | ||
---|---|---|
名称 | 类型 | 描述 |
ApiKey | int16 | 表示此次请求的API编号 |
ApiVersion | int16 | 表示请求的API的版本,有了版本后就能够作到后向兼容 |
CorrelationId | int32 | 由客户端指定的一个数字惟一标示此次请求的id,服务器端在处理完请求后也会把一样的CorrelationId写到Response中,这样客户端就能把某个请求和响应对应起来了 |
ClientId | string | 客户端指定的用来描述客户端的字符串,会被用来记录日志和监控,它惟一标示一个客户端 |
Request | - | Request的具体内容 |
ResponseMessage => CorrelationId Response | ||
---|---|---|
名称 | 类型 | 描述 |
CorrelationId | int32 | 对应Request的CorrelationId |
Response | - | 对应Request的Response,不一样的Request的Response的字段是不同的 |
Producer生产消息并推送(Push)给Broker,而后Consumer再从Broker那里取走(Pull)消息。Producer生产的消息就是由Message来表示的,对用户来说它就是键-值对;
Message => Crc MagicByte Attributes Key Value | ||
---|---|---|
名称 | 类型 | 描述 |
CRC | int32 | 表示这条消息(不包括CRC字段自己)的校验码 |
MagicByte | int8 | 表示消息格式的版本,用来作后向兼容,目前值为0 |
Attributes | int8 | 表示这条消息的元数据,目前最低两位用来表示压缩格式 |
Key | bytes | 表示这条消息的Key,能够为null |
Value | bytes | 表示这条消息的Value。Kafka支持消息嵌套,也就是把一条消息做为Value放到另一条消息里面 |
MessageSet用来组合多条Message,它在每条Message的基础上加上了Offset和MessageSize;MessageSet是个数组,数组的每一个元素由三部分组成,分别是Offset,MessageSize和Message;
MessageSet => [Offset MessageSize Message] | ||
---|---|---|
名称 | 类型 | 描述 |
Offset | int64 | 它用来做为log中的序列号,Producer在生产消息的时候还不知道具体的值是什么,能够随便填个数字进去 |
MessageSize | int32 | 表示这条Message的大小 |
Message | - | 表示这条Message的具体内容 |
压缩方式 | 编码 |
---|---|
不压缩 | 0 |
Gzip | 1 |
Snappy | 2 |
LZ4 | 3 |
由于单条消息中重复内容可能很少,因此一般把多条消息放在一块儿组成MessageSet,而后再把MessageSet放到一条Message里面去,从而提升压缩比率;
设计思路:
Kafka网络通信架构主要由两大部分构成:SocketServer 和 RequestHandlerPool: