众所周知,Kafka作为一款优秀的消息中间件,在我们的日常工作中,我们也会接触到Kafka,用其来进行削峰、解耦等,作为开发的你,是否也是这么使用kafka的:
服务A作为生产者Producer来生产消息发送到Kafka集群,消费者Consumer通过订阅Topic来消费对应的kafka消息,一般都会将消息体进行序列化发送,消费者在消费时对消息体进行反序列化,然后进行其余的业务流程。
乍看之下,上述流程没有什么大的问题,但是你是否考虑过对于Producer或者Consumer的消息格式万一被改变,会不会造成反序列化的失败,影响业务?对于kafka而言,它是通过字节的形式进行数据传递的,它是不存在对传递数据格式检查的机制,kafka本身也是解耦的,Producer和Consumer之间只是通过Topic进行沟通的。为了保证在使用kafka时,Producer和Consumer之间消息格式的一致性,此时Schema Registry就派上用场了。
什么是Schema Registry?
Schema Registry是一个独立于Kafka Cluster之外的应用程序,通过在本地缓存Schema来向Producer和Consumer进行分发,如下图所示:
在发送消息到Kafka之前,Producer会先与Schema Registry进行通信,检查该schema是否可用,如果没有找到schema,便会在schema registry注册并缓存一份,接着Producer可以获得该schema,并且以该schema的形式对数据进行序列化,最后以预先唯一的schema ID和字节的形式发送到Kafka
当Consumer处理消息时,会从拉取到的消息中获得schemaIID,并以此来和schema registry通信,并且使用相同的schema来反序列化消息。如果此时没有匹配到对应的schema,schema registry会抛出一个error让Producer知道schema协议被破坏。
数据序列化的格式
在我们知道Schema Registry如何在Kafka中起作用,那我们对于数据序列化的格式应该如何进行选择?在我们选择合适的数据序列化格式时需要考虑的点:
1、是否序列化格式为二进制
2、是否我们可以使用schemas来强制限制数据结构
AVRO的简单介绍
AVRO是一个开源的二进制数据序列化格式。它提供了丰富的数据结构,并在c#和Java等静态类型编程语言上提供了代码生成功能。
如下是一个使用JSON格式定义的AVRO Schema的例子:
{
"type":"record",
"name":"User",
"namespace":"com.example.models.avro",
"fields":[
{
"name":"userID",
"type":"string",
"doc":"User ID of a web app"
},
{
"name":"customerName",
"type":"string",
"doc":"Customer Name",
"default":"Test User"
}
]
}
Schema演化
在我们使用Kafka的过程中,随着业务的复杂变化,我们发送的消息体也会由于业务的变化或多或少的变化(增加或者减少字段),Schema Registry对于schema的每次变化都会有对应一个version来记录的
当schema被首次创建,它会拥有一个唯一的schema ID和version,随着业务的变化,schema也在演进,我们做一些变化以及该变化是否兼容,我们会得到一个新的schema ID和新的version。有两种方式可以校验schema是否兼容
1、 采用maven plugin(在Java应用程序中)
2、采用REST 调用
到这里,Schema Register在kafka中实践分享就到这里结束了
参考链接:
https://en.wikipedia.org/wiki/Comparison_of_data-serialization_formats
https://www.confluent.io/blog/avro-kafka-data/
https://docs.confluent.io/platform/current/schema-registry/develop/maven-plugin.html#schema-registry-test-compatibility