我们会创建一个名为 my-topic Kafka 主题(Topic),然后创建一个使用该主题发送记录的 Kafka 生产者。Kafka 发送记录可以使用同步方式,也可以使用异步方式。...对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。 5. 异步发送消息 假设消息在应用程序和 Kafka 集群之间一个来回需要 10ms。...大多数时候,我们并不需要等待响应,尽管 Kafka 会把主题、分区以及消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。...不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入错误消息文件以便日后分析。 为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。...(record, new AsyncSendCallback()); producer.close(); 为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback
,分别为:普通发送(发后即忘)、同步发送、异步发送。.../** * @description: 方式二:同步发送消息,可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,可以捕获并进行相应的处理。...(); } 3、异步发送 单纯的send()方法就是异步请求,不过与 ”发后即忘“ 方式不同的是,我们需要对发送失败的消息进行异常日志记录,方便日后分析。...为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持,示例如下: /** * @description: 方式三:异步发送消息,增加一个回调函数。...(); } 三、生产者拦截器 1、拦截器概述 生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求
同步发送的方式可 靠性高,要么消息被发送成功,要么发生异常。如果发生异常 ,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。...1.3 异步发送 异步发送一般是在 send()方法里指定一个回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。...有读者或许会有疑问, send()方法的返回值类型就是 Future,而 Future本身就可以用作异步的逻辑处理 。...使用 Callback的方式非常简洁明了, Kafka 有 响应时就会回调 , 要么发送成功,要么抛出异常。...生产者拦截器既可以用来在消息发送前做一些准备工作, 比如按照某个规则过滤不符合要求的消息、修改消息的内容等, 也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
异步发送 普通异步发送 需求:创建Kafka生产者,采用异步的方式发送到Kafka broker 异步发送流程 Code <!...over - 4 36 over - 5 37 over - 6 38 over - 7 39 over - 8 40 over - 9 忽略我这个offset … 我都发了好多次了… 看控制台的吧 带回调函数的异步发送...回调函数callback()会在producer收到ack时调用,为异步调用。...如果Exception为null,说明消息发送成功, 如果Exception不为null,说明消息发送失败 带回调函数的异步发送流程 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。...调用send方法,发送消息 for (int i = 0; i < 10; i++) { // 添加回调 // 该方法在Producer收到
需要自己处理各种异常情况; (2)需要自己管理offset(以实现消息传递的其他语义); Group消费模型更加简单,但是不灵活: (1)不需要自己处理异常情况,不需要自己管理offset; (2)只能实现...,轮询,随机) 设置生产者参数 (缓存队列长度,发送时间,同步/异步参数设置) 根据负载均衡算法和设置的生产者参数构造Producer对象 while True getMessage:从上游获得一条消息...按照kafka要求的消息格式构造kafka消息 根据分区算法得到分区 发送消息 处理异常 2.4.两种生产模型对比 同步生产模型: (1)低消息丢失率; (2)高消息重复率(由于网络原因,回复确认未收到...); (3)高延迟 (每发一条消息需要确认) (使用在不丢消息场景) 异步生产模型: (1)低延迟; (2)高发送性能;(每秒一个分区发50万条) (3)高消息丢失率(无确认机制,发送端队列满了,消息会丢掉...2.6.java客户端参数调优 message.send.max.retries: 发送失败重试次数; retry.backoff.ms :未接到确认,认为发送失败的时间; producer.type:
上篇文章说了,kafka新版旧版的区别,producer全部异步发消息,并且提供回调机制callback,判断是否成功,通过分批次发送batching保证吞吐量,分区策略更加合理,旧版本默认是在一段时间内把消息发到固定区域...和 异步发送 +回调(callback)两种方式。...异步发送 实际上所有写入操作都是默认异步,java版本的producer和send方法会返回一个java 的future对象供用户稍后获取发送结果,这就是所谓回调机制。...不管同步发送还是异步发送都会发送失败的可能,导致返回异常错误,当前kafka的错误类型包含两类:可重试异常 和 不可重试异常。...producer 程序,用户需要自行处理这些异常。
旧版本的方法: send 发送 close 关闭 sync 异步发送 有丢失消息的可能性 二、新版本producer 旧版本producer由scala编写,0.9.0.0版本以后,新版本...另一个I/O线程,提取消息分batch统一发送给对应的broker。...acks 三个值 0: producer完全不管broker的处理结果 回调也就没有用了 并不能保证消息成功发送 但是这种吞吐量最高 all或者-1: leader broker会等消息写入 并且...testkafka0613"+i)); System.out.println("testkafka"+i); } kafkaProducer.close(); 异步回调...(record).get() 重试机制 如果需要自定义重试机制,就要在回调里对不同异常区别对待,常见的几种如下: 可重试异常 LeaderNotAvailableException :分区的Leader
消息的发送(同步、异步、回调) ProducerRecord是消息的载体。...消息的发送主要有三种模式 发后即忘(fire-and-forget) 同步(sync) 异步(async) 案例中的发送方式就是发后即忘,它只管往kafka中发送消息而不关心消息是否正确送达。...案例中,send方法之后直接链式调用了get()方法来阻塞等待Kafka的响应,知道消息发送成功或发生异常。如果发生异常,那么就需要捕获异常并交由逻辑处理层。...来了解一下异步发送方式,一般是在send方法里指定一个Callback回调函数,Kafka在返回响应时调用该函数来实现异步发送确认。Kafka有响应时就会回调,要么发送成功,要么抛出异常。...如果Producer与Kafka处于不同的机房,则可以适当调大这个参数。
(); } } (3) 异步发送 大多数时候,我们并不需要等待响应——尽管 Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。...不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志等,这样的情况下可以使用异步发送消息的方式,调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。...Kafka 生产者发送消息的第三种方式:异步发送 * @Author YangYunhe * @Date 2018-06-21 11:06:05 */ public class Producer03...一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。...如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。
异步发送 异步发送指的是我们调用 send() 方法,并制定一个回调函数,服务器在返回响应时调用该函数。 下一节我们会重新讨论这三种实现。...大多数时候,虽然Kafka 会返回 RecordMetadata 消息,但是我们并不需要等待响应。 为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。...如果 kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常,这里我们只是简单的把它打印出来,如果是生产环境需要更详细的处理,然后在 send() 方法发送的时候传递一个...在 Kafka 中,压缩会发生在两个地方:Kafka Producer 和 Kafka Consumer,为什么启用压缩?说白了就是消息太大,需要变小一点 来使消息发的更快一些。...因为消息的发送也分为 同步 和 异步,Kafka 为了保证消息的高效传输会决定是同步发送还是异步发送。
执行之后的结果: 同样的也能获取结果,同时发现回调的线程并不是上文同步时的主线程,这样也能证明是异步回调的。...同时回调的时候会传递两个参数: RecordMetadata 和上文一致的消息发送成功后的元数据。 Exception 消息发送过程中的异常信息。...从这里也可以看出为什么之前说发送完成后元数据和异常信息只会出现一个。 Producer 参数解析 发送流程讲完了再来看看 Producer 中比较重要的几个参数。...还是由于网络问题,本来消息已经成功写入了但是没有成功响应给 producer,进行重试时就可能会出现消息重复。这种只能是消费者进行幂等处理。...配置一个最大 producer 个数。 发送消息时首先获取一个 producer,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List 中,保存时做好同步处理防止并发问题。
UnknownTopicOrPartitionException是可重试异常 4.1 两种重试方案 4.1.1 kafka 客户端配置 4.1.2 producer 代码捕获异常并手工重试 1....(未测试具体耗时) 所以当 kafka 正在创建这个 topic 的时候,producer 就向其发数据,那肯定 topic 是不存在的,因此报这个异常。 4....4.1 两种重试方案 4.1.1 kafka 客户端配置 spring.kafka.producer.retries = 3 4.1.2 producer 代码捕获异常并手工重试 可以通过实现ListenableFutureCallback...>接口,设置回调。...// 实现这个回调方法,判断 Throwable 类型,手工处理重试 void onFailure(Throwable var1); 参考 Kafka常见错误整理 Kafka运维填坑
Producer异步发送演示 在上文中介绍了AdminClient API的使用,现在我们已经知道如何在应用中通过API去管理Kafka了。...Producer API具有以下几种发送模式: 异步发送 异步阻塞发送 异步回调发送 接下来,使用一个简单的例子演示一下异步向Kafka发送消息。...异步回调发送演示 如果想要在发送完消息后获取结果,比起直接调用Future的get方法更好的方式是使用异步回调的消息发送形式。...在send方法中支持传入一个回调函数,当消息发送完毕后,会调用回调函数并将结果当作参数传入,此时我们就可以在回调函数中对结果进行处理。...代码示例: /** * 演示Producer异步回调发送 */ public static void producerAsyncCallbackSend() throws Exception {
举个例子如何实现自定义的partitioner呢,假设我们有个类似审计功能,审计功能发送kafka的时候可以给他分配字符串“audit”,我们想让这类消息发到topic最后一个分区上,便于后续统一处理,...Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法对消息做任何处理,但最好不要修改消息的所属topic和分区,否则影响分区计算。...四、无消息丢失配置 Producer采用的是异步发送消息机制,kafkaProducer.send方法仅仅把消息放入缓冲区,由一个专属的I/O线程负责提取缓冲区的消息并封装到batch中,然后发送出去。...导致records1发送失败,同时kafka又配置了重试机制,max.in.flight.requests.per.connection大于1(默认是5),这样会造成消息乱序,而实际场景很多情况需要包装按顺序消费...true,使得内存缓冲区被填满时producer处于阻塞状态,并且停止接受新消息而不是抛出异常。
执行之后的结果: 同样的也能获取结果,同时发现回调的线程并不是上文同步时的 主线程,这样也能证明是异步回调的。...同时回调的时候会传递两个参数: RecordMetadata 和上文一致的消息发送成功后的元数据。 Exception 消息发送过程中的异常信息。...从这里也可以看出为什么之前说发送完成后元数据和异常信息只会出现一个。 Producer 参数解析 发送流程讲完了再来看看 Producer 中比较重要的几个参数。...还是由于网络问题,本来消息已经成功写入了但是没有成功响应给 producer,进行重试时就可能会出现 消息重复。这种只能是消费者进行幂等处理。...配置一个最大 producer 个数。 发送消息时首先获取一个 producer,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List中,保存时做好同步处理防止并发问题。
同样的也能获取结果,同时发现回调的线程并不是上文同步时的 主线程,这样也能证明是异步回调的。 同时回调的时候会传递两个参数: RecordMetadata 和上文一致的消息发送成功后的元数据。...从这里也可以看出为什么之前说发送完成后元数据和异常信息只会出现一个。 Producer 参数解析 发送流程讲完了再来看看 Producer 中比较重要的几个参数。...还是由于网络问题,本来消息已经成功写入了但是没有成功响应给 producer,进行重试时就可能会出现 消息重复。这种只能是消费者进行幂等处理。...配置一个最大 producer 个数。 发送消息时首先获取一个 producer,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List中,保存时做好同步处理防止并发问题。...但在过期之前都会处理完剩余的任务。 所以使用哪一个得视情况而定。 总结 本文内容较多,从实例和源码的角度分析了 Kafka 生产者。 希望看完的朋友能有收获,同时也欢迎留言讨论。
Kafka主要被用于两大类应用:1.在应用间构建实时的数据流通道;2.构建传输或处理数据流的实时流式应用。...这个是默认的写法,依赖producer api本身的高可用(配置相关参数后失败了也会重试),且默认就是高吞吐地异步发送。绝大部分情况下数据是会成功的,但是也会有失败的情况。...api本身即提供一定的高可用 吞吐高,默认即异步发送 缺点: 当producer api本身的高可用不可靠时即会出现一些异常的情况,且程序本身很难捕获具体那条数据异常。...三、producer参数调优 1. acks acks=-1 强一致,不会丢数据。吞吐量会下降 acks=0 发过去就完事了,不关心broker是否处理成功,可能丢数据。...acks= 1 当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。
会以 RabbitMQ 和 Kafka 这两个常用的消息系统来说明。 1. Producer 弄丢消息 Producer 向 MQ 发消息,很简单,发过去就完事儿了。...思路很简单,让 MQ 发一个 接受确认声明(ack) 就行了,就像快递需要签收一样。 例如 RabbitMQ,有两种方式可以确保发送消息的安全。...1)事务消息 Producer 发送消息之前,先开启事务,然后再发送。 如果 RabbitMQ 没有正常收到消息,Producer 会收到异常信息,回滚事务。...confirm 模式还有一个回调机制,Producer 可以准备一个失败的接口,供 RabbitMQ 在接收失败时调用。...Producer 收到失败通知,或者超时了,可以执行相应的处理逻辑,例如重发。 confirm 模式是异步的,比事务消息更高效,使用更为广泛。
解决问题的办法也很简单,根据抛出来的具体异常日志进行处理,比如空指针啊什么的。或者直接这个接口里面的逻辑用异步线程处理。...Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender) 异常源码 Sender...判断是否超时的条件是: 【现在的时间 - Batch的创建时间 > 最大交付时间(delivery.timeout.ms) 】 关于本部分异常,强烈建议你先了解一下相关知识图解Kafka Producer...发送/处理Request 时间(包括重试时间)+ 处理Response + UserCallBack 用户回调 > delivery.timeout.ms 假如,发起的Request的目标Node网络异常...优先查看UserCallBack回调接口是不是有性能问题, 建议用异步线程处理回调。
领取专属 10元无门槛券
手把手带您无忧上云