TOC
记录下kafka生产者遇到的一些问题,主要基于0.8/0.9版本的producer api。
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统)。Kafka主要被用于两大类应用:1.在应用间构建实时的数据流通道;2.构建传输或处理数据流的实时流式应用。
链接:
【logo图】
发送消息后不需要逻辑程序关心是否发送成功。这个是默认的写法,依赖producer api本身的高可用(配置相关参数后失败了也会重试),且默认就是高吞吐地异步发送。绝大部分情况下数据是会成功的,但是也会有失败的情况。
public Producer(String topic)
{
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
public void run() {
int messageNo = 1;
while(true)
{
String messageStr = new String("Message_" + messageNo);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
}
}
优点:
缺点:
参考代码:
producer.send(record).get();
即sender()方法后再调用get()方法会同步地等待结果返回,根据结果可以判断是否发送成功。
优点:
缺点:
class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
producer.send(record, new DemoProducerCallback());
通过callback机制,异步地触发式检查broker返回的结果,从而检查每次发送的结果。
要使用callback函数,先要实现org.apache.kafka.clients.producer.Callback接口,该接口只有一个onCompletion方法。如果发送异常,onCompletion的参数Exception e会为非空。
callback性能损失不容小视,但其吞吐仍然远远大于同步的模式。
【性能对比测试】
总之,我们需要根据我们具体的业务场景实现我们的生产方式。
我们现网综合权衡成本效率下,默认使用的是acks= 1。
顾名思义,当设置为大于零的值,客户端会重新发送任何发送失败的消息。主要问题:
序列化用到的类。因为一些很复杂的业务问题,我们现网中以string为主。在某些特性开发中,会使用avro。
现网一般不压缩(生产机器cpu性能不太好),特性开发场景使用Snappy偏多。
影响吞吐\实时性的两个指标。
现网中需要详细地压测着两个指标,以达到吞吐和实时性之间的平衡。
0.8版本的producer会存在要死broker分区的情况,导致kafka多分区之间数据不均匀的情况。
解决方法有两种:
TBD
《震惊了!原来这才是kafka!》
https://www.jianshu.com/p/d3e963ff8b70
《Kafka基础-生产者发送消息》
https://blog.csdn.net/gangchengzhong/article/details/80745974
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。