Spring boot with Apache Kafka

本文节选自电子书《Netkiller Java 手札》

5.21. Spring boot with Apache Kafka

Spring boot 1.5.1

5.21.1. 安装 kafka

一下安装仅仅适合开发环境,生产环境请使用这个脚本安装 https://github.com/oscm/shell/tree/master/mq/kafka

cd /usr/local/src
wget http://apache.communilink.net/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar zxvf kafka_2.12-0.10.2.0.tgz
mv kafka_2.12-0.10.2.0 /srv/
cp /srv/kafka_2.12-0.10.2.0/config/server.properties{,.original}
echo "advertised.host.name=localhost" >> /srv/kafka_2.12-0.10.2.0/config/server.properties
ln -s /srv/kafka_2.12-0.10.2.0 /srv/kafka		

启动 Kafka 服务

/srv/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties			

-daemon 表示守护进程方式在后台启动

/srv/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties			

停止 Kafka 服务

/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh			

5.21.2. maven

		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>			

5.21.3. Spring boot Application

			package cn.netkiller;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan
@EnableScheduling
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);

	}
}			

5.21.4. EnableKafka

			package cn.netkiller.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

	public KafkaConsumerConfig() {
		// TODO Auto-generated constructor stub
	}

	@Bean
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
		factory.setConsumerFactory(consumerFactory());
		// factory.setConcurrency(1);
		// factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> propsMap = new HashMap<String, Object>();
		propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
		propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
		propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
		propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
		propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
		propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return propsMap;
	}

	@Bean
	public Listener listener() {
		return new Listener();
	}

}			

5.21.5. KafkaListener

			package cn.netkiller.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

public class Listener {

	public Listener() {
		// TODO Auto-generated constructor stub
	}

	protected Logger logger = Logger.getLogger(Listener.class.getName());

	public CountDownLatch getCountDownLatch1() {
		return countDownLatch1;
	}

	private CountDownLatch countDownLatch1 = new CountDownLatch(1);

	@KafkaListener(topics = "test")
	public void listen(ConsumerRecord<?, ?> record) {
		logger.info("Received message: " + record.toString());
		System.out.println("Received message: " + record);
		countDownLatch1.countDown();
	}
}			

5.21.6. 测试

			$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.			

每输入一行回车后发送到你的Spring boot kafka 程序

原文发布于微信公众号 - Netkiller(netkiller-ebook)

原文发表时间:2017-03-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏cloudskyme

JArgs命令行选项解析->Java套件

项目简介和意图 这个小的工程是为java开发者提供的,使用命令行方便的,结构紧凑的解析器工具。 public class OptionTest { priva...

3555
来自专栏程序之美

spring websocket推送

1244
来自专栏你不就像风一样

Jsoup模拟登录带验证码的教务系统(原理详解)

在模拟登陆该教务系统时,笔者观察到该教务系统还有一个不需要验证码即可登陆的网址:http://jwxt.qlu.edu.cn/jsxsd/xsxk/xklc_l...

662
来自专栏Pulsar-V

OpenCV在ubuntu下的编译

opencv的编译 下面我们写一个shell命名为build.sh放在opencv的根目录下面,代码如下: mkdir $1 cd $1 cmake -DWIT...

2835
来自专栏Android开发指南

6.请求网络步骤

34211
来自专栏扎心了老铁

springboot kafka集成(实现producer和consumer)

本文介绍如何在springboot项目中集成kafka收发message。 1、先解决依赖 springboot相关的依赖我们就不提了,和kafka相关的只依赖...

6365
来自专栏Linyb极客之路

spring系列之自定义扩展PropertyPlaceHolderConfigurer

一、PropertyPlaceHolderConfigurer介绍 主要用于将一些配置信息移出xml文件,移到至properties文件 二、拓展使用 1、将...

4455
来自专栏别先生

Hibernate的CRUD以及junit测试

Hibernate的CRUD以及junit测试 1:第一步创建动态工程引包,省略。 2:第二步,创建数据库和数据表,省略。 3:第三步,创建实体类,如User...

1888
来自专栏纯洁的微笑

springboot(十八):使用Spring Boot集成FastDFS

上篇文章介绍了《如何使用Spring Boot上传文件》,这篇文章我们介绍如何使用Spring Boot将文件上传到分布式文件系统FastDFS中。 这个项目会...

3614
来自专栏Hadoop实操

如何使用Java代码访问CDH的Solr服务

CDH集群使用的Solr版本为4.10.3,Java开发中会经常使用到solrj客户端包访问Solr集群。本篇文章主要讲述如何使用Java代码访问Kerbero...

5856

扫码关注云+社区