前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spring集成activemq 原

spring集成activemq 原

作者头像
尚浩宇
发布2018-08-17 10:16:32
7160
发布2018-08-17 10:16:32
举报
文章被收录于专栏:杂烩

两个项目,一个生产者一个消费者,这里只贴出关键代码(队列模式和订阅模式),文章最后会附上项目地址,有需要的可以自行下载。项目访问地址http://localhost:8080/activemq-producer/Test

生产者

代码语言:javascript
复制
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<!-- JMS消息服务器的IP和端口号 -->
				<property name="brokerURL">
					<value>tcp://127.0.0.1:61616</value>
				</property>
				<!-- 是否异步发送 -->  
                <property name="useAsyncSend" value="true" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
		<!-- 超时设置 -->
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>	
	<!-- 发送消息的目的地(一个主题) -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  	
	<!-- 队列Template -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<ref local="jmsFactory" />
		</property>
		<!-- 默认队列名称 -->
		<property name="defaultDestinationName" value="sams" />
		<!-- 区别它采用的模式为false是p2p, true是订阅 -->
		<!-- <property name="pubSubDomain" value="false" /> -->
	</bean>	
	<!-- 订阅Template -->
	<bean id="topicjmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsFactory" />  
        <property name="defaultDestination" ref="myDestination" />  
        <!-- 订阅发布模式 -->  
        <property name="pubSubDomain" value="true" />  
        <property name="receiveTimeout" value="10000" />  
	</bean>	
	<!-- 短信队列 -->
	<bean id="sams" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams" />
	</bean>
	<!-- 消息发送者 -->
	<bean id="QueueSender" class="com.producer.QueueSender">
		<constructor-arg ref="sams" />
		<property name="jmsTemplate" ref="jmsTemplate" />
	</bean>

queues.java

代码语言:javascript
复制
public class QueueSender {
	private Destination destination;
	private JmsTemplate jmsTemplate;

	public QueueSender(ActiveMQQueue queue) {
		this.destination = queue;
	}
	public void send(final String message) throws JMSException {
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
}

Test.java

代码语言:javascript
复制
/**
 * Servlet implementation class Test
 */
public class Test extends HttpServlet {
	private static final long serialVersionUID = 1L;
	private static final Logger LOGGER=Logger.getLogger(Test.class);
    /**
     * Default constructor. 
     */
    public Test() {
        // TODO Auto-generated constructor stub
    }
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		doPost(request, response);
	}
	protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		ApplicationContext ac1 = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext());
		JmsTemplate jmsTemplate=(JmsTemplate) ac1.getBean("jmsTemplate");
		jmsTemplate.send(new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message~!");   
			}
		});
		jmsTemplate.send("callcenter",new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message to callcenter~!");   
			}
		});
		JmsTemplate topicjmsTemplate=(JmsTemplate) ac1.getBean("topicjmsTemplate");
		topicjmsTemplate.send(new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
                TextMessage msg = session.createTextMessage();  
                msg.setStringProperty("phrCode", "C001");  
                msg.setText("Hello World!");  
                return msg;  
            }  
        });  
	}

}

消费者项目

代码语言:javascript
复制
<!-- 1. 配置connectionFactory -->
	<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL">
					<!-- JMS消息服务器的IP和端口号 -->
					<value>tcp://127.0.0.1:61616</value>
				</property>
			</bean>
		</property>
		<property name="maxConnections" value="100"/>
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>
	<!-- 来自短信系统的消息 -->
	<bean id="names" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams,callcenter" />
	</bean>
	 <!-- 订阅消息 -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  
	<!-- 队列监听器 -->
	<bean id="QueueReceiverListener" class="com.customer.QueueReceiverListener">
	</bean>
	<!-- 队列监听器的容器 -->
	<bean id="QueueReceiverContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="concurrentConsumers" value="2" />
		<property name="connectionFactory" ref="jmsFactory" />
		<!-- 监听的队列 -->
		<property name="destination" ref="names" />
		<property name="messageListener" ref="QueueReceiverListener" />
		<property name="pubSubNoLocal" value="false"/>
	</bean>
	<!-- 生产消息配置 (自己定义)-->  
    <bean id="myTopicConsumer" class="com.customer.SimpleJMSReceiver" />  
	<!-- 消息监听器 -->  
    <bean id="myTopicListener"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="myTopicConsumer" />  
        <!-- 接收消息的方法名称 -->  
        <property name="defaultListenerMethod" value="receive" />  
        <!-- 不进行消息转换 -->  
        <property name="messageConverter"><null/></property>  
    </bean>  
	<!-- 订阅监听器的容器 -->
	<bean id="myListenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsFactory" />  
        <!-- 发布订阅模式 -->  
        <property name="pubSubDomain" value="true"/>  
        <!-- 消息持久化 -->  
        <property name="subscriptionDurable" value="true"/>  
        <property name="receiveTimeout" value="10000"/>  
        <!-- 接收者ID -->  
        <property name="clientId" value="client_119" />  
        <property name="durableSubscriptionName" value="client_119"/>  
        <property name="destination" ref="myDestination" />  
        <property name="messageListener" ref="myTopicListener" /> 
	</bean>

QueueReceiverListener.java

代码语言:javascript
复制
public class QueueReceiverListener implements SessionAwareMessageListener {
	private static final Logger LOGGER = Logger.getLogger(QueueReceiverListener.class);

	public void onMessage(Message message, Session session) throws JMSException {
				TextMessage msg = (TextMessage) message;			
				try {
					Thread.currentThread().sleep(5000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
	}
}

SimpleJMSReceiver.java

代码语言:javascript
复制
public class SimpleJMSReceiver {
	public void receive(TextMessage message) throws JmsException, JMSException {  
        System.out.println(message.getStringProperty("phrCode"));  
        System.out.println(message.getText());  
    } 
}

pom.xml两个项目都是一样的

代码语言:javascript
复制
<!-- 属性配置 -->
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>javax.annotation</groupId>
			<artifactId>jsr250-api</artifactId>
			<version>1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>5.9.0</version>
		</dependency>
		<!-- 添加Spring依赖 -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
			<version>4.0.6.RELEASE</version>
		</dependency>
	</dependencies>

下面是输出信息:

代码语言:javascript
复制
17:18:39,725  INFO Test:45 - --------发送队列消息开始--------
17:18:39,725 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'jmsTemplate'
17:18:39,979 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSess
17:18:40,029  INFO QueueReceiverListener:16 - -------接收消息开始---------
17:18:40,030 DEBUG QueueReceiverListener:20 - QueueReceiverListener接收到报文:send a message~!
17:18:40,030 DEBUG QueueReceiverListener:22 - QueueReceiverListener接收到报文:send a message~!

17:18:40,046 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,047  INFO Test:60 - -------发送队列消息结束---------
17:18:40,047  INFO Test:61 - -------发送订阅消息开始---------
17:18:40,047 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'topicjmsTemplate'
17:18:40,057 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSession {id=ID:shy-PC-55695-1440407919807-1:3:1,started=false} }
17:18:40,058 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,059  INFO Test:73 - -------发送订阅消息结束---------
17:18:40,061 DEBUG DefaultMessageListenerContainer:313 - Received message of type [class org.apache.
C001
Hello World!

项目下载地址:http://pan.baidu.com/s/1o6DvqrC

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2015/08/24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
短信
腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档