前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ActiveMQ 的 三台服务器高可用集群搭建方案

ActiveMQ 的 三台服务器高可用集群搭建方案

作者头像
北漂的我
发布2019-05-29 11:37:16
1.6K0
发布2019-05-29 11:37:16
举报
文章被收录于专栏:北漂的我

为什么80%的码农都做不了架构师?>>>

1. 高可用集群搭建方案

节点A: 与 节点B 节点C 进行消息同步, 所以节点A 节点B 节点C 都可用作消费者访问节点

节点B: 作为 master 主节点, 作为 生产者访问节点和消费者访问节点

节点C: 作为 slave 从节点, 作为 生产者访问节点和消费者访问节点, 当 master 节点挂掉后, slave 自动转换为 master 节点.

2. 高可用集群搭建步骤

正常应该在3台独立的服务器上进行集群搭建, 本演示只在同一台服务器上进行演示搭建.

将 ActiveMQ 安装包 解压到 三个不同的文件夹, 分别为 activemq-a, activemq-b, activemq-c

修改 activemq-a/conf/activemq.xml

代码语言:javascript
复制
<transportConnectors>
	<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
	<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>

<networkConnectors>
	<networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>

修改 activemq-b/conf/activemq.xml

代码语言:javascript
复制
<transportConnectors>
	<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
	<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>
<networkConnectors>
	<networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)"/>
</networkConnectors>

修改 activemq-b/conf/jetty.xml

代码语言:javascript
复制
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
	<!-- the default port number for the web console -->
	<property name="host" value="0.0.0.0"/>
	<property name="port" value="8162"/>
</bean>

修改 activemq-c/conf/activemq.xml

代码语言:javascript
复制
<transportConnectors>
	<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
	<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>
<networkConnectors>
	<networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)"/>
</networkConnectors>

修改 activemq-c/conf/jetty.xml

代码语言:javascript
复制
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
	<!-- the default port number for the web console -->
	<property name="host" value="0.0.0.0"/>
	<property name="port" value="8163"/>
</bean>

3. 用代码简单测试

代码语言:javascript
复制
package com.codingos.springboot.activemq.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 队列模式 生产者 集群测试
 */
public class AppProducer2 {

	private static final String url = "failover:(tcp://192.168.159.128:61617,tcp://192.168.159.128:61618)?randomize=true";
	private static final String queueName = "queue-test1";

	public static void main(String[] args) throws JMSException {
		// 1.常见ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		// 2.创建Connection
		Connection connection = connectionFactory.createConnection();
		// 3.启动连接
		connection.start();
		// 4.创建会话 (1参: 是否在事务中处理, 2参: 使用自动应答模式)
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 5.创建一个目标队列
		Destination destination = session.createQueue(queueName);
		// 6.创建生产者
		MessageProducer producer = session.createProducer(destination);

		for (int i = 0; i < 100; i++) {
			// 7.创建消息
			TextMessage textMessage = session.createTextMessage("Message " + i);
			// 8.发送消息
			producer.send(textMessage);
			System.out.println("发送消息: " + textMessage.getText());
		}
		// 9.关闭连接
		connection.close();
	}
}
代码语言:javascript
复制
package com.codingos.springboot.activemq.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 队列模式 消费者 集群测试
 */
public class AppConsumer2 {
	
	private static final String url = "failover:(tcp://192.168.159.128:61616,tcp://192.168.159.128:61617,tcp://192.168.159.128:61618)?randomize=true";
	private static final String queueName = "queue-test1";

	public static void main(String[] args) throws JMSException {
		// 1.常见ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		// 2.创建Connection
		Connection connection = connectionFactory.createConnection();
		// 3.启动连接
		connection.start();
		// 4.创建会话 (1参: 是否在事务中处理, 2参: 使用自动应答模式)
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 5.创建一个目标队列
		Destination destination = session.createQueue(queueName);
		// 6.创建消费者
		MessageConsumer consumer = session.createConsumer(destination);
		// 7.创建监听器
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				TextMessage textMessage = (TextMessage) message;
				try {
					System.out.println("接收消息: "+textMessage.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
	}
}

最后 访问 三个节点的 ActiveMQ 管理界面 进行相关查看

activemq-a http://192.168.159.128:8161

activemq-b http://192.168.159.128:8162

activemq-c http://192.168.159.128:8163

相关项目代码可以参考https://gitee.com/jayking/spring-boot-activemq

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档