前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 新版生产者 API

Kafka 新版生产者 API

作者头像
CoderJed
发布2018-09-13 10:29:13
2.1K0
发布2018-09-13 10:29:13
举报
文章被收录于专栏:Jed的技术阶梯

1. kafka 生产者发送消息的流程

2. Kafka 生产者发送数据的3种方式

(1) 发送并忘记(fire-and-forget)

把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @Title Producer01.java 
 * @Description Kafka 生产者发送消息的第一种方式:发送并忘记
 * @Author YangYunhe
 * @Date 2018-06-21 10:35:34
 */
public class Producer01 {
    
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384); // 16K
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432); // 32M
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        String filePath = Producer01.class.getClassLoader().getResource("wechat_data.txt").getPath();
        BufferedReader br = new BufferedReader(new FileReader(filePath));

        String line;
        while((line = br.readLine()) != null) {
            // 创建 ProducerRecord 可以指定 topic、partition、key、value,其中 partition 和 key 是可选的
            // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", 0, "key", line);
            // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", "key", line);
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
            
            // 只管发送消息,不管是否发送成功
            producer.send(record);
            Thread.sleep(100);
        }
        producer.close();
    }
}

(2) 同步发送

使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待(会返回元数据或者抛出异常), 就可以知道消息是否发送成功。

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title Producer02.java 
 * @Description Kafka 生产者发送消息的第二种方式:同步发送
 * @Author YangYunhe
 * @Date 2018-06-21 10:38:37
 */
public class Producer02 {
    
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        String filePath = Producer02.class.getClassLoader().getResource("wechat_data.txt").getPath();
        BufferedReader br = new BufferedReader(new FileReader(filePath));

        String line;
        while((line = br.readLine()) != null) {
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
            // 程序阻塞,直到该条消息发送成功返回元数据信息或者报错
            RecordMetadata metadata = producer.send(record).get();
            StringBuilder sb = new StringBuilder();
            sb.append("record [").append(line).append("] has been sent successfully!").append("\n")
                .append("send to partition ").append(metadata.partition())
                .append(", offset = ").append(metadata.offset());
            System.out.println(sb.toString());
            Thread.sleep(100);
        }
        producer.close();
    }
}

(3) 异步发送

大多数时候,我们并不需要等待响应——尽管 Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。

不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志等,这样的情况下可以使用异步发送消息的方式,调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title Producer03.java 
 * @Description Kafka 生产者发送消息的第三种方式:异步发送
 * @Author YangYunhe
 * @Date 2018-06-21 11:06:05
 */
public class Producer03 {
    
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        
        String filePath = Producer03.class.getClassLoader().getResource("wechat_data.txt").getPath();
        BufferedReader br = new BufferedReader(new FileReader(filePath));

        String line;
        while((line = br.readLine()) != null) {
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    // 如果发送消息成功,返回了 RecordMetadata
                    if(metadata != null) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("message has been sent successfully! ")
                            .append("send to partition ").append(metadata.partition())
                            .append(", offset = ").append(metadata.offset());
                        System.out.println(sb.toString());
                    }
                    // 如果消息发送失败,抛出异常
                    if(e != null) {
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(100);
        }
        producer.close();
    }
}

3. 多线程生产者

在数据量比较大同时对发送消息的顺序没有严格要求时,可以使用多线程的方式发送数据,实现多线程生产者有两种方式:1. 实例化一个 KafkaProducer 对象运行多个线程共享该对象发送消息;2. 实例化多个 KafkaProducer 对象。 由于 Kafka Producer 是线程安全的,所以多个线程共享一个 Kafka Producer 对象在性能上要好很多。

(1) 线程类实现

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.thread;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title KafkaProducerThread.java 
 * @Description 多线程生产者的线程类实现
 * @Author YangYunhe
 * @Date 2018-06-25 13:54:38
 */
public class KafkaProducerThread implements Runnable {
    
    private KafkaProducer<String, String> producer;
    private ProducerRecord<String, String> record;
    
    public KafkaProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception != null) {
                    System.out.println("exception occurs when sending message: " + exception);
                }
                if(metadata != null) {
                    StringBuilder result = new StringBuilder();
                    result.append("message[" + record.value() + "] has been sent successfully! ")
                        .append("send to partition ").append(metadata.partition())
                        .append(", offset = ").append(metadata.offset());
                    System.out.println(result.toString());
                }
            }
        });
    }
}

(2) 发送消息的具体实现

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.producer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.bonc.rdpe.kafka110.thread.KafkaProducerThread;

/**
 * @Title MultiProducer.java 
 * @Description 多线程生产者的测试代码
 * @Author YangYunhe
 * @Date 2018-06-25 14:30:58
 */
public class MultiProducer {
    
    private static final int THREADS_NUMS = 10;
    
    public static void main(String[] args) {
        
        ExecutorService executor = Executors.newFixedThreadPool(THREADS_NUMS);
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record;
        
        try {
            for(int i = 0; i < 100; i++) {
                record = new ProducerRecord<>("dev3-yangyunhe-topic001", "hello" + i);
                executor.submit(new KafkaProducerThread(producer, record));
                Thread.sleep(1000);
            }
        }catch (Exception e) {
            System.out.println("exception occurs when sending message: " + e);
        }finally {
            producer.close();
            executor.shutdown();
        }
    }
}

(3) 运行结果:

