专栏首页品茗ITSpringBoot入门建站全系列(十七)整合ActiveMq(JMS类消息队列)

SpringBoot入门建站全系列(十七)整合ActiveMq(JMS类消息队列)

SpringBoot入门建站全系列(十七)整合ActiveMq(JMS类消息队列)

一、概述

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)

常见的消息中间件产品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里介绍的是ActiveMQ的使用。

(2)RabbitMQ

AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。

(3)ZeroMQ

史上最快的消息队列系统

(4)Kafka

Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

Jms

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。意味着我们可以使用Java的AMQP provider,同时使用一个python的producer加一个rubby的consumer。从这一点看,AQMP可以用http来进行类比,不关心实现的语言,只要大家都按照相应的数据格式去发送报文请求,不同语言的client均可以和不同语言的server链接。

代码可以在SpringBoot组件化构建https://www.pomit.cn/java/spring/springboot.html中的ActiveMQ组件中查看,并下载。

**如果大家正在寻找一个java的学习环境,或者在开发中遇到困难,可以<a

href="https://jq.qq.com/?_wv=1027&k=52sgH1J"

target="_blank">

加入我们的java学习圈,点击即可加入

</a>

,共同学习,节约学习时间,减少很多在学习中遇到的难题。**

二、配置

本文假设你已经引入spring-boot-starter-web。已经是个SpringBoot项目了,如果不会搭建,可以打开这篇文章看一看《SpringBoot入门建站全系列(一)项目建立》

2.1 Maven依赖

使用activemq可以使用spring-boot-starter-activemq,方便快捷,一般springboot对大多数开源项目都做了整合,提供了专用的stater。

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2.2 配置文件

在application.properties 中需要配置activemq的信息,也可以配置自定义的配置,如:

spring.activemq.broker-url=tcp://localhost:61616
#spring.activemq.user=admin
#spring.activemq.password=secret

jms.destQueueName=destQueue

这里面,

  • spring.activemq.broker-url是springboot自动装配的配置,activemq的地址。
  • spring.activemq.user是springboot自动装配的配置,activemq的用户名,一般自己测试都不会去设置用户名密码的。
  • spring.activemq.password是springboot自动装配的配置,,activemq的密码,一般自己测试都不会去设置用户名密码的。
  • jms.destQueueName,这是我自己定义的一个目的队列名。

三、ActiveMQ的使用

3.1 配置数据转换

ActiveMQ是分为生产者和消费者的,生产者生产的消息,如何能够被消费者正常解析,需要开发者自己对数据转换做定义,如果你非要说,我用字符串也可以啊,那当这个不存在就行了。

下面这个配置是指明了ActiveMQ的数据转换是用MappingJackson2MessageConverter,将json数据转换为对象,或者将对象转换为json。

ActiveMQConfig :

package com.cff.springbootwork.activemq;

import javax.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@Configuration
public class ActiveMQConfig {

	@Bean
	public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
			DefaultJmsListenerContainerFactoryConfigurer configurer) {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(jacksonJmsMessageConverter());
		return factory;
	}

	@Bean
	public MessageConverter jacksonJmsMessageConverter() {
		MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
		converter.setTargetType(MessageType.TEXT);
		converter.setTypeIdPropertyName("_type");
		return converter;
	}
}

3.2 生产者

ActiveMQ当然需要生产者来生产信息,然后才发送到消息队列的。

JmsProducer:

package com.cff.springbootwork.activemq.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import com.cff.springbootwork.activemq.model.DefaultMqModel;

@Component
public class JmsProducer {
	@Autowired
	JmsTemplate jmsTemplate;

	@Value("${jms.destQueueName}")
	String destQueueName;

	public void send(DefaultMqModel defaultMqModel) {
		jmsTemplate.convertAndSend(destQueueName, defaultMqModel);
	}
}

这里的JmsTemplate 是Spring自动生成的bean。destQueueName注入的是配置文件中定义的目的队列。然后发送数据。

测试发送数据,ActiveMQRest :

package com.cff.springbootwork.activemq.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.cff.springbootwork.activemq.handler.JmsProducer;
import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.model.SeccondMqModel;

@RestController
@RequestMapping("/activemq")
public class ActiveMQRest {
	@Autowired
	JmsProducer jmsProducer;

	@RequestMapping(value = "/test", method = { RequestMethod.GET })
	public DefaultMqModel test() {
		DefaultMqModel defaultMqModel = new DefaultMqModel();
		defaultMqModel.setContent("hahahahahahhahaha哈啊哈哈");
		defaultMqModel.setTitle("测试");
		defaultMqModel.setType(1);
		jmsProducer.send(defaultMqModel);
		return defaultMqModel;
	}

