Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
项目实例:https://github.com/windwant/kafka-demo
kafka.properties
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
request.required.acks=1
bootstrap.servers=localhost:9092
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=test-consumer-group
Producer:
package org.windwant.kafka;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Producer
*/
public class MyKafkaProducer {
private Properties props;
public static void main(String[] args) throws ConfigurationException {
new MyKafkaProducer().start();
}
public MyKafkaProducer() throws ConfigurationException {
props = new Properties();
PropertiesConfiguration config = new PropertiesConfiguration("kafka.properties");
config.setReloadingStrategy(new FileChangedReloadingStrategy());
//×Ô¶¯±£´æ
config.setAutoSave(true);
props.put("value.serializer", config.getString("value.serializer"));
props.put("key.serializer", config.getString("key.serializer"));
props.put("request.required.acks", config.getString("request.required.acks"));
props.put("bootstrap.servers", config.getString("bootstrap.servers"));
}
public void start(){
try {
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++) {
RecordMetadata result = producer.send(new ProducerRecord<>("mykafka",
"kafka key: " + Integer.toString(i),
"kafka value: " + Integer.toString(i))).get();
System.out.println("producer send: " + result);
Thread.sleep(1000);
}
producer.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Consumer:
package org.windwant.kafka;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* Consumer.
*/
public class MyKafkaConsumer {
private Properties props;
public static void main(String[] args) throws ConfigurationException {
new MyKafkaConsumer().start();
}
public MyKafkaConsumer() throws ConfigurationException {
props = new Properties();
PropertiesConfiguration config = new PropertiesConfiguration("kafka.properties");
config.setReloadingStrategy(new FileChangedReloadingStrategy());
//自动保存
config.setAutoSave(true);
props.put("value.deserializer", config.getString("value.deserializer"));
props.put("key.deserializer", config.getString("key.deserializer"));
props.put("bootstrap.servers", config.getString("bootstrap.servers"));
props.put("group.id", config.getString("group.id"));
}
public void start(){
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("mykafka"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
System.out.println();
}
}
}
}