专栏首页互扯程序activemq的高可用(zookeeper+leveldb)主从集群

activemq的高可用(zookeeper+leveldb)主从集群

一、架构和技术介绍

1、简介

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现

2、activemq的特性

  • 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  • 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  • 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  • 通过常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上
  • 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  • 支持通过JDBC和journal提供高速的消息持久化
  • 从设计上保证了高性能的集群,客户端-服务器,点对点
  • 支持Ajax
  • 支持与Axis的整合
  • 可以很容易得调用内嵌JMS provider,进行测试

二、集群介绍

从 ActiveMQ 5.9 开始,ActiveMQ 的集群实现方式取消了传统的Master-Slave 方式,增加了基于ZooKeeper + LevelDB的 Master-Slave实现方式,其他两种方式目录共享和数据库共享依然存在。

三种集群方式的对比:

1、基于共享文件系统(KahaDB,默认):

<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> 

2、基于 JDBC:

<bean id="MySQL-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url" value="jdbc:mysql://localhost:3306/amq?relaxAutoCommit=true"/>
      <property name="username" value="root"/>
      <property name="password" value="root"/>
      <property name="maxActive" value="20"/>
      <property name="poolPreparedStatements" value="true"/>
</bean>
<persistenceAdapter> 
        <jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds" createTablesOnStartup="false"/>
</persistenceAdapter>

3、基于可复制的 LevelDB(本文采用这种集群方式):

LevelDB 是 Google开发的一套用于持久化数据的高性能类库。LevelDB并不是一种服务,用户需要自 行实现Server。是单进程的服务,能够处理十亿级别规模Key-Value 型数据,占用内存小。

<persistenceAdapter> 
    <replicatedLevelDB 
           directory="${activemq.data}/leveldb"     replicas="3"   
           bind="tcp://0.0.0.0:62621"   zkAddress="localhost:2181,localhost:2182,localhost:2183" 
           hostname="localhost"    zkPath="/activemq/leveldb-stores"/> 
</persistenceAdapter>

本文主要讲解基于 ZooKeeper 和LevelDB 搭建ActiveMQ 集群。集群仅提供主备方式的高可用集 群功能,避免单点故障,没有负载均衡功能。

官方文档:http://activemq.apache.org/replicated-leveldb-store.html

集群原理图:

高可用的原理:

使用ZooKeeper(集群)注册所有的ActiveMQ Broker。只有其中的一个Broker 可以提供 服务,被视为Master,其他的Broker 处于待机状态,被视为Slave。

如果Master 因故障而不能提供服务,ZooKeeper会从 Slave中选举出一个 Broker充当 Master。 Slave 连接 Master并同步他们的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到 连接至 Master 的Slaves。如果 Master 宕了,得到了最新更新的 Slave 会成为 Master。

故障节点在恢复后 会重新加入到集群中并连接 Master 进入Slave 模式。 所有需要同步的disk 的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。

所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2。Master 将会存储并更新然后等待 (2-1)=1 个Slave存储和更新完成,才汇报 success。至于为什么是 2-1,熟悉 Zookeeper 的应该知道,有一个 node要作为观擦者存在。当一个新的Master 被选中,你需要至少保障一个法定node 在线以能够找到拥有最新 状态的node。

这个node 可以成为新的Master。因此,推荐运行至少3 个replica nodes,以防止一个node失败了,服务中断。(原理与 ZooKeeper 集群的高可用实现方式类似)

1、ActiveMQ集群部署规划:

环境:CentOS 6.5 x64 、JDK8

版本:ActiveMQ 5.13.3

ZooKeeper 集群环境:

192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183

2、防火墙打开对应的端口

edu-zk-01:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 8161 -j ACCEPT

-A INPUT -m state --state NEW -m tcp -p tcp --dport 51511 -j ACCEPT

-A INPUT -m state --state NEW -m tcp -p tcp --dport 62621 -j ACCEPT

edu-zk-02:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 8162 -j ACCEPT

-A INPUT -m state --state NEW -m tcp -p tcp --dport 51512 -j ACCEPT

-A INPUT -m state --state NEW -m tcp -p tcp --dport 62622 -j ACCEPT

edu-zk-03:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 8163 -j ACCEPT

-A INPUT -m state --state NEW -m tcp -p tcp --dport 51513 -j ACCEPT

-A INPUT -m state --state NEW -m tcp -p tcp --dport 62623 -j ACCEPT

3、分别在三台主机中创建/home/yxq/activemq目录

# mkdir /home/yxq/activemq

上传 apache-activemq-5.13.3-bin.tar.gz 到/home/yxq/activemq 目录\

4、解压并按节点命名

# cd /home/yxq/activemq 

# tar -xvf apache-activemq-5.13.3-bin.tar.gz

