本文简单介绍 Kafka 0.8 Java 客户端中 producer 的使用示例。
<!-- Kafka 0.8.2.2 artifact built against Scala 2.10. NOTE(review): the producer
     classes used below (org.apache.kafka.clients.producer.*) presumably arrive
     transitively via kafka-clients; confirm against the resolved dependency tree. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>
// Assemble the producer configuration: broker address, a client id, and
// String serializers for both the record key and the record value.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddr);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// Create the producer from the configuration above (the original line lacked its terminating semicolon).
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Use freshly generated random UUID strings as the demo key and value.
String dataKey = UUID.randomUUID().toString();
String dataValue = UUID.randomUUID().toString();
// Wrap topic, key and value into the record handed to the producer.
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, dataKey, dataValue);
// Synchronous send: calling get() on the returned future blocks until the send has completed.
producer.send(producerRecord).get();
异步发送(默认方式):
// Send without waiting: the value returned by send() is ignored, so the
// outcome of this send is never inspected here.
producer.send(producerRecord);
// Send with a completion callback so that failures are logged.
producer.send(producerRecord, new Callback() {
    /**
     * Called when the send finishes.
     *
     * @param metadata  metadata of the sent record; in the 0.8.2 client this is
     *                  null when the send failed
     * @param exception the failure cause, or null when the send succeeded
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // metadata is null on failure, so guard the dereference; otherwise the
            // error handler itself would throw a NullPointerException and hide the cause.
            String failedTopic = (metadata != null) ? metadata.topic() : "<unknown>";
            LOGGER.error("send msg to {},error:{}", failedTopic, exception);
        }
    }
});