前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据基础系列之kafka011生产者缓存超时,幂等性和事务实现

大数据基础系列之kafka011生产者缓存超时,幂等性和事务实现

作者头像
Spark学习技巧
发布2018-01-30 18:32:57
9570
发布2018-01-30 18:32:57
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

一,demo及相关类

1,基本介绍

KafkaProducer是线程安全的,多线程间共享一个实例比共享多个实例更加高效。首先搞一个demo

代码语言:js
复制
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

2,ProducerRecord

发往kafka的key/value对。由topic,分区id(可选),key(可选),timestamp(可选),value组成。

如果一个有效的分区ID被指定,Record就会被发到指定的分区。如果,没指定分区id,只指定了key,就会按照key做hash后对分区数取余得到的数值作为分区的id。如果分区id,和key都没有指定,就会以轮训的形式发送Records。

Record还有一个timestamp属性。如果用户没有提供timestamp,生产者将会使用当前时间作为Record的timestamp。Kafka最终使用的时间戳取决于topic配置的时间类型。

1),如果topic配置使用了CreateTime,Broker就会使用生产者生产Record时带的时间戳。

2),如果topic配置使用了LogAppendTime,Record追加到log的时候,Broker会有本地时间代替Producer生产时带的时间戳。

无论是采用的上文中的哪种形式,timestamp都会被包含在RecordMetadata中返回。

代码语言:js
复制
ProducerRecord(String topic, Integer partition, K key, V value)
Creates a record to be sent to a specified topic and partition
  ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
Creates a record to be sent to a specified topic and partition
  ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
Creates a record with a specified timestamp to be sent to a specified topic and partition
  ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
Creates a record with a specified timestamp to be sent to a specified topic and partition
  ProducerRecord(String topic, K key, V value)
Create a record to be sent to Kafka
  ProducerRecord(String topic, V value)
Create a record with no key

二,缓存和超时

生产者内部有一个buffer,用来缓存Record,同时内部有一个后台线程负责将Record转化为请求,然后将请求发给kafka集群。使用生产者后未关闭,会导致这些资源泄漏。

send方法是异步的。调用他实际上是将Record添加到Buffer中,然后立即返回。这使得生产者可以批量提交消息来提升性能。

acks配置控制发送请求完成的标准。如果设置成all,将会导致生产阻塞,等待所有副本提交日志成功后才算发送完成,超级低效但是可以最大限度的容错。

如果请求失败,生产者会自动尝试,前提是不要设置retries为零。当然,开启失败尝试也就意味着带来了数据重复发送的风险。

生产者为每个分区维护一个buffer,这个buffer的大小由batch.size指定,该值越大表示批量发送的消息数越多,也意味着需要更大的内存。内存数可以估计的。

默认情况下,即使buffer还有剩余的空间没有填充,消息也会被立即发送。如果你想减少请求的次数,可以设置linger.ms参数为大于0的某一值。使生产者发送消息前等待linger.ms指定的时间,这样就可以有更多的消息加入到该batch来。这很像TCP中的Nagle原理。例如,在上面的代码片段中,由于我们设置linger.ms为1ms,100条消息可能在一次请求中全部发送到了Server端。然而,这也意味着加入消息一直不能填充满buffer,我们要延迟一毫秒。

buffer.memory决定者生产者所能用于buffer的总内存大小。如果,消息发送的速度比传输到Server端的速度快,这个buffer空间就会耗尽。当buffer空间耗尽,send调用就会阻塞,超过max.block.ms设置的超时时间后会抛出TimeoutException。

三,序列化

Key.serializer和value.serialize决定者如何将key和value对象转化为字节数组。你可以使用包括bytearrayserializer或stringserializer简单的字符串或字节类型。也可以实现自定义的序列化方式。

四,幂等性

从kafka0.11版本开始,Kafka支持两种额外的模式:幂等性生产者和事务生产者。幂等性强化消息的传递语义,从至少一次到仅仅一次。特别是生产者重试将不再导致消息重复发送。事务生产者允许应用程序将消息原子的发送到多个分区(和主题!)。

设置enable.idempotence为true来开启幂等性,如果设置了这个参数retries配置将会被设置为默认值,也即Integer.MAX_VALUE,max.inflight.requests.per.connection会被设置为1,acks会被设置为all。幂等性生产者不需要修改API,所以现有的应用程序不需要修改就可以使用该特性。

为了利用幂等生产者,必须避免应用程序级重新发送,因为这些不能被去重。例如,如果应用程序运行幂等性,建议不要设置retries,因为他会被设置为默认值(Integer.MAX_VALUE).此外,如果send(producerrecord)返回一个错误甚至无限重试(例如,如果消息送前缓冲区满了),建议关闭生产和检查最后产生消息的内容以确保不重复。

五,事务

为了使用事务生产者和相关的APIs,必须要设置transactional.id属性.如果设置了transactional.id幂等性会自动被启用。支持事务的topic必须要进行容错配置。特别的replication.factor应该设置为3,topic的min.insync.replicas配置必须设置为2.最后,为了从端到端实现事务性保证,必须配置消费者只读取committed 的消息。

transactional.id目的是单生产者实例能从多会话中恢复。该特性就是分区的,状态的应用程序程序中的一个碎片标识符。transactional.id值在一个分区的应用中每个消费者实例必须是唯一的。

所有新的事务性API都会被阻塞,将在失败时抛出异常。举一个简单的例子,一次事务中提交100条消息。

代码语言:js
复制
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
  producer.beginTransaction();
 for (int i = 0; i < 100; i++)
  producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
 producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
 // We can't recover from these exceptions, so our only option is to close the producer and exit.
 producer.close();
} catch (KafkaException e) {
 // For all other exceptions, just abort the transaction and try again.
 producer.abortTransaction();
}
producer.close();

就如例子一样,每个消费者只能有一个事务开启。在beginTransaction() 和commitTransaction()中间发送的所有消息,都是一次事务的一部分。

事务生产者使用execeptions进行错误状态交流。特别之处,我们不需要为producer.send指定回调函数。任何在事务中不可恢复的错误发生都会抛出一个KafkaException异常(http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord))。

在接受到一个kafkaexection异常之后,通过调用producer.abortTransaction(),可以保证所有的已经写入成功的消息会被标记为aborted,因此保证事务传输。

六,总结

本文主要是阐述缓存和超时机制,序列化及反序列化,幂等性生产者,事务生产者。大家可以根据需要进行选择.

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-07-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档