Spring boot with Apache Kafka

本文节选自电子书《Netkiller Java 手札》

5.21. Spring boot with Apache Kafka

Spring boot 1.5.1

5.21.1. 安装 kafka

一下安装仅仅适合开发环境,生产环境请使用这个脚本安装 https://github.com/oscm/shell/tree/master/mq/kafka

cd /usr/local/src
wget http://apache.communilink.net/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar zxvf kafka_2.12-0.10.2.0.tgz
mv kafka_2.12-0.10.2.0 /srv/
cp /srv/kafka_2.12-0.10.2.0/config/server.properties{,.original}
echo "advertised.host.name=localhost" >> /srv/kafka_2.12-0.10.2.0/config/server.properties
ln -s /srv/kafka_2.12-0.10.2.0 /srv/kafka		

启动 Kafka 服务

/srv/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties			

-daemon 表示守护进程方式在后台启动

/srv/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties			

停止 Kafka 服务

/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh			

5.21.2. maven

		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>			

5.21.3. Spring boot Application

			package cn.netkiller;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan
@EnableScheduling
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);

	}
}			

5.21.4. EnableKafka

			package cn.netkiller.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

	public KafkaConsumerConfig() {
		// TODO Auto-generated constructor stub
	}

	@Bean
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
		factory.setConsumerFactory(consumerFactory());
		// factory.setConcurrency(1);
		// factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> propsMap = new HashMap<String, Object>();
		propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
		propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
		propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
		propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
		propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
		propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return propsMap;
	}

	@Bean
	public Listener listener() {
		return new Listener();
	}

}			

5.21.5. KafkaListener

			package cn.netkiller.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

public class Listener {

	public Listener() {
		// TODO Auto-generated constructor stub
	}

	protected Logger logger = Logger.getLogger(Listener.class.getName());

	public CountDownLatch getCountDownLatch1() {
		return countDownLatch1;
	}

	private CountDownLatch countDownLatch1 = new CountDownLatch(1);

	@KafkaListener(topics = "test")
	public void listen(ConsumerRecord<?, ?> record) {
		logger.info("Received message: " + record.toString());
		System.out.println("Received message: " + record);
		countDownLatch1.countDown();
	}
}			

5.21.6. 测试

			$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.			

每输入一行回车后发送到你的Spring boot kafka 程序

原文发布于微信公众号 - Netkiller(netkiller-ebook)

原文发表时间:2017-03-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏林欣哲

Spring Boot 知识点速记

本文是学习2小时学会Spring Boot和Spring Boot进阶之Web进阶的SpringBoot常用知识点速记。 SpringBoot前置知识: Spr...

3087
来自专栏非著名程序员

Retrofit OKHttp 教你怎么持久化管理Cookie

? 投稿作者:黄海杰 原文链接: http://blog.csdn.net/lyhhj/article/details/51345386 绪论 最近小编有点...

31310
来自专栏pangguoming

springboot的Web开发-Web相关配置

一:Spring Boot提供自动配置        通过查看WebMvcAutoConfiguration及WebMvcProperties的源码,可以发现S...

8688
来自专栏码农笔录

java根据ip地址获取城市地域信息

843
来自专栏JAVA高级架构

【原创】自己动手写一个服务网关

1282
来自专栏西安-晁州

spring-boot之简单定时任务

首先是pom.xml依赖: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http...

2020
来自专栏一枝花算不算浪漫

[Spring框架]Spring JDBCTmplate基础入门总结.

3178
来自专栏ImportSource

Spring5以来注册Bean的各种姿势,特别最后的纯编码注册值得尝试

各位好,今天我们的内容是有关Spring 5以来有关注册bean的几种方式。前面两三个是比较常用的方式,最后两种是只有在特殊的场合下才会被用到和想到。我们会分别...

4616
来自专栏大大的微笑

java使用mina和websocket通信

这里以mina整合springMVC为例: //springMVC的配置: <!-- mina --> <bean class="org.spring...

80410
来自专栏吴小龙同學

Android 进程间通信

什么鬼!单例居然失效了,一个地方设置值,另个地方居然取不到,这怎么可能?没道理啊!排查半天,发现这两就不在一个进程里,才恍然大悟…… 什么是进程 按照操作...

2324

扫描关注云+社区