看完本文你将学会以下知识:
大概流程如下图:
kafka.png
大致了解了生产者工作的流程,我们就来看看一个生产者是怎么创建的把!
kafka-console-producer.sh --broker-list localhost:9092 --topic test
,接着就可以通过向控制台输入数据来给kafka生产了,当然,这个没太多实际意义,一般也就用来测试测试。<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++){
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))
,new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//todo something when complete }
});
}
//all done close
producer.close();
是不是觉得很简单?虽然使用起来是很简单,但是要使用好也不是那么容易噢。。。这里请注意以下几点:
1、一定要记得close
producer,以免造成资源浪费
2、send()
是异步的,所以上面的代码是有点问题的,producer.close();
应该在合适的机会调用,而不是代码末尾
3、如果你想使用同步发送,那么只需要简单的producer.send().get()
使用get()
函数就可以了
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
KafkaProducer<String, String> producer
= new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer());
//begining transaction
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++){
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
1、每个生产者只能有一个打开的事务,并且需要配置好transactional.id
。
2、beginTransaction()
和commitTransaction()
调用之间发送的所有消息都是单个事务的一部分。
3、send()
不需要指定回调函数,也不需要使用get()
,因为事务是统一处理的,当事务发生错误可以通过KafkaException
来捕获进行处理
ok!上面就是kafka生产者的创建部分内容了,也基本该了解kafka生产者的使用了,为了更好的使用它,我们有必要对它的相关配置来进行详细了解。
send()
方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffer.full
参数(在 0.9.0.0 版本里被替换成了max.block.ms
,表示在抛出异常之前可以阻塞一段时间)snappy
、gzip
或lz4
,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
retries
参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。retry.backoff.ms
参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出 Leader 需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。当批次被填满 或者 等待时间达到
linger.ms
设置的间隔时间,批次里的所有消息会被发送出去,哪怕此时该批次只有一条消息。 所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
如何保证顺序性:如果把 retries 设为非零整数,同时把
max.in.flight.requests.per.connection
设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。 一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retries
设为 0。可以把max.in.flight.requests.per.connection
设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
message.max.bytes
),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。注意区分
batch.size
只是针对一个 topic 的 partition,而max.request.size
针对单次请求的。
关于更多的配置信息,可以查看:http://kafka.apachecn.org/documentation.html#configuration
通过上面的一些讲解,应该已经可以比较友好的使用 kafka生产者了,接下来我们还剩下最后一个部分,kafka的分区
从第一个部分 kafka数据生产流程 我们知道,分区我们是可以自己指定的,也可以是使用默认的分区器。
/**
* Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
/**
* org.apache.kafka.clients.producer.internals.DefaultPartitioner
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
//记录了 topic 写入消息的数量,并返回本条消息是第`nextValue`条。
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
阅读源码是件比较需要耐心的事情,但是解释源码,会让人欲仙欲死,这里我只解释下上面代码的几个方法做了什么,具体流程,烦您自己耐心看看咯
nextValue(topic)
的作用就是根据 topic 上一次写入 partition 的序号,返回一个 +1 的序号,并记录。说简单点,其实也就是记录了这个 topic 写入消息的数量,并告诉本条消息你是第几条。int
,并将其二进制数据的首位进行去 0 操作关于 默认的分区器 就到这了,相当的简单,总结下就是,分区可以自己指定,一般在一些数据倾斜的时候发生,大多数时候都是使用的默认分区器,默认分区器会根据 key 进行分区,如果 key=null,会采取轮询的方式进行分区,否则则根据 key 的 hash,进行散列的随机分区。
当然,可能还缺一个自定义分区器,不过,相信这个东西肯定难不倒亲爱的你,这里就不哔哩哔哩了哈