专栏首页实时计算Kafka JAVAAPI的使用之Producer(核心原理与示例)

Kafka JAVAAPI的使用之Producer(核心原理与示例)

通过https://www.cnblogs.com/tree1123/p/11243668.html 已经对consumer有了一定的了解。producer比consumer要简单一些。

一、旧版本producer

0.9.0.0版本以前,是由scala编写的旧版本producer。

入口类:kafka.producer.Producer

代码示例:

Properties properties = new Properties();
        properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.requird.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(config);
        KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","hello");
        Producer.send(msg);

旧版本是同步机制,等待响应。吞吐性很差。在0.9.0.0版本以后,正式下架了。

旧版本的方法:

send   发送
close   关闭
sync   异步发送  有丢失消息的可能性

二、新版本producer

旧版本producer由scala编写,0.9.0.0版本以后,新版本producer由java编写。

新版本主要入口类是:org.apache.kafka.clients.producer.KafkaProducer

常用方法:

send  实现消息发送主逻辑
close  关闭producer   
metrics  获取producer的实时监控指标数据 比如发送消息的速率

Kafka producer要比consumer设计简单一些,主要就是向某个topic的某个分区发送一条消息。partitioner决定向哪个分区发送消息。用户指定key,默认的分区器会根据key的哈希值来选择分区,如果没有指定key就以轮询的方式选择分区。也可以自定义分区策略。

确定分区后,producer寻找到分区的leader,也就是该leader所在的broker,然后发送消息,leader会进行副本同步ISR。

producer会启两个线程,主线程封装ProducerRecord类,序列化后发给partitioner,然后发送到内存缓冲区。

另一个I/O线程,提取消息分batch统一发送给对应的broker。

示例代码:

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();

1、构造Properties对象,bootstrap.servers key.serializer value.serializer是必须指定的。

2、使用Properties构造KafkaProducer对象。

3、构造ProducerRecord 指定topic 分区 key value。

4、KafkaProducer的send方法发送。

5、关闭KafkaProducer。

Properties主要参数:

bootstrap.servers 和consumer一样,指定部分broker即可。而且broker端如果没有配ip地址,要写成主机名。

key.serializer value.serializer 序列化参数 一定要全类名 没有key也必须设置。

acks 三个值

​ 0: producer完全不管broker的处理结果 回调也就没有用了 并不能保证消息成功发送 但是这种吞吐量最高

​ all或者-1: leader broker会等消息写入 并且ISR都写入后 才会响应,这种只要ISR有副本存活就肯定不会丢失,但吞 吐量最低。

​ 1: 默认的值 leader broker自己写入后就响应,不会等待ISR其他的副本写入,只要leader broker存活就不会丢失,即保证了不丢失,也保证了吞吐量。

buffer.memory 缓冲区大小 字节 默认是33554432 就是发送消息的内存缓冲区大小 过小的话会影响吞吐量

compression.type 设置是否压缩消息 默认值是none 压缩后可以降低IO开销提高吞吐,但是会增大CPU开销。

​ 支持三种: GZIP Snappy LZ4 性能 LZ4 > Snappy > GZIP

retries 发送消息重试的次数 默认0 不重试 重试可能造成重复发送 可能造成乱序

​ retry.backoff.ms 设置重试间隔 默认100毫秒

batch.size 调优重要的参数 batch小 吞吐量也会小 batch大 内存压力会大 默认值是16384 16KB

linger.ms 发送延时 默认是0 0的话不用等batch满就发送 延时的话可以提高吞吐 看具体情况进行调整

max.request.size producer能够发送最大消息的大小 默认1048576字节 如果消息很大 需要修改它

request.timeout.ms 发送请求后broker在规定时间返回 默认30秒 超过就是超时了。

Send方法

fire and forget 就是上边的示例

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();

异步回调 不阻塞

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i),new Callback(){
              public void onCompletion(RecordMetadata metadata, Exception e) {
                         if(e != null) {
                            e.printStackTrace();
                         } else {
                            System.out.println("The offset of the record we just sent is: " +       metadata.offset());
                         }
                     }           
            });
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();

同步发送 无限等待返回

producer.send(record).get()

