前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka-producer(数据生产)笔记

kafka-producer(数据生产)笔记

原创
作者头像
皮皮熊
发布2019-05-13 01:24:37
1.6K0
发布2019-05-13 01:24:37
举报
文章被收录于专栏:大数据与实时计算

TOC

记录下kafka生产者遇到的一些问题,主要基于0.8/0.9版本的producer api。

一、kafka简介

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统)。Kafka主要被用于两大类应用:1.在应用间构建实时的数据流通道;2.构建传输或处理数据流的实时流式应用。

链接:

【logo图】

image.png
image.png

二、生产者基本实现

1.示意图

image.png
image.png

2.具体实现:

2.1 Fire-and-forget模式

发送消息后不需要逻辑程序关心是否发送成功。这个是默认的写法,依赖producer api本身的高可用(配置相关参数后失败了也会重试),且默认就是高吞吐地异步发送。绝大部分情况下数据是会成功的,但是也会有失败的情况。

官方demo

代码语言:txt
复制
  public Producer(String topic)
  {
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "localhost:9092");
    // Use random partitioner. Don't need the key type. Just set it to Integer.
    // The message is of type String.
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    this.topic = topic;
  }
  
  public void run() {
    int messageNo = 1;
    while(true)
    {
      String messageStr = new String("Message_" + messageNo);
      producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
      messageNo++;
    }
  }

优点:

  • 写法简单,且producer api本身即提供一定的高可用
  • 吞吐高,默认即异步发送

缺点:

  • 当producer api本身的高可用不可靠时即会出现一些异常的情况,且程序本身很难捕获具体那条数据异常。

2.2 同步模式

参考代码:

代码语言:txt
复制
producer.send(record).get();

即sender()方法后再调用get()方法会同步地等待结果返回,根据结果可以判断是否发送成功。

优点:

  • 可以捕获每批发送记录的情况,并可以在业务层面做一些二次处理

缺点:

  • 吞吐下降严重

2.3 callback模式

代码语言:txt
复制
class DemoProducerCallback implements Callback {
	@Override
	public void onCompletion(RecordMetadata recordMetadata, Exception e) {
		if (e != null) {
			e.printStackTrace();
		}
	}
}
 
producer.send(record, new DemoProducerCallback());

通过callback机制,异步地触发式检查broker返回的结果,从而检查每次发送的结果。

要使用callback函数,先要实现org.apache.kafka.clients.producer.Callback接口,该接口只有一个onCompletion方法。如果发送异常,onCompletion的参数Exception e会为非空。

callback性能损失不容小视,但其吞吐仍然远远大于同步的模式。

【性能对比测试】

总之,我们需要根据我们具体的业务场景实现我们的生产方式。

三、producer参数调优

1. acks

  • acks=-1 强一致,不会丢数据。吞吐量会下降
  • acks=0 发过去就完事了,不关心broker是否处理成功,可能丢数据。
  • acks= 1 当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。

我们现网综合权衡成本效率下,默认使用的是acks= 1。

2. retries

顾名思义,当设置为大于零的值,客户端会重新发送任何发送失败的消息。主要问题:

  • retries 不生效问题(大坑),主要是一些不可重试地异常,如“message size too large”(消息太大)
  • 写入顺序的问题:重试会导致数据写入的无序

3. serializer.class

序列化用到的类。因为一些很复杂的业务问题,我们现网中以string为主。在某些特性开发中,会使用avro。

4. compression.codec

现网一般不压缩(生产机器cpu性能不太好),特性开发场景使用Snappy偏多。

5. batch.num.messages & queue.buffering.max.ms

影响吞吐\实时性的两个指标。

  • batch.num.messages:顾名思义,一批发送的消息数量
  • queue.buffering.max.ms:队列中buff的最大时间数目(数据即使没到batch的量也会发送)

现网中需要详细地压测着两个指标,以达到吞吐和实时性之间的平衡。

四、分区问题

0.8版本的producer会存在要死broker分区的情况,导致kafka多分区之间数据不均匀的情况。

解决方法有两种:

  • api端指定key: key对应分区不存在会出现异常
  • 0.9版本api已经内置相关的算法,直接升级即可

五、其他问题

1. 生产幂等性

2. kafka生产高吞吐原理

六、kafka高可用生产的一种尝试

TBD

参考

《震惊了!原来这才是kafka!》

https://www.jianshu.com/p/d3e963ff8b70

《Kafka基础-生产者发送消息》

https://blog.csdn.net/gangchengzhong/article/details/80745974

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、kafka简介
  • 二、生产者基本实现
    • 1.示意图
      • 2.具体实现:
        • 2.1 Fire-and-forget模式
        • 2.2 同步模式
        • 2.3 callback模式
    • 三、producer参数调优
      • 1. acks
        • 2. retries
          • 3. serializer.class
            • 4. compression.codec
              • 5. batch.num.messages & queue.buffering.max.ms
              • 四、分区问题
              • 五、其他问题
                • 1. 生产幂等性
                  • 2. kafka生产高吞吐原理
                  • 六、kafka高可用生产的一种尝试
                  • 参考
                  相关产品与服务
                  数据库
                  云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档