
Integrating Spring with middleware (kafka, RabbitMQ, ActiveMQ, ZeroMQ, TubeMQ, NSQ) - kafka

Author: 逍遥壮士
Published: 2021-04-13 14:41:26
Included in the column: 技术趋势

Previous article: Integrating Spring with middleware (RocketMQ, kafka, RabbitMQ, ActiveMQ, ZeroMQ, TubeMQ, NSQ)


For deploying kafka, see the separate article: kafka environment installation on Windows (kafka环境安装-基于windows)


Code for this article: https://gitee.com/hong99/spring/issues/I1N1DF


kafka development in plain Java

Project structure

│  java_kafkamq.iml
│  pom.xml
│
└─src
    ├─main
    │  ├─java
    │  │  └─com
    │  │      └─hong
    │  │          └─kafka
    │  │                  CreateTopic.java
    │  │                  MyKafkaConsumer.java
    │  │                  MyKafkaProducer.java
    │  │
    │  └─resources
    └─test
        └─java

com.hong.kafka.CreateTopic

package com.hong.kafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author: csh
 * @Date: 2021/3/25 14:02
 * @Description: create a topic
 */
public class CreateTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        AdminClient adminClient = AdminClient.create(props);
        try {
            // topic "hong3": 1 partition, replication factor 3 (needs at least 3 brokers)
            ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
            topics.add(new NewTopic("hong3", 1, (short) 3));
            CreateTopicsResult result = adminClient.createTopics(topics);
            // block until the cluster has answered the request
            result.all().get();
        } catch (InterruptedException e) {
            // restore the interrupt flag before reporting
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // release the admin client's network resources
            adminClient.close();
        }
    }
}

com.hong.kafka.MyKafkaConsumer

package com.hong.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/**
 * @author: csh
 * @Date: 2021/3/25 14:11
 * @Description:
 */
public class MyKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker addresses
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        // consumer group id; the whole string "1,2,3" is one group name, not a list of three groups
        props.put("group.id", "1,2,3");
        // enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // commit offsets automatically every 1 second
        props.put("auto.commit.interval.ms", "1000");
        // key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // subscribe to the topic hong3
        consumer.subscribe(Arrays.asList("hong3"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection <TopicPartition> collection) {
                // on every assignment, rewind to the beginning of the assigned partitions
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("Received: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }

        }
    }
}
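
A note on the `group.id` value used in the consumer above: `bootstrap.servers` is a comma-separated list of host:port pairs, whereas `group.id` is one opaque string, so `"1,2,3"` names a single consumer group rather than three. The JDK-only sketch below illustrates the difference. The class `ConfigShapes` and its method names are made up for this illustration and are not part of kafka-clients.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class ConfigShapes {
    // bootstrap.servers is a list: each comma-separated entry is one host:port pair
    static List<String> brokers(Properties props) {
        return Arrays.asList(props.getProperty("bootstrap.servers").split(","));
    }

    // group.id is a single identifier: the whole string, commas included, is the group name
    static String groupName(Properties props) {
        return props.getProperty("group.id");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id", "1,2,3");
        System.out.println(brokers(props).size() + " brokers"); // prints "3 brokers"
        System.out.println("group: " + groupName(props));       // prints "group: 1,2,3"
    }
}
```

Two consumers only share the work of a topic when they use exactly the same `group.id` string, so a descriptive name such as `user-service` is usually easier to reason about than `1,2,3`.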

com.hong.kafka.MyKafkaProducer

package com.hong.kafka;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author: csh
 * @Date: 2021/3/25 13:58
 * @Description: kafka producer
 */
public class MyKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

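        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 1000; i++) {
            // the key is the counter, the value is a short text containing it
            producer.send(new ProducerRecord<String, String>("hong3", Integer.toString(i), "Value sent: " + i));
        }
        // close() waits for the records sent so far to complete before returning
        producer.close();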
    }

}
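
The numeric producer settings above are byte counts, which is easy to miss when reading the raw numbers. The small sketch below (JDK only; the class name `ProducerSizing` is invented for this illustration) converts `batch.size` and `buffer.memory` into more readable units and estimates how many completely full batches the buffer could hold. It is a rough upper bound, since the client does not use the buffer for full batches only.

```java
class ProducerSizing {
    static final int BATCH_SIZE = 16384;         // batch.size from the producer above, in bytes
    static final long BUFFER_MEMORY = 33554432L; // buffer.memory from the producer above, in bytes

    // Upper bound on how many completely full batches fit into the buffer
    static long maxFullBatches(long bufferMemory, int batchSize) {
        return bufferMemory / batchSize;
    }

    public static void main(String[] args) {
        System.out.println(BATCH_SIZE / 1024 + " KiB per batch");               // prints "16 KiB per batch"
        System.out.println(BUFFER_MEMORY / (1024 * 1024) + " MiB of buffer");   // prints "32 MiB of buffer"
        System.out.println(maxFullBatches(BUFFER_MEMORY, BATCH_SIZE) + " full batches at most"); // prints "2048 full batches at most"
    }
}
```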

spring_mq/java_kafkamq/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>java_kafkamq</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>

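Run the producer first and then the consumer. The consumer prints one line per record, showing its offset, key, and value.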

Integrating Spring MVC with kafka

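Goal: a user calls an HTTP endpoint, the producer application sends an MQ message, and the consumer application receives it and inserts the data into the database through MyBatis.

spring_kafka_consumer: consumer application, port 8089

spring_kafka_producer: producer application, port 8088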
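
The flow described above (HTTP call, producer sends a message, consumer stores it) can be pictured with a JDK-only sketch before looking at the real configuration. Everything here is a stand-in: the `BlockingQueue` plays the role of the kafka topic, the `List` plays the role of the database table, and the class and method names are hypothetical. It shows only how the two applications are decoupled, not how spring-kafka or MyBatis work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FlowSketch {
    // Stand-in for the kafka topic: the producer side adds, the consumer side polls
    final BlockingQueue<String> topic = new LinkedBlockingQueue<String>();
    // Stand-in for the database table that the consumer writes to
    final List<String> table = new ArrayList<String>();

    // Producer application: the HTTP handler serializes the request and publishes it
    void handleSaveRequest(String username, int age) {
        topic.add("{\"username\":\"" + username + "\",\"age\":" + age + "}");
    }

    // Consumer application: take one pending message, if any, and persist it
    boolean consumeOne() {
        String message = topic.poll();
        if (message == null) {
            return false;
        }
        table.add(message);
        return true;
    }

    public static void main(String[] args) {
        FlowSketch flow = new FlowSketch();
        flow.handleSaveRequest("spring_kafka_hong", 1111);
        flow.consumeOne();
        System.out.println(flow.table);
    }
}
```

The HTTP handler returns as soon as the message is queued; the write to the table happens later and independently, which is the main reason to put a message queue between the two applications.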

spring_kafka_producer

│  pom.xml
│  spring_kafka_producer.iml
│
├─src
│  ├─main
│  │  ├─java
│  │  │  └─com
│  │  │      └─hong
│  │  │          └─spring
│  │  │              ├─config
│  │  │              │      TopicAll.java
│  │  │              │
│  │  │              ├─controller
│  │  │              │  │  UserController.java
│  │  │              │  │
│  │  │              │  └─ao
│  │  │              │          UserSaveAO.java
│  │  │              │
│  │  │              └─listener
│  │  │                      KafkaProducerListener.java
│  │  │
│  │  └─resources
│  │          application.properties
│  │          applicationContext.xml
│  │          kafka.properties
│  │          kafka.xml
│  │          log4j2.xml
│  │          logging.properties
│  │
│  └─test
│      └─java
└─web
    └─WEB-INF
            web.xml

WEB-INF/web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_kafka_producer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:kafka.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_kafka_producer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

spring_mq/spring_kafka_producer/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring_kafka_producer</artifactId>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.11.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>

    <!-- Make sure .properties and .xml resources are exported into the build output -->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

kafka.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd" default-lazy-init="true">

   <!-- Producer settings -->
   <bean id="producerProperties" class="java.util.HashMap">
      <constructor-arg>
         <map>
            <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
            <!-- group.id is a consumer setting and is not needed by the producer -->
            <entry key="retries" value="${kafka.retries}" />
            <entry key="batch.size" value="${kafka.batch.size}" />
            <entry key="linger.ms" value="1" />
            <entry key="buffer.memory" value="${kafka.buffer.memory}" />
            <entry key="key.serializer"
                  value="org.apache.kafka.common.serialization.StringSerializer" />
            <entry key="value.serializer"
                  value="org.apache.kafka.common.serialization.StringSerializer" />
         </map>
      </constructor-arg>
   </bean>

    <!-- ProducerFactory bean required by the KafkaTemplate -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>
    <!-- Listener -->
   <bean id="kafkaProducerListener" class="com.hong.spring.listener.KafkaProducerListener" />


   <!-- KafkaTemplate bean: inject it wherever messages are sent and call its send methods -->
   <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
      <constructor-arg ref="producerFactory" />
      <constructor-arg name="autoFlush" value="true" />
      <!-- Default topic -->
      <property name="defaultTopic" value="${kafka.producer.defaultTopic}" />
      <property name="producerListener" ref="kafkaProducerListener"/>

   </bean>
</beans>

kafka.properties

# cluster addresses
kafka.bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
# group id
kafka.group.id=1,2,3
# number of retries
kafka.retries=3
# batch size (bytes)
kafka.batch.size=16384
# buffer memory (bytes)
kafka.buffer.memory=33554432
# default topic
kafka.producer.defaultTopic=hong
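
The values in kafka.properties reach kafka.xml through `${...}` placeholders. As a rough illustration of that substitution step, the JDK-only sketch below loads a few of the properties and resolves placeholders with a regular expression. This is not Spring's implementation; the class `PlaceholderSketch` and its `resolve` method are hypothetical, and rejecting an unknown key with an exception is a choice made for this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replace every ${key} in the template with the matching property value
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + m.group(1));
            }
            // quoteReplacement keeps '$' and '\' in the value from being treated as regex references
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader("kafka.retries=3\nkafka.producer.defaultTopic=hong\n"));
        System.out.println(resolve("${kafka.producer.defaultTopic}", props)); // prints "hong"
        System.out.println(resolve("${kafka.retries}", props));               // prints "3"
    }
}
```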

applicationContext.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- Component scanning -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!-- Load the property file -->
   <context:property-placeholder location="classpath:kafka.properties"/>

   <!-- Enable annotation configuration -->
   <context:annotation-config />

   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- Prefix -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- Suffix -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>
   </bean>

   <!-- Enable annotation-driven Spring MVC -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- Support Chinese text in String responses -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>
</beans>

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

com.hong.spring.listener.KafkaProducerListener

package com.hong.spring.listener;

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
/**
 * kafka producer listener: logs the outcome of every send.
 *
 * @author csh
 * @date 2021/3/29 14:47
 */
@Log4j2
public class KafkaProducerListener implements ProducerListener {
    /**
     * Called after a record has been sent successfully.
     */
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        log.info("========== kafka send succeeded (log start) ==========");
        log.info("----------topic:"+topic);
        log.info("----------partition:"+partition);
        log.info("----------key:"+key);
        log.info("----------value:"+value);
        log.info("----------RecordMetadata:"+recordMetadata);
        log.info("~~~~~~~~~~ kafka send succeeded (log end) ~~~~~~~~~~");
    }

    /**
     * Called when sending a record fails.
     */
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        log.error("========== kafka send failed (log start) ==========");
        log.error("----------topic:"+topic);
        log.error("----------partition:"+partition);
        log.error("----------key:"+key);
        log.error("----------value:"+value);
        // passing the exception as the last argument logs its stack trace
        log.error("----------Exception:", exception);
        log.error("~~~~~~~~~~ kafka send failed (log end) ~~~~~~~~~~");
    }

    /**
     * Tells the template whether this listener wants onSuccess callbacks.
     * It is not a startup hook, so nothing is logged here.
     */
    public boolean isInterestedInSuccess() {
        return true;
    }
}

com.hong.spring.controller.UserController

package com.hong.spring.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.config.TopicAll;
import com.hong.spring.controller.ao.UserSaveAO;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {

    @Resource
    private KafkaTemplate kafkaTemplate;


    @RequestMapping("save")
    public DataResponse<Boolean> save(UserSaveAO ao){
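        log.info("Add-user request parameters: {}",JSONObject.toJSONString(ao));
        if(null==ao){
            return DataResponse.BuildFailResponse("Parameters must not be empty!");
        }
       try {
           User user =new User();
           BeanUtils.copyProperties(ao,user);
           // kafka has no RocketMQ-style tags; in production, business operations such as update or insert could be told apart by message key or by topic
           kafkaTemplate.send(TopicAll.HONG_TOPIC, JSONObject.toJSONString(user));
           // NOTE: success is still reported through BuildFailResponse here; DataResponse (from spring_mq_common_api, not shown) should supply the success variant
           return DataResponse.BuildFailResponse("User added successfully!");
       }catch (Exception e){
           // pass the exception as the last argument without a placeholder so the stack trace is logged
           log.error("Failed to add user",e);
           return DataResponse.BuildFailResponse("Failed to add user, please retry!");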
       }
    }
}

com.hong.spring.controller.ao.UserSaveAO

package com.hong.spring.controller.ao;

import lombok.Data;

import java.io.Serializable;

/**
 * @author: csh
 * @Date: 2021/3/16 11:21
 * @Description: request parameters for saving a user
 */
@Data
public class UserSaveAO implements Serializable {
    private Integer id;
    private String username;
    private Integer age;
}

com.hong.spring.config.TopicAll

package com.hong.spring.config;

/**
 * @author: csh
 * @Date: 2021/3/16 14:15
 * @Description: holds all topic names
 */
public class TopicAll {
    // user topic
    public static final String HONG_TOPIC="kafka_user_topic";
}

After deployment, send a message: http://localhost:8088/user/save?username=spring_kafka_hong&age=1111

username:spring_kafka_hong
age:1111
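
The request above carries its data in the query string, and Spring MVC binds each parameter to the `UserSaveAO` field of the same name; `id` is not in the URL, so it stays null. The JDK-only sketch below shows just the parsing step that precedes that binding. The class `QueryParamsSketch` is made up for this illustration and does not reproduce Spring's data binder.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

class QueryParamsSketch {
    // Split the query part of a URL into name/value pairs, keeping their order
    static Map<String, String> parse(String url) {
        Map<String, String> params = new LinkedHashMap<String, String>();
        String query = URI.create(url).getQuery();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params =
                parse("http://localhost:8088/user/save?username=spring_kafka_hong&age=1111");
        System.out.println(params.get("username"));          // prints "spring_kafka_hong"
        System.out.println(Integer.parseInt(params.get("age"))); // prints "1111"
        System.out.println(params.get("id"));                // prints "null": id was not sent
    }
}
```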

spring_kafka_consumer

WEB-INF/web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_kafka_consumer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:kafka.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_kafka_consumer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

spring_mq/spring_kafka_consumer/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring_kafka_consumer</artifactId>
    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.11.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>

    <!-- Make sure .properties and .xml resources are exported into the build output -->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

mybatis.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>

    <!-- settings -->
    <settings>
        <!-- Enable lazy loading -->
        <setting name="lazyLoadingEnabled" value="true"/>
        <!-- Switch from aggressive loading to on-demand loading -->
        <setting name="aggressiveLazyLoading" value="false"/>
        <!-- Global (second-level) cache switch; true is already the default -->
        <setting name="cacheEnabled" value="true"/>
        <!-- Map underscore column names to camel case: Table(create_time) -> Entity(createTime) -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
        <!-- Use column labels (aliases) instead of column names; default true, e.g. select name as title from table -->
        <setting name="useColumnLabel" value="true"/>
        <!-- Use JDBC getGeneratedKeys to retrieve auto-increment primary key values -->
        <setting name="useGeneratedKeys" value="true"/>
    </settings>

    <!-- Type aliases -->
    <typeAliases>
        <package name="com.hong.spring.entity"/>
    </typeAliases>

</configuration>
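
To make the effect of the `mapUnderscoreToCamelCase` setting above concrete, here is a small JDK-only sketch of the naming rule it relies on: a column such as `create_time` is matched to a property named `createTime`. The class `CamelCaseSketch` is hypothetical and only mimics the naming convention; MyBatis performs the matching internally in its own way.

```java
class CamelCaseSketch {
    // Convert an underscore-separated column name to a camel-case property name
    static String toCamelCase(String column) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : column.toCharArray()) {
            if (c == '_') {
                upperNext = true;          // drop the underscore, capitalize what follows
            } else if (upperNext) {
                out.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                out.append(Character.toLowerCase(c));
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCamelCase("create_time")); // prints "createTime"
        System.out.println(toCamelCase("USER_NAME"));   // prints "userName"
    }
}
```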

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="INFO">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

kafka.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd" default-lazy-init="true">

   <!-- Consumer settings -->
   <bean id="consumerProperties" class="java.util.HashMap">
      <constructor-arg>
         <map>
            <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
            <entry key="group.id" value="${kafka.group.id}"/>
            <entry key="auto.commit.interval.ms" value="${kafka.auto.commit.interval.ms}" />
            <!-- Producer-only settings (retries, batch.size, linger.ms, buffer.memory, key/value serializers) are omitted: a consumer does not use them -->
            <entry key="session.timeout.ms" value="${kafka.session.timeout.ms}" />
            <entry key="enable.auto.commit" value="${kafka.enable.auto.commit}" />
            <entry key="key.deserializer" value="${kafka.key.deserializer}"/>
            <entry key="value.deserializer" value="${kafka.value.deserializer}"/>
         </map>
      </constructor-arg>
   </bean>

   <!-- Message listener -->
   <bean id="kafkaConsumerListener" class="com.hong.spring.listener.UserListener" />

    <!-- ConsumerFactory bean used by the listener container -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

   <!-- Container properties -->
   <bean id="userListener"
        class="org.springframework.kafka.listener.config.ContainerProperties">
      <!-- Topic to subscribe to -->
      <constructor-arg value="${kafka_user_topic}" />
      <property name="messageListener" ref="kafkaConsumerListener" />
   </bean>


   <!-- Consumer container -->
   <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
      <constructor-arg ref="consumerFactory"/>
      <constructor-arg ref="userListener"/>
   </bean>
</beans>

kafka.properties

# cluster addresses
kafka.bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
# consumer group id
kafka.group.id=1,2,3
# number of retries
kafka.retries=3
# batch size (bytes)
kafka.batch.size=16384
# buffer memory (bytes)
kafka.buffer.memory=33554432
# default topic
kafka.producer.defaultTopic=hong
# automatic offset commit
kafka.enable.auto.commit=true
# session timeout (ms)
kafka.session.timeout.ms=15000
# automatic commit interval (ms)
kafka.auto.commit.interval.ms=1000
# key deserializer
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value deserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# user topic
kafka_user_topic=kafka_user_topic

jdbc.properties

# database driver
jdbc.driver=com.mysql.jdbc.Driver
# database connection url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
# database user name
jdbc.user=root
# database password
jdbc.password=123456

applicationContext.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- Component scanning -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!-- Load the property files -->
   <context:property-placeholder location="classpath:jdbc.properties,classpath:kafka.properties"/>

   <!-- Enable annotation configuration -->
   <context:annotation-config />
   <!-- Enable annotation-driven transactions -->
   <tx:annotation-driven transaction-manager="transactionManager" />
   <!-- Let static resources through to the default servlet -->
   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- Prefix -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- Suffix -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>

   </bean>

   <!-- Enable annotation-driven Spring MVC -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- Support Chinese text in String responses -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>


   <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
      <!-- Basic settings -->
      <property name="url" value="${jdbc.url}"></property>
      <property name="driverClassName" value="${jdbc.driver}"></property>
      <property name="username" value="${jdbc.user}"></property>
      <property name="password" value="${jdbc.password}"></property>

      <!-- Key settings -->
      <!-- Number of physical connections created at initialization, which happens on an explicit init() call or on the first getConnection() -->
      <property name="initialSize" value="3" />
      <!-- Minimum number of idle connections in the pool -->
      <property name="minIdle" value="2" />
      <!-- Maximum number of active connections in the pool -->
      <property name="maxActive" value="15" />
      <!-- Maximum time to wait for a connection, in milliseconds -->
      <property name="maxWait" value="10000" />

      <!-- Performance settings -->
      <!-- Enable PSCache and set its size per connection -->
      <property name="poolPreparedStatements" value="true" />
      <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />

      <!-- Other settings -->
      <!-- Interval between checks for idle connections that should be closed, in milliseconds -->
      <property name="timeBetweenEvictionRunsMillis" value="60000" />
      <!-- Minimum time a connection stays in the pool before it may be evicted, in milliseconds -->
      <property name="minEvictableIdleTimeMillis" value="300000" />
      <!-- Recommended: true. No performance cost and safer. When a connection is borrowed and has been idle longer than
                  timeBetweenEvictionRunsMillis, validationQuery is run to check that it is still valid. -->
      <property name="testWhileIdle" value="true" />
      <!-- Set to true here to avoid handing out a broken connection: validationQuery runs on every borrow, which lowers performance. -->
      <property name="testOnBorrow" value="true" />
      <!-- Run validationQuery when a connection is returned; enabling this lowers performance -->
      <property name="testOnReturn" value="false" />
   </bean>

   <!-- sqlSessionFactory -->
   <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
      <!-- Load the MyBatis configuration file -->
      <property name="configLocation" value="classpath:mybatis.xml"/>
      <!-- Data source -->
      <property name="dataSource" ref="dataSource"/>
      <!-- All mapper XML files -->
      <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
   </bean>

   <!-- Mapper scanner -->
   <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
      <!-- Package whose mapper interfaces are scanned -->
      <property name="basePackage" value="com.hong.spring.dao" />
      <!-- Link the mapper scanner to the SqlSessionFactory -->
      <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
   </bean>
   <!-- Transaction manager -->
   <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
      <property name="dataSource" ref="dataSource" />
   </bean>
</beans>

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=INFO
logging.level.org.hibernate=ERROR

com.hong.spring.provider.UserServiceImpl

package com.hong.spring.provider;

import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @author: csh
 * @Date: 2020/8/18 15:16
 * @Description: user service implementation
 */
@Service("userService")
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }
    @Override
    @Transactional
    public DataResponse <Boolean> save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("Required parameters must not be null!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save>0);
    }

    @Override
    public DataResponse <Boolean> insertBatch(List <User> list) {
        // An empty list would make the <foreach> in UserMapper.xml render "values" with no rows
        if(null==list || list.isEmpty()){
            return DataResponse.BuildFailResponse("Parameters must not be empty!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave>0);
    }

    @Override
    @Transactional
    public DataResponse <Boolean> update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("Required parameters must not be null!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update>0);
    }
    @Override
    public DataResponse <User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse <List <User>> findByPage(UserAO ao) {
        if(ao==null){
            // Fall back to the first page with a default page size when no paging object is supplied
            // (assumes UserAO has a public no-argument constructor)
            ao = new UserAO();
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            // Turn the zero-based page number into the row offset that "limit #{page},#{pageSize}" expects
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List <User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}
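The paging logic in findByPage boils down to converting a zero-based page number into the offset of a SQL `limit offset,size` clause. The following is a minimal, standalone sketch of that conversion; the class name `PageOffsetSketch`, the method `toLimit` and the default page size of 10 are illustrative assumptions, not part of the project.

```java
public class PageOffsetSketch {
    // Assumed default, mirroring the value 10 used in findByPage
    static final int DEFAULT_PAGE_SIZE = 10;

    /** Returns {offset, pageSize} for a zero-based page number; missing or invalid values fall back to defaults. */
    public static int[] toLimit(Integer page, Integer pageSize) {
        int size = (pageSize == null || pageSize <= 0) ? DEFAULT_PAGE_SIZE : pageSize;
        int index = (page == null || page < 0) ? 0 : page;
        // multiplyExact fails loudly on overflow instead of producing a negative offset
        return new int[] { Math.multiplyExact(index, size), size };
    }

    public static void main(String[] args) {
        int[] limit = toLimit(2, 10);
        System.out.println("limit " + limit[0] + "," + limit[1]); // prints "limit 20,10"
    }
}
```

Keeping the conversion in one null-safe helper avoids mutating the incoming request object, which is what the service method above does when it overwrites `page` with the offset.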

com/hong/spring/mapper/UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
    <resultMap type="com.hong.spring.entity.User" id="user">
        <id column="id" property="id" />
        <result column="user_name" property="username" />
        <result column="age" property="age" />
    </resultMap>

    <select id="findById" resultMap="user">
      SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
    </select>

    <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
        select * from user where 1=1 limit #{page},#{pageSize}
    </select>

    <select id="findAllUserList" resultMap="user">
      SELECT * FROM user
    </select>

    <select id="findAllTotal" resultType="int">
      SELECT count(*) FROM user
    </select>

    <insert id="save" >
         INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    </insert>

    <insert id="insertBatch">
        insert into user
        ( user_name, age)
        values
        <foreach collection="list" item="user" index="index"
                 separator=",">
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        </foreach>
    </insert>

    <update id="update" >
        update user
        <set>
            <if test="username !=null">
                user_name=#{username,jdbcType=VARCHAR},
            </if>
            <if test="age !=null">
                age =#{age,jdbcType=INTEGER}
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
</mapper>

com.hong.spring.listener.UserListener

package com.hong.spring.listener;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.MessageListener;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description: user listener
 */
@Log4j2
public class UserListener implements MessageListener<String,String> {

    @Autowired
    private IUserService userService;


    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        try {
            log.info(String.format("消费者1收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s", record.topic(),
                    record.partition(), record.offset(), record.key(), record.value()));
            User user = JSONObject.parseObject(record.value(), User.class);
            log.info("获取的用户信息{}", JSONObject.toJSONString(user));
            if(null!=user){
                userService.save(user);
            }
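        }catch (Exception e){
            // Pass the exception as the last argument without a placeholder so the stack trace is logged
            log.error("Failed to save user", e);
        }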
    }
}

com.hong.spring.dao.UserMapper

package com.hong.spring.dao;

import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

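/**
 * @author: csh
 * @Date: 2020/8/18 15:04
 * @Description: user DAO layer
 */

public interface UserMapper {

    /**
     * Description: query all users
     *
     * @return: every row of the user table
     * @author: csh
     * @date: 2020/8/18 15:31
     */
    List<User> findAllUserList();
    /**
     * Description: get the total number of users
     *
     * @return: row count of the user table
     * @author: csh
     * @date: 2020/8/18 15:30
     */
    int findAllTotal();
    /**
     * Description: update a user
     *
     * @param: user the user to update; its id selects the row
     * @return: number of affected rows
     * @author: csh
     * @date: 2020/8/18 15:30
     */
    int update(User user);
    /**
     * Description: add a user
     *
     * @param: user the user to insert
     * @return: number of affected rows
     * @author: csh
     * @date: 2020/8/19 18:39
     */
    int save(User user);
    /**
     * Description: add users in batch
     *
     * @param: list the users to insert
     * @return: number of affected rows
     * @author: csh
     * @date: 2020/8/21 15:46
     */
    int insertBatch(@Param("list") List <User> list);
    /**
     * Description: query a user by id
     *
     * @param: id the user id
     * @return: the matching user
     * @author: csh
     * @date: 2020/8/19 18:39
     */
    User findById(int id);
    /**
     * Description: query users by page
     *
     * @param: ao carries the row offset (page) and the page size
     * @return: the users on the requested page
     * @author: csh
     * @date: 2020/8/21 16:05
     */
    List<User> findByPage(UserAO ao);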
}

After starting the consumer, the received message is logged as follows:

16:31:45.944 [messageListenerContainer-C-1] INFO com.hong.spring.listener.UserListener - 消费者1收到了消息。topic: kafka_user_topic, partition: 0, offset: 1, key: null, value: {"age":1111,"username":"spring_kafka_hong"}
16:31:46.034 [messageListenerContainer-C-1] INFO com.hong.spring.listener.UserListener - 获取的用户信息{"age":1111,"username":"spring_kafka_hong"}
16:31:47.955 [messageListenerContainer-C-1] INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inite

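Spring Boot integration with Kafka

springboot_kafka_api: API request entry point, port 8286, dubbo 20880

springboot_kafka_consumer: Kafka consumer, port 8288, dubbo 20882

springboot_kafka_producer: Kafka producer, port 8287, dubbo 20881

Flow: a request comes in through the api module, which calls the producer's Dubbo interface; the producer sends a Kafka message; the consumer consumes the message and then calls the producer's Dubbo interface again (reallySave) to insert the row into the database.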
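The round trip described above can be pictured with a small in-memory simulation. This is only a sketch: a `BlockingQueue` stands in for the Kafka topic and a list stands in for the user table, there is no Dubbo or Kafka involved, and the class `KafkaFlowSketch` with its methods is hypothetical. Only the names `save` and `reallySave` are borrowed from the service interface used in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class KafkaFlowSketch {
    // Stands in for the Kafka topic; a real topic is partitioned and durable, this queue is neither
    static final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    // Stands in for the user table written by reallySave
    static final List<String> database = new ArrayList<>();

    // Producer service: save() only publishes the message and returns immediately
    public static boolean save(String userJson) {
        return topic.offer(userJson);
    }

    // Producer service: reallySave() is the call that actually persists the record
    public static boolean reallySave(String userJson) {
        return database.add(userJson);
    }

    // Consumer: take one message off the topic and call back into the producer service
    public static boolean consumeOne() {
        String message = topic.poll();
        return message != null && reallySave(message);
    }

    public static void main(String[] args) {
        save("{\"age\":18,\"username\":\"demo\"}");     // api -> producer.save -> topic
        System.out.println("rows before consume: " + database.size());
        consumeOne();                                     // consumer -> producer.reallySave -> database
        System.out.println("rows after consume: " + database.size());
    }
}
```

The point of the sketch is the ordering: the API call returns as soon as the message is queued, and the row only appears after the consumer has processed it, so callers must not assume the user exists when save returns.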

springboot_kafka_api

Project structure

│  pom.xml
│
└─src
    └─main
        ├─java
        │  └─com
        │      └─hong
        │          └─springboot
        │              │  Application.java
        │              │
        │              └─controller
        │                      IndexController.java
        │                      UserController.java
        │
        └─resources
                application.properties

com.hong.springboot.controller.IndexController

package com.hong.springboot.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: csh
 * @Date: 2021/1/12 10:16
 * @Description: home page
 */
@RestController
public class IndexController {
    @RequestMapping("/")
    public String index(){
        return "Success!";
    }
}

com.hong.springboot.controller.UserController

package com.hong.springboot.controller;


import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * @author: csh
 * @Date: 2020/8/18 16:11
 * @Description: user endpoints
 */
@RestController
@Slf4j
@RequestMapping("/user")
public class UserController {
    @Reference
    private IUserService userService;

    @GetMapping("/findByAll")
    public DataResponse<List<User>> findByAll(){
        try {
            return userService.findByAll();
        } catch (Exception e){
            log.error("Query failed", e);
        }
        return DataResponse.BuildFailResponse("Query failed!");
    }

    @PostMapping("/save")
    public DataResponse<Boolean> save(User ao){
        if(null==ao || ao.getAge()==null || StringUtils.isBlank(ao.getUsername())){
            return DataResponse.BuildFailResponse("Parameters must not be empty!");
        }
        return userService.save(ao);
    }
}

com.hong.springboot.Application

package com.hong.springboot;


import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description: Spring Boot Dubbo consumer side
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

application.properties

#dubbo configuration
# Application name
dubbo.application.name=springboot_dubbo_consumer
# Registry protocol
dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotations so that services are registered in ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Dubbo protocol port
dubbo.protocol.port=20880
# Protocol name
dubbo.protocol.name=dubbo
# Packages scanned for Dubbo annotations
dubbo.scan.basePackages=com.hong.springboot.controller

# Avoid port conflicts
server.port=8286

springboot_all/springboot_kafka_api/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_kafka_api</artifactId>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

    <!-- Make sure .properties and .xml resources are copied into the build output -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

springboot_kafka_producer

Project structure

│  pom.xml
│
└─src
    └─main
        ├─java
        │  └─com
        │      └─hong
        │          └─springboot
        │              │  Application.java
        │              │
        │              ├─config
        │              │      DruidConfig.java
        │              │      TopicAll.java
        │              │
        │              ├─dao
        │              │      UserMapper.java
        │              │
        │              └─provider
        │                      UserServiceImpl.java
        │
        └─resources
                application.properties

com.hong.springboot.config.DruidConfig

package com.hong.springboot.config;

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * @author: csh
 * @Date: 2021/1/8 18:08
 * @Description: data source configuration
 */
@Configuration
public class DruidConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(){
        return new DruidDataSource();
    }
}

com.hong.springboot.config.TopicAll

package com.hong.springboot.config;

/**
 * @author: csh
 * @Date: 2021/3/16 14:15
 * @Description: holds all topic names
 */
public class TopicAll {
    // User topic
    public static final String USER_TOPIC ="springboot_user_topic";
}

com.hong.springboot.dao.UserMapper

package com.hong.springboot.dao;

import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
 * @author: csh
 * @Date: 2020/8/18 15:04
 * @Description: user DAO layer
 */

public interface UserMapper {
    // Alias user_name so that it maps onto the username property of User
    @Select("select id,user_name as username,age from user")
    List<User> findAllUser();

    @Insert("insert into user (user_name,age) values(#{username},#{age})")
    int insert(User user);
}

com.hong.springboot.provider.UserServiceImpl

package com.hong.springboot.provider;


import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.config.TopicAll;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @author: csh
 * @Date: 2020/8/18 15:16
 * @Description: user service implementation
 */
@Service(interfaceClass = IUserService.class,timeout = 6000)
@Slf4j
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUser();
        return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
    }

    @Override
    public DataResponse <Boolean> save(User userAO) {
        try {
            log.info("User to be added through Kafka: {}",JSONObject.toJSONString(userAO));
            // send() is asynchronous: success here only means the record was handed to the producer;
            // delivery failures are reported through the future returned by send()
            kafkaTemplate.send(TopicAll.USER_TOPIC, JSONObject.toJSONString(userAO));
            return DataResponse.BuildSuccessResponse(true);
        }catch (Exception e){
            log.error("Failed to send the user message to Kafka", e);
            return DataResponse.BuildFailResponse("Failed to add user!");
        }
    }

    @Transactional
    @Override
    public DataResponse <Boolean> reallySave(User user) {
        log.info("User to be inserted: {}",JSONObject.toJSONString(user));
        if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
            return DataResponse.BuildFailResponse("Parameters must not be empty!");
        }
        int insert = userDao.insert(user);
        return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("Insert failed",false);
    }
}

com.hong.springboot.Application

package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

application.properties

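#### Kafka producer configuration
#============== kafka ===================
# Kafka broker addresses; for a cluster, list several separated by commas
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

#=============== producer =======================
# Number of retries when a write fails. When the leader fails, a replica takes over as leader and writes may fail in the meantime.
# With retries=0 the producer does not resend. With retries greater than 0 it resends once the replica has become the leader, so the message is not lost.
spring.kafka.producer.retries=0
# Upper bound, in bytes, of one batch of records sent to a partition; the producer accumulates records and sends them together
spring.kafka.producer.batch-size=16384
# Total memory, in bytes, that the producer may use to buffer records waiting to be sent; when it is exhausted, send() blocks
spring.kafka.producer.buffer-memory=33554432

# Number of acknowledgments the producer requires the leader to have received before a request is considered complete.
# It controls how durably sent records are persisted on the server. Possible values:
# acks = 0: the producer does not wait for any acknowledgment from the server. The record is added to the socket buffer and considered sent. There is no guarantee that the server received it, the retries setting has no effect (the client generally does not learn of failures), and the offset returned for each record is always -1.
# acks = 1: the leader writes the record to its local log and responds without waiting for acknowledgment from all replicas. If the leader fails right after acknowledging the record but before the replicas have copied it, the record is lost.
# acks = all: the leader waits for the full set of in-sync replicas to acknowledge the record. The record is not lost as long as at least one in-sync replica stays alive. This is the strongest guarantee and is equivalent to acks = -1.
# Allowed values: all, -1, 0, 1
spring.kafka.producer.acks=1

# Serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer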

#dubbo configuration
# Application name
dubbo.application.name=springboot_kafka_producer

dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotations so that services are registered in ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Dubbo protocol port
dubbo.protocol.port=20881
# Protocol name
dubbo.protocol.name=dubbo

# Avoid port conflicts
server.port=8287
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456


# MyBatis configuration
mybatis.typeAliasesPackage=com.hong.springboot.entity
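The producer settings above use acks=1 and retries=0, which trades durability for latency. As a hedged illustration of the stricter end of the acks options described in the comments, the sketch below assembles plain Kafka producer properties with acks=all, a few retries and idempotence enabled. The class name `DurableProducerProps` and the retry count of 3 are assumptions chosen for the example; the property keys are the standard Kafka producer configuration names.

```java
import java.util.Properties;

public class DurableProducerProps {
    /** Builds producer settings that favor durability over latency. */
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for the full set of in-sync replicas before a send counts as successful
        props.put("acks", "all");
        // Resend on transient failures such as a leader change (3 is an arbitrary example value)
        props.put("retries", "3");
        // Let the broker discard duplicates that such resends could otherwise create
        props.put("enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092,localhost:9093,localhost:9094");
        System.out.println("acks=" + props.getProperty("acks")); // prints "acks=all"
    }
}
```

In a Spring Boot application the same choices would be expressed as `spring.kafka.producer.acks=all`, `spring.kafka.producer.retries=3` and `spring.kafka.producer.properties.enable.idempotence=true`.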

springboot_all/springboot_kafka_producer/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_kafka_producer</artifactId>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!--<build>-->
        <!--<plugins>-->
            <!--<plugin>-->
                <!--<groupId>org.springframework.boot</groupId>-->
                <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
                <!--<configuration>-->
                    <!--<skip>true</skip>-->
                <!--</configuration>-->
            <!--</plugin>-->
        <!--</plugins>-->
    <!--</build>-->
    <!-- Make sure .properties and .xml resources are copied into the build output -->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

springboot_kafka_consumer

Project structure

│  pom.xml
│
└─src
    └─main
        ├─java
        │  └─com
        │      └─hong
        │          └─springboot
        │              │  Application.java
        │              │
        │              └─listener
        │                      UserListener.java
        │
        └─resources
                application.properties

com.hong.springboot.listener.UserListener

package com.hong.springboot.listener;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description: user listener
 */
@Service
@Log4j2
public class UserListener {

    @Reference
    private IUserService userService;

    @KafkaListener(topics = "${kafka.user_topic}")
    public void onMessage(String message){
        log.info("User message: {}", message);
        User user = JSONObject.parseObject(message, User.class);
        log.info("User received by Spring Boot: {}", JSONObject.toJSONString(user));
        DataResponse<Boolean> save = userService.reallySave(user);
        log.info("Save result: {}",JSONObject.toJSONString(save));
    }

}

com.hong.springboot.Application

package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

application.properties

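#### Kafka consumer configuration start ####
# Default consumer group id. Within one group each message is delivered to only one consumer; group.id sets the group name.
# Note that the value is a single group name: "1,2,3" is one group with that literal name, not three groups.
spring.kafka.consumer.group-id=1,2,3
# Valid values are earliest, latest and none. When the group has no committed offset, earliest starts from the oldest
# available message and latest starts from the end of the log.
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit=true: commit offsets automatically
spring.kafka.consumer.enable-auto-commit=true
# When enable.auto.commit is true, how often (in milliseconds) consumer offsets are committed to Kafka; the default is 5000.
spring.kafka.consumer.auto-commit-interval=1000

# Deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


# Avoid port conflicts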
server.port=8288

#dubbo configuration
# Application name
dubbo.application.name=springboot_kafka_consumer

dubbo.registry.protocol=zookeeper
# Registry address
dubbo.registry.address=zookeeper://127.0.0.1:2181
# Package scanned for annotations so that services are registered in ZooKeeper
dubbo.protocol.scan=com.hong.springboot.api
# Dubbo protocol port
dubbo.protocol.port=20882
# Protocol name
dubbo.protocol.name=dubbo
# Packages scanned for Dubbo annotations
dubbo.scan.basePackages=com.hong.springboot.listener

# User topic
kafka.user_topic=springboot_user_topic
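With automatic offset commits, a record can be delivered again if the consumer restarts or the group rebalances before its offset has been committed, and the listener above would then call reallySave a second time for the same user. One common guard is to make processing idempotent. The sketch below remembers which topic/partition/offset combinations were already handled; it is an in-memory illustration only, and the class `DedupSketch` and its method are hypothetical. A real deployment would more likely rely on a unique key in the database or another persistent store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DedupSketch {
    // Keys of records that were already processed; lost on restart, so this is illustrative only
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Returns true only the first time a given topic/partition/offset is seen. */
    public boolean firstDelivery(String topic, int partition, long offset) {
        return processed.add(topic + "-" + partition + "-" + offset);
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        System.out.println(dedup.firstDelivery("springboot_user_topic", 0, 1L)); // prints "true"
        System.out.println(dedup.firstDelivery("springboot_user_topic", 0, 1L)); // prints "false"
    }
}
```

A listener would call `firstDelivery` with the record's coordinates and skip the database insert when it returns false.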

springboot_all/springboot_kafka_consumer/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_kafka_consumer</artifactId>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!--<build>-->
        <!--<plugins>-->
            <!--<plugin>-->
                <!--<groupId>org.springframework.boot</groupId>-->
                <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
                <!--<configuration>-->
                    <!--<skip>true</skip>-->
                <!--</configuration>-->
            <!--</plugin>-->
        <!--</plugins>-->
    <!--</build>-->
    <!-- Make sure .properties and .xml resources are copied into the build output -->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

Start the api, consumer and producer modules, then send a request as follows:

Result

Conclusion

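Kafka sits underneath many distributed log-collection frameworks and performs very well; the figure cited here is up to about 173,000 messages per second on a single machine. Unlike RocketMQ, which ships a complete mechanism for it, Kafka does not redeliver a message whose consumption failed, so retries have to be handled on the client side. Kafka also has no native support for scheduled (delayed) messages.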
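Since redelivery after a failed consume is left to the client, a consumer that needs it has to wrap its own processing in a retry. The following is a minimal sketch of a bounded retry with linear backoff; the class `RetrySketch`, the method `runWithRetry` and the parameter values are hypothetical and not taken from the project or from any Kafka API.

```java
import java.util.function.BooleanSupplier;

public class RetrySketch {
    /** Runs the action until it returns true or maxAttempts is reached; returns whether it succeeded. */
    public static boolean runWithRetry(BooleanSupplier action, int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (action.getAsBoolean()) {
                    return true;
                }
            } catch (RuntimeException e) {
                // Treat an exception like a failed attempt and fall through to the backoff
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis * attempt); // linear backoff between attempts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();    // keep the interrupt flag and give up
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical handler that fails twice and then succeeds
        boolean ok = runWithRetry(() -> ++calls[0] >= 3, 5, 10L);
        System.out.println(ok + " after " + calls[0] + " attempts"); // prints "true after 3 attempts"
    }
}
```

Retrying inside the listener blocks the consumer thread while it waits, so the attempt count and backoff should stay small; messages that still fail are usually written somewhere else for later inspection.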

References:

https://www.cnblogs.com/linjiqin/p/11950758.html


https://github.com/spring-projects/spring-kafka/tree/master/samples/sample-01


Originally published on 2021-04-03; shared from the WeChat official account 技术趋势.