重试机制

如果需要自定义重试机制,就要在回调里对不同异常区别对待,常见的几种如下:

可重试异常

LeaderNotAvailableException :分区的Leader副本不可用,这可能是换届选举导致的瞬时的异常,重试几次就可以恢复 NotControllerException:Controller主要是用来选择分区副本和每一个分区leader的副本信息,主要负责统一管理分区信息等,也可能是选举所致。

NetWorkerException :瞬时网络故障异常所致。

不可重试异常

SerializationException:序列化失败异常

RecordToolLargeException:消息尺寸过大导致。

示例代码:

 producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e ==null){
                               //正常处理逻辑
                               System.out.println("The offset of the record we just sent is: " + metadata.offset()); 
                               
                           }else{
                                   
                                 if(e instanceof RetriableException) {
                                    //处理可重试异常
                                    ......
                                 } else {
                                    //处理不可重试异常
                                    ......
                                 }
                           }
                       }
                   });
分区机制

partitioner决定向哪个分区发送消息。用户指定key,默认的分区器会根据key的哈希值来选择分区,如果没有指定key就以轮询的方式选择分区。也可以自定义分区策略。

对于有key的消息,java版本的producer自带的partitioner会根据murmur2算法计算消息key的哈希值。然后对总分区数求模得到消息要被发送到的目标分区号。

自定义分区策略:

创建一个类,实现org.apache.kafka.clients.producer.Partitioner接口

主要分区逻辑在Partitioner.partition中实现:通过topic key value 一同确定分区

在构造KafkaProducer得Properties中设置partitioner.class 为自定义类 注意是全类名

序列化机制

常用的serializer

ByteArraySerializer.class

ByteBufferSerializer.class

BytesSerializer.class

DoubleSerializer.class

IntegerSerializer.class

LongSerializer.class

StringSerializer.class

但是其他一些复杂的就需要自定义序列化:

1、定义数据格式

2、创建自定义序列化类,实现org.apache.kafka.common.serialization.Serializer接口

3、在KafkaProducer的Properties中设置key.serializer value.serializer为自定义类

以上均为单线程的情况,但producer是线程安全的,单线程适合分区较少的情况,分区较多可以多线程但对内存损耗较大。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 动态加载css方法实现和深入解析

    此动态加载css方法 loadCss,剥离自Sea.js,并做了进一步的优化(优化代码后续会进行分析)。

    我是leon
  • Jsp和Servlet有什么区别?

    Web容器加载Servlet并将其实例化后,Servlet生命周期开始,容器运行其init()方法进行Servlet的初始化;请求到达时调用Servlet的se...

    李红
  • confd + Nacos | 无代码侵入的配置变更管理

    为什么要支持confd,老的应用配置管理模式是启动时读取配置文件,然后重新读取配置文件需要应用重启。

    Java技术栈
  • .Net轻松实现支付宝服务窗网页授权并获取用户相关信息

     最近在开发一个商业街区的聚合扫码支付功能,其中需要用到的有支付宝,微信两种支付方式,当然对于开发微信支付而已作为自己的老本行已经比较熟悉了,然而对于我来说支...

    追逐时光
  • 柯里化与反柯里化

    当执行var add = currying(...)时,add变量已经指向了next方法。此时,allArgs在next方法内部有引用到,所以不能被GC回收。也...

    我是leon
  • 分享:手把手生成漂亮的静态文档说明页

    最近经常被问 https://t.itmuch.com/doc.html 文档页是怎么制作的,考虑到步骤略复杂,写篇手记总结下吧。

    用户1516716
  • JavaScript嗅探执行神器-sniffer.js,你值得拥有!

    这样,不管a.js文件多大,Wall.say('wall')都可以等到文件真正加载完后,再执行。

    我是leon
  • java怎么发起HttpRequest请求,返回状态码和内容并解析json

    gfu
  • java反射机制简单介绍

    gfu
  • java读取txt中的内容,批量处理重复的内容,对数据库字段的修改非常有用!

    先来一个简单的例子,比如给了我们db table中的几个字段,我们需要拼写sql语句去重复插入。

    gfu

扫码关注云+社区

领取腾讯云代金券