spring集成kafka

一、添加依赖项

compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'

二、发消息(生产者)

2.1 xml配置

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd">
 6 
 7     <bean id="producerProperties" class="java.util.HashMap">
 8         <constructor-arg>
 9             <map>
10                 <!--kafka的服务地址,多个地址用英文逗号连接-->
11                 <entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/>
12                 <entry key="group.id" value="0"/>
13                 <entry key="retries" value="10"/>
14                 <entry key="batch.size" value="16384"/>
15                 <entry key="linger.ms" value="1"/>
16                 <entry key="buffer.memory" value="33554432"/>
17                 <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
18                 <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
19             </map>
20         </constructor-arg>
21     </bean>
22 
23     <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
24         <constructor-arg>
25             <ref bean="producerProperties"/>
26         </constructor-arg>
27     </bean>
28 
29     <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
30         <constructor-arg ref="producerFactory"/>
31         <constructor-arg name="autoFlush" value="true"/>
32         <!--topic名字-->
33         <property name="defaultTopic" value="dc-monitor"/>
34     </bean>
35 
36 </beans>

2.2 发送代码示例

    @Test
    public void send() throws InterruptedException, ExecutionException, TimeoutException {
        KafkaTemplate template = context.getBean(KafkaTemplate.class);
        String msg = "中华人民共和国万岁!";
        ListenableFuture<SendResult<String, String>> future = template.sendDefault(msg);
        SendResult<String, String> result = future.get(10, TimeUnit.SECONDS);
        System.out.println("发送成功=====>" + msg);
    }

三、收消息(消费者)

3.1 xml配置

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd">
 6 
 7     <bean id="consumerProperties" class="java.util.HashMap">
 8         <constructor-arg>
 9             <map>
10                 <!--kafka的服务地址,多个地址用英文逗号连接-->
11                 <entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/>
12                 <entry key="group.id" value="0"/>
13                 <entry key="enable.auto.commit" value="true"/>
14                 <entry key="auto.commit.interval.ms" value="1000"/>
15                 <entry key="session.timeout.ms" value="15000"/>
16                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
17                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
18             </map>
19         </constructor-arg>
20     </bean>
21 
22     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
23         <constructor-arg ref="consumerProperties"/>
24     </bean>
25 
26     <!-- 实际执行消息消费的类 -->
27     <bean id="kafkaConsumer" class="com.cnblogs.yjmyzz.consumer.DemoKafkaConsumer"/>
28 
29     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
30         <!--topic名字-->
31         <constructor-arg value="dc-monitor"/>
32         <property name="messageListener" ref="kafkaConsumer"/>
33     </bean>
34 
35     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
36           init-method="doStart">
37         <constructor-arg ref="consumerFactory"/>
38         <constructor-arg ref="containerProperties"/>
39     </bean>
40 
41 </beans>

3.2 接收代码示例

public class DemoKafkaConsumer implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        System.out.println("收到消息=====>" + data.value());
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏安恒网络空间安全讲武堂

[HCTF] share write up

从http://share.2018.hctf.io/robots.txt中获取到题目部分源码

1012
来自专栏Google Dart

Flutter 构建完整应用手册-联网 顶

从大多数应用程序获取互联网上的数据是必要的。 幸运的是,Dart和Flutter为这类工作提供了工具!

1342
来自专栏高性能服务器开发

+从零实现一款12306刷票软件1.4

这里还有个注意细节,就是通过POST请求发送的数据需要对一些符号做URL Encode,这个我在上一篇文章《从零实现一个http服务器》也详细做了介绍,还不清楚...

2342
来自专栏开发技术

druid抛出的异常------javax.management.InstanceAlreadyExistsException引发的一系列探索

  最近项目中有个定时任务的需求,定时检查mysql数据与etcd数据的一致性,具体实现细节就不说了,今天要说的就是实现过程中遇到了druid抛出的异常,以及解...

2443
来自专栏xingoo, 一个梦想做发明家的程序员

Java程序员的日常—— Spring Boot单元测试

关于Spring boot 之前没有用Spring的时候是用的MockMvc,做接口层的测试,原理上就是加载applicationContext.xml文件,然...

3445
来自专栏Kubernetes

原 深入分析Kubernetes Sche

2054
来自专栏FreeBuf

腾讯御见捕获Flash 0day漏洞(CVE-2018-5002)野外攻击

腾讯御见威胁情报中心近日监控到一例使用Adobe Flash 0day漏洞(CVE-2018-5002)的APT攻击,攻击者疑通过即时聊天工具和邮箱等把恶意Ex...

1190
来自专栏移动端开发

NSURLSession 所有的都在这里(一)

5603
来自专栏hbbliyong

一个实用的却被忽略的命名空间:Microsoft.VisualBasic

  当你看到这个命名空间的时候,别因为是VB的东西就匆忙关掉网页,那将会是您的损失,此命名空间中的资源最初目的是为了简化VB.NET开发而创建的,所以Mic...

3356
来自专栏Kubernetes

原 荐 Kubernetes Resourc

更多关于kubernetes的深入文章,请看我csdn或者oschina的博客主页。 ResoureQuota介绍 关于ResoureQuota和Resourc...

5309

扫码关注云+社区

领取腾讯云代金券