Fire-and-forget: send the message to the server without caring whether it actually arrives. In most cases the message does arrive, because Kafka is highly available and the producer retries failed sends automatically. Even so, some messages can be lost with this approach.
package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @Title Producer01.java
 * @Description The first way a Kafka producer can send messages: fire-and-forget
 * @Author YangYunhe
 * @Date 2018-06-21 10:35:34
 */
public class Producer01 {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384); // 16 KB
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432); // 32 MB
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        String filePath = Producer01.class.getClassLoader().getResource("wechat_data.txt").getPath();
        try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = br.readLine()) != null) {
                // A ProducerRecord can specify topic, partition, key, and value; partition and key are optional
                // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", 0, "key", line);
                // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", "key", line);
                ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
                // Just send the message, without checking whether the send succeeded
                producer.send(record);
                Thread.sleep(100);
            }
        }
        producer.close();
    }
}
Synchronous send: the send() method returns a Future object. Calling get() on that Future blocks until the broker responds, either returning the record's metadata or throwing an exception, so you know whether the message was sent successfully.
package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title Producer02.java
 * @Description The second way a Kafka producer can send messages: synchronous send
 * @Author YangYunhe
 * @Date 2018-06-21 10:38:37
 */
public class Producer02 {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        String filePath = Producer02.class.getClassLoader().getResource("wechat_data.txt").getPath();
        try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = br.readLine()) != null) {
                ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
                // Block until the message is sent successfully (returning its metadata) or the send fails with an exception
                RecordMetadata metadata = producer.send(record).get();
                StringBuilder sb = new StringBuilder();
                sb.append("record [").append(line).append("] has been sent successfully!").append("\n")
                  .append("send to partition ").append(metadata.partition())
                  .append(", offset = ").append(metadata.offset());
                System.out.println(sb.toString());
                Thread.sleep(100);
            }
        }
        producer.close();
    }
}
Most of the time we do not need to wait for the response: although Kafka sends back the target topic, partition, and the message's offset, the sending application usually does not need them.
However, when a send fails we may still want to throw an exception, log the error, and so on. For such cases we can send messages asynchronously: call send() with a callback function, which the client invokes when the server returns a response.
package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title Producer03.java
 * @Description The third way a Kafka producer can send messages: asynchronous send
 * @Author YangYunhe
 * @Date 2018-06-21 11:06:05
 */
public class Producer03 {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        String filePath = Producer03.class.getClassLoader().getResource("wechat_data.txt").getPath();
        try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = br.readLine()) != null) {
                ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // If the send succeeded, RecordMetadata is non-null
                        if (metadata != null) {
                            StringBuilder sb = new StringBuilder();
                            sb.append("message has been sent successfully! ")
                              .append("send to partition ").append(metadata.partition())
                              .append(", offset = ").append(metadata.offset());
                            System.out.println(sb.toString());
                        }
                        // If the send failed, the exception is non-null
                        if (e != null) {
                            e.printStackTrace();
                        }
                    }
                });
                Thread.sleep(100);
            }
        }
        producer.close();
    }
}
When the data volume is large and strict message ordering is not required, you can send messages from multiple threads. There are two ways to implement a multi-threaded producer: 1. create one KafkaProducer instance and share it across multiple threads; 2. create a separate KafkaProducer instance per thread. Because KafkaProducer is thread-safe, sharing a single instance across threads generally performs much better.
package com.bonc.rdpe.kafka110.thread;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title KafkaProducerThread.java
 * @Description Runnable task used by the multi-threaded producer
 * @Author YangYunhe
 * @Date 2018-06-25 13:54:38
 */
public class KafkaProducerThread implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final ProducerRecord<String, String> record;

    public KafkaProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("exception occurs when sending message: " + exception);
                }
                if (metadata != null) {
                    StringBuilder result = new StringBuilder();
                    result.append("message[").append(record.value()).append("] has been sent successfully! ")
                          .append("send to partition ").append(metadata.partition())
                          .append(", offset = ").append(metadata.offset());
                    System.out.println(result.toString());
                }
            }
        });
    }
}
package com.bonc.rdpe.kafka110.producer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.bonc.rdpe.kafka110.thread.KafkaProducerThread;

/**
 * @Title MultiProducer.java
 * @Description Test code for the multi-threaded producer
 * @Author YangYunhe
 * @Date 2018-06-25 14:30:58
 */
public class MultiProducer {

    private static final int THREADS_NUMS = 10;

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(THREADS_NUMS);
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaProducer is thread-safe, so a single instance is shared by all tasks
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record;
        try {
            for (int i = 0; i < 100; i++) {
                record = new ProducerRecord<>("dev3-yangyunhe-topic001", "hello" + i);
                executor.submit(new KafkaProducerThread(producer, record));
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            System.out.println("exception occurs when sending message: " + e);
        } finally {
            producer.close();
            executor.shutdown();
        }
    }
}
Part of the output:
message[hello0] has been sent successfully! send to partition 1, offset = 705
message[hello1] has been sent successfully! send to partition 0, offset = 705
message[hello2] has been sent successfully! send to partition 2, offset = 704
message[hello3] has been sent successfully! send to partition 1, offset = 706
message[hello4] has been sent successfully! send to partition 0, offset = 706
......
receive.buffer.bytes
send.buffer.bytes
Note: these two parameters set the sizes of the TCP socket receive and send buffers, respectively. If they are set to -1, the operating system defaults are used. When the producer or consumer sits in a different data center from the brokers, it can help to increase these values, because cross-data-center links typically have higher latency and lower bandwidth.
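As an illustration, these buffer sizes are set in the producer's Properties just like any other configuration entry. This is a minimal sketch; the 64 KB / 128 KB values are hypothetical examples for a cross-data-center link, not recommendations:

```java
import java.util.Properties;

public class SocketBufferConfig {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Example values only: enlarge the TCP socket buffers for a high-latency link
        props.put("receive.buffer.bytes", 65536);  // 64 KB
        props.put("send.buffer.bytes", 131072);    // 128 KB
        // Setting either to -1 would mean "use the operating system default"
        System.out.println("receive.buffer.bytes = " + props.get("receive.buffer.bytes"));
        System.out.println("send.buffer.bytes = " + props.get("send.buffer.bytes"));
    }
}
```

These Properties would then be passed to the KafkaProducer constructor exactly as in the earlier examples.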