	@RequestMapping(value = "/test2", method = { RequestMethod.GET })
	public SeccondMqModel test2() {
		SeccondMqModel seccondMqModel = new SeccondMqModel();
		seccondMqModel.setContent("asdasdasd哈啊哈哈");
		seccondMqModel.setTitle("测试2");
		seccondMqModel.setRemark("第二个");
		seccondMqModel.setType(2);
		jmsProducer.send(seccondMqModel);
		return seccondMqModel;
	}
}

3.3 消费者

消费者监听指定的队列,需要用@JmsListener注解标明它是一个消费者。

JmsConsumer :

package com.cff.springbootwork.activemq.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.service.BusinessSerivce;

@Component
public class JmsConsumer {

	@Autowired
	BusinessSerivce businessSerivce;

	@JmsListener(destination = "${jms.destQueueName}")
	public void processMessage(DefaultMqModel defaultMqModel) {
		businessSerivce.doBusiness(defaultMqModel);
	}
}

这里,@JmsListener注解表明它在监听我们配置文件中配置的队列,方法的参数可以直接传入对象,因为我们在3.1中配置了统一的MessageConverter,它可以自动解析成对象。

BusinessSerivce是我自定义的一个业务逻辑处理service.

BusinessSerivce:

package com.cff.springbootwork.activemq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.model.SeccondMqModel;

@Service
public class BusinessSerivce {
	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public void doBusiness(DefaultMqModel defaultMqModel) {
		log.info(defaultMqModel.toString());
		
		if(defaultMqModel.getType() == 2 && defaultMqModel instanceof SeccondMqModel){
			SeccondMqModel seccondMqModel = (SeccondMqModel) defaultMqModel;
			log.info(seccondMqModel.remark);
		}
	}
}

四、过程中用到的实体

DefaultMqModel:

package com.cff.springbootwork.activemq.model;

public class DefaultMqModel {
	public String title;
	public String content;
	public Integer type;

	public String getTitle() {
		return title;
	}

	public void setTitle(String title) {
		this.title = title;
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}

	public Integer getType() {
		return type;
	}

	public void setType(Integer type) {
		this.type = type;
	}

	@Override
	public String toString() {
		return "DefaultMqModel [title=" + title + ", content=" + content + ", type=" + type + "]";
	}

}

SeccondMqModel:

package com.cff.springbootwork.activemq.model;

public class SeccondMqModel extends DefaultMqModel {
	public String remark;

	public String getRemark() {
		return remark;
	}

	public void setRemark(String remark) {
		this.remark = remark;
	}

	@Override
	public String toString() {
		return "SeccondMqModel [remark=" + remark + ", title=" + title + ", content=" + content + ", type=" + type
				+ "]";
	}

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • SpringBoot入门建站全系列(三十五)整合Oauth2做单机版认证授权

    OAuth 2.0 规范定义了一个授权(delegation)协议,对于使用Web的应用程序和API在网络上传递授权决策非常有用。OAuth被用在各钟各样的应用...

    品茗IT
  • SpringBoot入门建站全系列(十二)Spring Security使用token做认证

    Spring 是一个非常流行和成功的 Java 应用开发框架。Spring Security 基于 Spring 框架,提供了一套 Web 应用安全性的完整解决...

    品茗IT
  • Spring和WebSocket整合详解(建立Web聊天室)

    WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

    品茗IT
  • 记一次学习SpringBoot RequestBodyAdvice ResponseBodyAdvice RestControllerAdvice【技能篇】

    今天老板给我了一套代码,然后我就拿过去研究,代码的风格是SSM + Shiro + nginx + SpringBoot的MVC架构风格,springboot,...

    奕仁
  • Spring Cloud Gateway快速体验

    GlobalFilter只要注册到Spring容器,就可以应用在所有请求,比如监控请求耗时

    十毛
  • SpringBoot集成Swagger2

    在一些接口项目中,API的使用很频繁,所以一款API在线文档生成和测试工具非常有必要。而Swagger UI就是这么一款很实用的在线工具 本博客介绍如何在公司...

    SmileNicky
  • SpringCloud 2.x学习笔记:15、Spring Cloud Gateway之Filter过滤器(Greenwich版本)

    AddRequestHeaderGatewayFilterFactory的源码

    程裕强
  • ssh登录实现

    工程目录 ? 配置文件详解 Spring的applicationContext.xml文件 <span ><?xml version="1.0" encodin...

    用户1141560
  • Spring Boot系列——7步集成RabbitMQ

    RabbitMQ是一种我们经常使用的消息中间件,通过RabbitMQ可以帮助我们实现异步、削峰的目的。

    JackieZheng
  • SpringBoot系列之前后端接口安全技术JWT

    JWT的全称为Json Web Token (JWT),是目前最流行的跨域认证解决方案,是在网络应用环境间传递声明而执行的一种基于JSON的开放标准((RFC ...

    SmileNicky

扫码关注云+社区

领取腾讯云代金券