上面的例子仅仅是做了一个热身，现在我们将实现一个完整的例子。
例 5.5. Spring Boot with Apache Kafka.
SpringApplication
package cn.netkiller;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
//import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
//import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Entry point of the Spring Boot / Apache Kafka example application.
 *
 * <p>{@code @SpringBootApplication} already enables auto-configuration and
 * component scanning, so {@code @EnableAutoConfiguration} and
 * {@code @ComponentScan} are not repeated here.
 */
@SpringBootApplication
// @EnableMongoRepositories
// @EnableJpaRepositories
@EnableScheduling
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
Consumer configuration
package cn.netkiller.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import cn.netkiller.kafka.consumer.Consumer;
/**
 * Spring configuration for the Kafka consumer side of the example.
 *
 * <p>Declares the raw consumer properties, the consumer factory, the listener
 * container factory used by {@code @KafkaListener} methods, and the
 * {@link Consumer} bean itself.
 */
@Configuration
@EnableKafka
public class ConsumerConfiguration {

    /**
     * Builds the Kafka consumer properties.
     *
     * @return a mutable map of consumer configuration entries
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        // The factories below are declared as <String, String>, so the key
        // deserializer must produce String keys as well.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
        return props;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a factory for String-keyed, String-valued consumers
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory backed by {@link #consumerFactory()}.
     *
     * @return the container factory for String-keyed, String-valued listeners
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Registers the message-receiving component as a bean.
     *
     * @return a new {@link Consumer}
     */
    @Bean
    public Consumer receiver() {
        return new Consumer();
    }
}
Producer configuration
package cn.netkiller.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import cn.netkiller.kafka.producer.Producer;
/**
 * Spring configuration for the Kafka producer side of the example.
 *
 * <p>Declares the raw producer properties, the producer factory, the
 * {@link KafkaTemplate}, and the {@link Producer} bean itself.
 */
@Configuration
public class ProducerConfiguration {

    /**
     * Builds the Kafka producer properties.
     *
     * @return a mutable map of producer configuration entries
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        // The factory and template below are declared as <String, String>, so
        // the key serializer must accept String keys as well.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // value to block, after which it will throw a TimeoutException
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        return props;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return a factory for String-keyed, String-valued producers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Creates the template used to send messages, backed by {@link #producerFactory()}.
     *
     * @return the Kafka template for String keys and String values
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Registers the message-sending component as a bean.
     *
     * @return a new {@link Producer}
     */
    @Bean
    public Producer sender() {
        return new Producer();
    }
}
Consumer
package cn.netkiller.kafka.consumer;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
/**
 * Receives messages from the {@code helloworld.t} topic and logs them.
 *
 * <p>A latch with an initial count of 1 is counted down on every received
 * message, so callers can wait for the first delivery. The latch is created
 * once and never replaced by this class.
 */
public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    private CountDownLatch latch = new CountDownLatch(1);

    /**
     * Exposes the latch that is counted down when a message arrives.
     *
     * @return the latch owned by this consumer
     */
    public CountDownLatch getLatch() {
        return this.latch;
    }

    /**
     * Listener callback: logs the payload and counts the latch down.
     *
     * @param message the received message payload
     */
    @KafkaListener(topics = "helloworld.t")
    public void receiveMessage(String message) {
        logger.info("received message='{}'", message);
        this.latch.countDown();
    }
}
Producer
package cn.netkiller.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * Sends String messages to Kafka through the injected {@link KafkaTemplate}
 * and logs the outcome of each send.
 */
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code message} to {@code topic} without blocking for the result.
     *
     * <p>The outcome is reported through callbacks: a success logs the record
     * offset, a failure logs the exception. To block the sending thread
     * instead, call {@code get()} on the returned future.
     *
     * @param topic   the destination topic
     * @param message the payload to send
     */
    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> pending = kafkaTemplate.send(topic, message);

        // Success and failure handlers are registered as two separate callbacks.
        pending.addCallback(
                result -> logger.info("sent message='{}' with offset={}", message,
                        result.getRecordMetadata().offset()),
                ex -> logger.error("unable to send message='{}'", message, ex));
    }
}
Controller
package cn.netkiller.web;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;
/**
 * Test endpoints under {@code /test} for checking that the application is up
 * and that a message can be sent to Kafka.
 */
@Controller
@RequestMapping("/test")
public class KafkaTestController {

    // Logger is bound to this controller so log lines carry the right category.
    private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);

    @Autowired
    private Producer sender;

    @Autowired
    private Consumer receiver;

    /**
     * Liveness check.
     *
     * @return the literal string {@code "PONG"}
     */
    @RequestMapping("/ping")
    @ResponseBody
    public String ping() {
        return "PONG";
    }

    /**
     * Sends a fixed message to {@code helloworld.t}, waits up to 10 seconds on
     * the consumer's latch, and logs the remaining latch count.
     *
     * <p>The result of the wait is not inspected; {@code "OK"} is returned
     * whether or not the latch reached zero in time.
     *
     * @return the literal string {@code "OK"}
     * @throws Exception if the wait on the latch is interrupted
     */
    @RequestMapping("/kafka/send")
    @ResponseBody
    public String testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        // Parameterized form prints the same text as the former count + "" concatenation.
        logger.info("{}", receiver.getLatch().getCount());
        return "OK";
    }
}
SpringBootTest
package cn.netkiller;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;
/**
 * Integration test: sends one message through the {@link Producer} and checks
 * that the {@link Consumer}'s latch reaches zero.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTests {

    @Autowired
    private Consumer receiver;

    @Autowired
    private Producer sender;

    /**
     * Sends a message to {@code helloworld.t}, waits up to 10 seconds on the
     * consumer's latch, and asserts that the latch count is zero afterwards.
     *
     * @throws Exception if the wait on the latch is interrupted
     */
    @Test
    public void testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");

        // Same 10-second timeout as before, expressed in seconds.
        receiver.getLatch().await(10, TimeUnit.SECONDS);

        assertThat(receiver.getLatch().getCount()).isEqualTo(0L);
    }
}