本文节选自电子书《Netkiller Java 手札》
Spring boot 1.5.1
以下安装步骤仅适合开发环境,生产环境请使用这个脚本安装: https://github.com/oscm/shell/tree/master/mq/kafka
# Download the Kafka 0.10.2.0 tarball into /usr/local/src and unpack it.
cd /usr/local/src
wget http://apache.communilink.net/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar zxvf kafka_2.12-0.10.2.0.tgz
# Move the unpacked directory under /srv.
mv kafka_2.12-0.10.2.0 /srv/
# Keep an untouched copy of the broker configuration as server.properties.original.
cp /srv/kafka_2.12-0.10.2.0/config/server.properties{,.original}
# Append advertised.host.name=localhost to the broker configuration.
echo "advertised.host.name=localhost" >> /srv/kafka_2.12-0.10.2.0/config/server.properties
# Create the version-independent path /srv/kafka used by the commands that follow.
ln -s /srv/kafka_2.12-0.10.2.0 /srv/kafka
启动 Kafka 服务
# Start ZooKeeper first, then the Kafka broker.
# Without -daemon each command stays in the foreground, so run them in separate terminals.
# Absolute configuration paths are used so the commands work from any working directory.
/srv/kafka/bin/zookeeper-server-start.sh /srv/kafka/config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties
-daemon 表示以守护进程方式在后台启动
# Start ZooKeeper first, then the Kafka broker, both with -daemon.
# Absolute configuration paths are used so the commands work from any working directory.
/srv/kafka/bin/zookeeper-server-start.sh -daemon /srv/kafka/config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties
停止 Kafka 服务
# Stop the Kafka broker first, then ZooKeeper (the reverse of the start order).
/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh
<!-- Spring for Apache Kafka. No <version> is declared here; presumably it is supplied by
     the project's dependency management (e.g. the Spring Boot parent POM) - TODO confirm. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
package cn.netkiller;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point of the Kafka example application.
 *
 * <p>{@code @SpringBootApplication} is already meta-annotated with
 * {@code @EnableAutoConfiguration} and {@code @ComponentScan}, so those two
 * annotations are not repeated here. Repeating a bare {@code @ComponentScan}
 * would also replace the composed one and drop its default exclude filters.
 * {@code @EnableScheduling} is kept because it is not part of
 * {@code @SpringBootApplication}.
 */
@SpringBootApplication
@EnableScheduling
public class Application {

    /**
     * Boots the Spring application context using this class as the primary source.
     *
     * @param args command-line arguments, passed through to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
package cn.netkiller.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that wires up the Kafka consumer side of the example:
 * the listener container factory, the consumer factory, the raw consumer
 * properties and the {@link Listener} bean that receives the messages.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * Creates the container factory used for {@code @KafkaListener} methods.
     *
     * <p>Concurrency and poll timeout are left at their defaults; they can be tuned
     * with {@code factory.setConcurrency(...)} and
     * {@code factory.getContainerProperties().setPollTimeout(...)} if required.
     *
     * @return a listener container factory backed by {@link #consumerFactory()}
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Creates the consumer factory for records with {@code String} keys and values.
     *
     * @return a consumer factory built from {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
    }

    /**
     * Builds the Kafka consumer properties: broker address, consumer group,
     * offset reset policy, auto-commit settings, session timeout and the
     * key/value deserializers.
     *
     * @return a new mutable map of consumer configuration entries
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<String, Object>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // The factories above declare String keys, so the key deserializer must be
        // StringDeserializer; an IntegerDeserializer would not match the declared key type.
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return propsMap;
    }

    /**
     * Registers the message listener as a Spring bean so that its
     * {@code @KafkaListener} method is picked up.
     *
     * @return a new {@link Listener}
     */
    @Bean
    public Listener listener() {
        return new Listener();
    }
}
package cn.netkiller.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;
/**
 * Receives records from the Kafka topic {@code test}, reports each one through
 * the JDK logger and standard output, and counts down a one-shot latch.
 */
public class Listener {

    /** Logger named after this class; left protected and non-final as in the original contract. */
    protected Logger logger = Logger.getLogger(Listener.class.getName());

    /** Latch initialised to 1; counted down on every received record. */
    private CountDownLatch countDownLatch1 = new CountDownLatch(1);

    /**
     * Exposes the latch so callers can wait for a record to arrive.
     *
     * @return the latch that {@link #listen(ConsumerRecord)} counts down
     */
    public CountDownLatch getCountDownLatch1() {
        return this.countDownLatch1;
    }

    /**
     * Handles one record from the {@code test} topic.
     *
     * @param record the consumed record; its string form is logged and printed
     */
    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        // Build the text once; the same line goes to the logger and to stdout.
        final String text = "Received message: " + record.toString();
        this.logger.info(text);
        System.out.println(text);
        this.countDownLatch1.countDown();
    }
}
# Send test messages to the "test" topic with the console producer.
# NOTE(review): the broker address below differs from the one in the consumer
# configuration (www.netkiller.cn:9092) - confirm both refer to the same broker.
$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.
每输入一行并按回车,该行消息就会发送到你的 Spring Boot Kafka 程序。