# mv apache-activemq-5.13.3     node-0X    #(X代表节点号 1、2、3,下同)

5、修改管理控制台端口(默认为 8161)可在 conf/jetty.xml 中修改,如下:

node-01 管控台端口:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">

        <!-- the default port number for the web console -->

        <property name="host" value="0.0.0.0"/>

        <property name="port" value="8161"/>

</bean>

node-02管控台端口:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">

             <!-- the default port number for the web console -->

        <property name="host" value="0.0.0.0"/>

        <property name="port" value="8162"/>

</bean>

node-03管控台端口:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">

             <!-- the default port number for the web console -->

        <property name="host" value="0.0.0.0"/>

        <property name="port" value="8163"/>

</bean>

6、集群配置:

在 3 个ActiveMQ 节点中配置conf/activemq.xml 中的持久化适配器。修改其中bind、zkAddress、hostname和 zkPath。注意:每个 ActiveMQ 的 BrokerName 必须相同,否则不能加入集群。

所有节点中activemq.xml配置

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="DubboEdu" dataDirectory="${activemq.data}">

node-01 中的持久化配置:

node-02 中的持久化配置:

node-03 中的持久化配置:

修改各节点的消息端口(注意,避免端口冲突):

node-01 中的消息端口配置:

node-02 中的消息端口配置:

node-03 中的消息端口配置:

7、按顺序启动 3个 ActiveMQ节点:(前提是: zookeeper集群已经启动)

# /home/yxq/activemq/node-01/bin/activemq start

# /home/yxq/activemq/node-02/bin/activemq start

# /home/yxq/activemq/node-03/bin/activemq start

监听日志:

#  tail -f /home/yxq/activemq/node-01/data/activemq.log 

#  tail -f /home/yxq/activemq/node-02/data/activemq.log 

#  tail -f /home/yxq/activemq/node-03/data/activemq.log 

8、集群的节点状态分析:

zookeeper信息查看工具,下载地址:

https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

解压,打开:ZooInspector\build\zookeeper-dev-ZooInspector.jar

集群启动后对 ZooKeeper 数据的抓图

可以看到ActiveMQ 的有3 个节点,分别是

00000000000,00000000001,00000000002,

我这里是 00000000005,00000000006,00000000007。

以下第一张图展现了 000000000005的值,可以看到elected 的值是不为空,说明这个节点是Master,

其他两个节点是 Slave。

9.集群可用性测试

ActiveMQ的客户端只能访问Master的Broker,其他处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议(失败转移)

failover:

