摘 要
本文将介绍java实现Kafka生产者Producer的简单工具类
kafka:kafka_2.10-0.10.1.1
jdk:1.7
package com.itunic.util;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import net.sf.json.JSONObject;
public class KafkaTools {
/**
*
* 私有静态方法,创建Kafka生产者
*
* @author IG
* @Date 2017年4月14日 上午10:32:32
* @version 1.0.0
* @return KafkaProducer
*/
private static KafkaProducer<String, String> createProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "huakServer1:9092,huakServer2:9092");// 声明kafka
// properties.put("value.serializer",
// "org.apache.kafka.common.serialization.ByteArraySerializer");
// properties.put("key.serializer",
// "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<String, String>((properties));
}
/**
*
* 传入kafka约定的topicName,json格式字符串,发送给kafka集群
*
* @author IG
* @Date 2017年4月14日 下午1:29:09
* @version 1.0.0
* @param topicName
* @param jsonMessage
*/
public static void sendMessage(String topicName, String jsonMessage) {
KafkaProducer<String, String> producer = createProducer();
producer.send(new ProducerRecord<String, String>(topicName, jsonMessage));
producer.close();
}
/**
*
* 传入kafka约定的topicName,json格式字符串数组,发送给kafka集群<br>
* 用于批量发送消息,性能较高。
*
* @author IG
* @Date 2017年4月14日 下午2:00:12
* @version 1.0.0
* @param topicName
* @param jsonMessages
* @throws InterruptedException
*/
public static void sendMessage(String topicName, String... jsonMessages) throws InterruptedException {
KafkaProducer<String, String> producer = createProducer();
for (String jsonMessage : jsonMessages) {
producer.send(new ProducerRecord<String, String>(topicName, jsonMessage));
}
producer.close();
}
/**
*
* 传入kafka约定的topicName,Map集合,内部转为json发送给kafka集群 <br>
* 用于批量发送消息,性能较高。
*
* @author IG
* @Date 2017年4月14日 下午2:01:18
* @version 1.0.0
* @param topicName
* @param mapMessageToJSONForArray
*/
public static void sendMessage(String topicName, List<Map<Object, Object>> mapMessageToJSONForArray) {
KafkaProducer<String, String> producer = createProducer();
for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) {
String array = JSONObject.fromObject(mapMessageToJSON).toString();
producer.send(new ProducerRecord<String, String>(topicName, array));
}
producer.close();
}
/**
*
* 传入kafka约定的topicName,Map,内部转为json发送给kafka集群
*
* @author IG
* @Date 2017年4月14日 下午1:30:10
* @version 1.0.0
* @param topicName
* @param mapMessageToJSON
*/
public static void sendMessage(String topicName, Map<Object, Object> mapMessageToJSON) {
KafkaProducer<String, String> producer = createProducer();
String array = JSONObject.fromObject(mapMessageToJSON).toString();
producer.send(new ProducerRecord<String, String>(topicName, array));
producer.close();
}
public static void main(String[] args) throws InterruptedException {
// System.out.println(System.getProperty("file.encoding"));
String[] s = new String[] { "{\"userName\":\"赵四31\",\"pwd\":\"lisi\",\"age\":13}",
"{\"userName\":\"赵四41\",\"pwd\":\"lisi\",\"age\":14}",
"{\"userName\":\"赵四51\",\"pwd\":\"lisi\",\"age\":15}" };
// KafkaTools.sendMessage("logstest",
// "{\"userName\":\"赵四\",\"pwd\":\"lisi\",\"age\":13}");
/*
* for (String a : s) { System.out.println(a); Thread.sleep(3000);
* KafkaTools.sendMessage(topicName, jsonMessages); }
*/
KafkaTools.sendMessage("logstest", s);
}
}