前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >构造producer---Kafka从入门到精通(六)

构造producer---Kafka从入门到精通(六)

作者头像
用户9919783
发布2022-07-26 12:27:34
5280
发布2022-07-26 12:27:34
举报
文章被收录于专栏:后端从入门到精通

上篇文章说了,kafka新版旧版的区别,producer全部异步发消息,并且提供回调机制callback,判断是否成功,通过分批次发送batching保证吞吐量,分区策略更加合理,旧版本默认是在一段时间内把消息发到固定区域,新版本采用轮询,消息更加均匀。Consumer新版为单线程执行,单个consumer线程管理多个socker,在10版本后,加入了心跳线程,这最多也就算了是双线程。偏移量 在新版本交给kafka处理,舍弃了zookeeper,这样可以依赖kafka备份机制天然实现高可用原理。

Kafka历史---Kafka从入门到精通(五)

一、构造producer

构造一个producer大致需要实现五个步骤:

1)构造一个properties,然后指定bootstrap.server,key.serializer和value.serializer这三个属性。这三个属性是必须的,service代表,localhost:9092,Key.serializer是

org.apache.kafka.common.serialization.stringSerializer。Value.serializer和key的 序列化一致。

2)使用propertites 构造kafkaProducer对象。

3)构造待发送对象,producerRecord,指定把消息发送到topic,partition,以及对应的key和value,其中partition和key不用指定。

4)调用kafkaProducer的send发送消息。

5)关闭kafkaProducer。

1、properties对象的构造

下面详细展开每一步要做的事,首先构造properties这里有三个参数是必须要指定的,他们分别如下:

1、bootstrap.servers:该参数指定host:port,用于kafka的broker服务器连接,producer使用时候会替换成实际的broker列表,如果kafka集群数量很多,那么只需要指定部分broker即可,不需要列出所有机器,因为不管指定几台broker,producer都会通过该参数发现集群中所有broker,该参数指定多台机器只为故障转移,这样即使一台broker挂了,producer重启后依然可以指定其他broker连接kafka集群。

2、Key.serializer:被发送到broker任何格式都必须是字节数组,因此消息的各个组件组件必须首先做序列化,然后才能发送到broker。该参数就是为消息key做序列化用的。这个参数指定是实现了serializer接口类全限定名称,kafka的大部分默认初始类型是primitive type,提供了现成的序列化器。StringSerializer,该类会将一个字符串转成字节数组,这个参数也揭示一个事实,这个用户可以自定义序列化器,只要实现serializer接口就可以。需要注意的时候,producer发送消息不指定key,也是需要配置这个参数的。

3、Value.serializer:org apache.kafka.common.serialization. Serializer,需要注意的是,这个必须写全称,和key.serializer类似,将消息的value部分转成字节数组。

2、KafkaProducer的构造

这时候就开始构造发消息的主入口,所有的功能都由kafkaProducer提供,只需要命令:

Producer<String,String> producer = new KafkaProducer<>(props);

创建producer的时候也可以把key和value序列化,如果序列化了,就不需要在properties的时候序列化。

3、producerRecord对象构造

下一步构造消息实例,java版本的producer使用producerRecord类来代表每条消息,创建producerRecord也很简单,指定topic和value,当然这里还可以指定pratition和key,值得注意的是,时间戳一定要谨慎使用,时间戳索引文件中索引项都是严格按照时间戳顺序,会导致该消息时间序列混乱,因此让kafka自行定义时间戳比较稳妥。

4、发送消息

Kafka producer发消息主要用send方法,虽然send只是两个简单方法签名,但是producer在底层完全实现了异步发送,并且使用java提供的future同时实现了同步发送 和 异步发送 +回调(callback)两种方式。网上教程大部分是那种send,就不管,专有名称叫fire and forget,发送就忘记,这种在实际场景中不被推荐使用,因为对于发送结果producer完全不知道,所以真实的使用场景中,同步和异步发送还是最常见的方法。

异步发送

实际上所有写入操作都是默认异步,java版本的producer和send方法会返回一个java 的future对象供用户稍后获取发送结果,这就是所谓回调机制。

代码语言:javascript
复制
  ProducerRecord producerRecord = new ProducerRecord("kafka-boot", "foo" + i);
                ProducerFactory factory = kafkaTemplate.getProducerFactory();
                Producer producer = factory.createProducer();
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null) {
                           
                        }else if(e instanceof RetriableException){
                           
                        }else{
                           
                        }
                    }
                });

上面方法就是回调,实现了onCompletion方法,该方法两个参数metadata和exception,可以用if判断下,他们不会同时为null,也就是说至少有一个为null,当消息发送成功时候,exception为null,当消息发送失败的时候,metadata为null。

另外,上面的callback实际是java的接口,用户可以自定义callback实现类来处理消息发送后的逻辑,只需要实现org.apache.kafka.clients.producer.Callback接口即可。

同步发送

同步发送 和异步发送是通过future来区分的,调用future.get()无线等待结果返回,即实现同步发送效果。

使用future.get()会一直等待下去,直到kafka broker将返回结果给producer,当结果从broker处返回时get方法要么返回结果,要么抛出异常,由producer自行处理。如果没有错误,get将返回对应的recordMetada实例(包含已发送消息的所有元素),包括消息发送的topic,分区以及消息对应分区的位移信息。

不管同步发送还是异步发送都会发送失败的可能,导致返回异常错误,当前kafka的错误类型包含两类:可重试异常 和 不可重试异常。

常见可重试异常如下:

LeaderNotAvailableException:分区的leader副本不可用,通常出现在leader换届选举期间,通常是瞬时的异常,重试之后可以自行恢复。

NotControllerException:controller当前不可用,(后面会重点讲解controller),通常表面controller在选举,也可以重试恢复。

NetworkException:网络瞬时故障导致的异常。

对于这种可重试的异常,如果在 producer 程序中配置了重试次数,那么只要在规定的重试次数内自行恢复了,便不会出现在 onCompletion exception 中。不过若超过了重试次数仍没有成功,则仍然会被封装进 exception 中。此时就需要 producer 程序自行处理这种异常。

那么不可重试异常哪些呢:

RecordTooLargeException :发送的消息尺寸过大,超过了规定的大小上限 显然这种异常无论如何重试都是无法成功的。

SerializationException :序列化失败异常,这也是无法恢复的

KafkaException :其他类型的异常

所有这些不可重试异常 旦被捕获都会被封装进 Future 的计算结果井返回给 producer 程序,用户需要自行处理这些异常。由于不可重试异常和可重试异常在 producer 程序端可能有不同的处理逻辑,所以需要不同的区分。

5、关闭producer

程序结束一定要close,毕竟producer是占用系统资源的(比如创建了额外线程,申请了很多内存以及创建了socket连接等),因此必须要显式的调用kafkaProducer.close方法关闭producer。

如果只是普通的无参数调用close,则会等producer 会被允许先处理完之前的发送请求后再关闭,即所谓的“优雅”关闭退出( graceful shutdown) ;同时, KafkaProducer 还提供个带超时参数的 close 方法 close(timeout 如果调用此方法, producer 会等待 timeout 时间来完成所有处理中的请求,然后强行退出。这就是说,若 timeout 超时,则 producer 会强制结束,并立即丢弃所有未发送以及未应答的发送请求,在某种程度上,仿佛 producer端的程序丢失了要发送的消息。因此在实际场景中一定要谨慎使用带超时的 close 方法。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端从入门到精通 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、构造producer
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档