(tcp://192.168.1.81:51511,tcp://192.168.1.82:51512,tcp://192.168.1.83:51513)?randomize=false

当一个ActiveMQ节点挂掉,或者一个Zookeeper节点挂掉,ActiveMQ服务依然正常运转,如果仅剩一个ActiveMQ节点,因为不能选举Master,ActiveMQ不能正常运行:同样的,如果Zookeeper仅剩一个节点活动,不管ActiveMQ各节点存活,ActiveMQ也不能正常提供服务。

(ActiveMQ集群的高可用,依赖于Zookeeper集群的高可用)

10.设置开机启动

#vi /etc/rc.local

su - yxq -c '/home/yxq/activemq/node-01/bin/activemq start'

su - yxq -c '/home/yxq/activemq/node-02/bin/activemq start'

su - yxq -c '/home/yxq/activemq/node-03/bin/activemq start'

或者

生产软链接到 /etc/init.d/ 下

ln -s  /home/yxq/activemq/node-01/bin/activemq   /etc/init.d/

检查 链接的权限是否有执行权限

chmod +x /etc/init.d/activemq

加入管理服务

chkconfig --add  activemq

设置管理 ON状态

chkconfig activemq  on

查看

chkconfig --list activemq

service activemq  start/restart/stop  测试

碰到的问题:

使用service activemq start 提示:

INFO: Loading ‘/etc/default/activemq’

ERROR: Configuration variable JAVA_HOME or JAVACMD is not defined

correctly.

(JAVA_HOME=’ ’ , JAVACMD=’java’)

根据提示的意思是: 无法正确找到 JAVA_HOME 和 JAVACMD

配置服务启动级别和PATH,保证执行启动时,所需执行级别和jdk依赖没有问题

vi  activemq   

# chkconfig 2345 63 67

PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/yxue/jdk/jdk1.8.0_74/bin:/yxue/jdk/jdk1.8.0_74/jre/bin:/root/bin

export PATH

11.demo例子

maven引入:

        <!--ActiveMQ依赖 -->
        <dependency>
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-all</artifactId>
             <version>5.13.3</version>
        </dependency>
        <!-- Spring整合ActiveMQ所需依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
            <version>4.1.6.RELEASE</version>
        </dependency>

配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <amq:connectionFactory id="amqConnectionFactory" userName="admin" password="admin" brokerURL="failover:(tcp://192.168.248.129:61616,tcp://192.168.248.128:61616,tcp://192.168.248.127:61616)?randomize=false"   />

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- 同上,同理 -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Spring JmsTemplate 的消息生产者 start-->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
        <!-- 发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
        <property name="deliveryMode" value="2" />
    </bean>
    <!--Spring JmsTemplate 的消息生产者 end-->

    <!-- 消息消费者 start-->
    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver1"/>
        <jms:listener destination="test.queue" ref="queueReceiver2"/>
    </jms:listener-container>

    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver1"/>
        <jms:listener destination="test.topic" ref="topicReceiver2"/>
    </jms:listener-container>
    <!-- 消息消费者 end -->
</beans>

QueueSender:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * ActiveMQ的消息队列模式
 * 队列消息的生产者,发哦是哪个消息到队列
 */
@Component
public class QueueSender {
    //通过Qualifier来注入对应的Bean
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

TopicSender:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * ActiveMQ的topic/sub模式
 * Topic生产者发送消息到Topic
 */
@Component
public class TopicSender {
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;
    /**
     * 发送一条消息到指定的队列(目标)
     * @param topicName 队列名称
     * @param message 消息内容
     */
    public void send(String topicName,final String message){
        jmsTemplate.send(topicName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

QueueReceiver1:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;

/**
 * 消息队列监听器
 *
 */
@Component
public class QueueReceiver1 implements MessageListener {

    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

QueueReceiver2:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;

/**
 * 消息队列监听器
 *
 */
@Component
public class QueueReceiver2 implements MessageListener {

    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

TopicReceiver1:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;

@Component
public class TopicReceiver1 implements MessageListener{

    public void onMessage(Message message) {
        try {
            System.out.println("TopicReceiver1接收到消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

TopicReceiver2:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;

@Component
public class TopicReceiver2 implements MessageListener{

    public void onMessage(Message message) {
        try {
            System.out.println("TopicReceiver2接收到消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

ActivemqController调用:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping("activemq")
public class ActivemqController {
    @Autowired
    private QueueSender queueSender;
    @Autowired
    private TopicSender topicSender;

    /**
     * 发送消息到队列
     * Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中
     * @return String
     */
    @ResponseBody
    @RequestMapping("queueSender")
    public String queueSender(){
        String opt = "suc";
        try {
            queueSender.send("test.queue", "=======queue消息");
        } catch (Exception e) {
            opt = e.getCause().toString();
        }
        return opt;
    }

    @ResponseBody
    @RequestMapping("topicSender")
    public String topoicSender(){
        String opt = "suc";
        try {
            topicSender.send("test.topic", "==========topoic消息");
        } catch (Exception e) {
            opt = e.getCause().toString();
        }
        return opt;
    }
}

转自:csdn

作者:java20150326

本文分享自微信公众号 - 互扯程序(chat_routine)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-12-25

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • docker私有仓库搭建,证书认证,鉴权管理

    -Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。

    互扯程序
  • 基于GitLab+Docker+K8S的持续集成和交付

    此文档主要说明怎样基于GitLab进行持续集成和持续交付,该持续集成与交付集成了gitlab-runner 、mvnw、Docker、harbor、k8s等技术...

    互扯程序
  • MySQL索引实现

    我们上一篇讲了MySQL索引背后的数据结构及算法原理,我们知道了为什么使用索引查询数据效率那么高的原理了,我们接着看看MySQL的索引是如何实现的。

    互扯程序
  • springboot集成ActiveMQ

    麦克劳林
  • zuul集成apollo动态刷新配置

    一笠风雨任生平
  • springCloud Eureca服务提供者Provider的项目

    服务提供者的项目: 本例子是把前面springboot的mybatis例子,几乎不变的拿过来就可以运行了。 package com; import java...

    马克java社区
  • JMS学习之路(一):整合activeMQ到SpringMVC

    JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息...

    肖哥哥
  • 一起来学SpringBoot | 第二十七篇:优雅解决分布式限流

    在前面的两篇文章中,介绍了一些限流的类型和策略,本篇从 SpringBoot、 Redis 应用层面来实现分布式的限流....

    battcn
  • SpringCache实战遇坑

    但你在网上找答案,都是文不对题,或者说其他错误导致相同的报错,反正我是找不到正确的解答

    老梁
  • Spring Boot Web 自定义注解篇(注解很简单很好用)

    自从spring 4.0 开放以后,可以添加很多新特性的注解了。使用系统定义好的注解可以大大方便的提高开发的效率。

    爱撸猫的杰

扫码关注云+社区

领取腾讯云代金券