代码语言:javascript
复制
message[hello0] has been sent successfully! send to partition 1, offset = 705
message[hello1] has been sent successfully! send to partition 0, offset = 705
message[hello2] has been sent successfully! send to partition 2, offset = 704
message[hello3] has been sent successfully! send to partition 1, offset = 706
message[hello4] has been sent successfully! send to partition 0, offset = 706

......

4. Kafka Producer 常用配置(kafka-1.1.0)

(1) acks

  • 类型:string
  • 默认值:1
  • 可设置值:[all, -1, 0, 1]
  • 重要性:高
  • 说明:
    • 0:生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
    • 1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
    • all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
    • -1:作用与"all"是一样的。

(2) buffer.memory

  • 类型:long
  • 默认值:33554432(32M)
  • 可设置值:[0,...]
  • 重要性:高
  • 说明:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。 这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置 max.block.ms (类型:long,默认值:60000(1分钟),可设置值:[0,...],重要性:中等)参数。表示在抛出异常之前可以阻塞的时间。

(3) compression.type

  • 类型:string
  • 默认值:none
  • 可设置值:[none, gzip, snappy, lz4]
  • 重要性:高
  • 说明:该参数可以指定消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

(4) retries

  • 类型:int
  • 默认值:0
  • 可设置值:[0,...,2147483647]
  • 重要性:高
  • 说明:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms(类型:long,默认值:100, 可设置值:[0,...],重要性:低) 参数来改变这个时间间隔。 建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如"消息太大"错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。

(5) batch.size

  • 类型:int
  • 默认值:16384(16K)
  • 可设置值:[0,...]
  • 重要性:中等
  • 说明:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。

(6) linger.ms

  • 类型:long
  • 默认值:0
  • 可设置值:[0,...]
  • 重要性:中等
  • 说明:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。

(7) max.in.flight.requests.per.connection

  • 类型:int
  • 默认值:5
  • 可设置值:[1,...]
  • 重要性:低
  • 说明:该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。 把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

(8) max.request.size

  • 类型:int
  • 默认值:1048576
  • 可设置值:[0,...]
  • 重要性:中等
  • 说明:该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes(类型:int,默认值:1000012,大约0.95M,可设置值:[0,...],重要性:高)),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。

(9) receive.buffer.bytes 和 send.buffer.bytes

receive.buffer.bytes

  • 类型:int
  • 默认值:32768(32K)
  • 可设置值:[-1,...]
  • 重要性:中等

send.buffer.bytes

  • 类型:int
  • 默认值:131072(128K)
  • 可设置值:[-1,...]
  • 重要性:中等

说明:这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

(10) client.id

  • 类型:string
  • 默认值:""
  • 可设置值:任意字符串
  • 重要性:中等
  • 说明:该参数可以是任意的字符串,服务器会用它来识别消息的来源。

(11) request.timeout.ms

  • 类型:int
  • 默认值:30000
  • 可设置值:[0,...]
  • 重要性:中等
  • 说明:该参数指定了生产者在发送数据时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。[metadata.fetch.timeout.ms] and [timeout.ms] have been removed. They were initially deprecated in Kafka 0.9.0.0.

(12) max.block.ms

  • 类型:long
  • 默认值:60000
  • 可设置值:[0,...]
  • 重要性:中等
  • 说明:该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

(13) connections.max.idle.ms

  • 类型:long
  • 默认值:540000
  • 可设置值:[0,...]
  • 重要性:中等
  • 说明:关闭空闲连接的等待时间,检测到空闲的连接后,默认等待9分钟才会关闭这个连接。

(14) metadata.max.age.ms

  • 类型:long
  • 默认值:300000
  • 可设置值:[0,...]
  • 重要性:低
  • 说明:更新元数据的时间间隔,在等待该参数配置的时间后,即使 producer 没有发现任何 partition 或 leader 的变化,也会强制刷新元数据。

(15) reconnect.backoff.ms

  • 类型:long
  • 默认值:50
  • 可设置值:[0,...]
  • 重要性:低
  • 说明:尝试重新连接 broker 的时间间隔。

(16) reconnect.backoff.max.ms

  • 类型:long
  • 默认值:1000
  • 可设置值:[0,...]
  • 重要性:低
  • 说明:如果重新连接的时间累积到达该参数的配置时间还没有连接到 broker,那么宣告连接失败。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.06.25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. kafka 生产者发送消息的流程
  • 2. Kafka 生产者发送数据的3种方式
    • (1) 发送并忘记(fire-and-forget)
      • (2) 同步发送
        • (3) 异步发送
        • 3. 多线程生产者
          • (1) 线程类实现
            • (2) 发送消息的具体实现
              • (3) 运行结果:
              • 4. Kafka Producer 常用配置(kafka-1.1.0)
                • (1) acks
                  • (2) buffer.memory
                    • (3) compression.type
                      • (4) retries
                        • (5) batch.size
                          • (6) linger.ms
                            • (7) max.in.flight.requests.per.connection
                              • (8) max.request.size
                                • (9) receive.buffer.bytes 和 send.buffer.bytes
                                  • (10) client.id
                                    • (11) request.timeout.ms
                                      • (12) max.block.ms
                                        • (13) connections.max.idle.ms
                                          • (14) metadata.max.age.ms
                                            • (15) reconnect.backoff.ms
                                              • (16) reconnect.backoff.max.ms
                                              领券
                                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档