首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >ActiveMQ在大量消息(数十毫安)上的性能较差

ActiveMQ在大量消息(数十毫安)上的性能较差
EN

Stack Overflow用户
提问于 2021-02-04 00:14:40
回答 1查看 894关注 0票数 0
  • CentOS 7
  • 4芯,16 G RAM,500 SSD
  • ActiveMQ 5.15.4
  • OpenJDK运行时环境AdoptOpenJDK (构建11.0.10+9)

ActiveMQ体系结构

机器69和68是由网络代理构建的。68和69的activemq.xml是相同的,除了代理名称、主机名、IP等相关名称之外。

该体系结构中的主题:

  • TOPIC_A_ALL_DELIVERED
  • TOPIC_A_ALL_RECEIVED
  • TOPIC_A_NN_DELIVERED,NN=00-23
  • TOPIC_A_NN_RECEIVED,NN=00-23

持续天数:当前为2天,但预期目标为7天。这是如果使用者没有连接回以检索消息,则要等待多长时间才能终止消息。我使用这个config:<timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>

持久化存储: kahadb,限制大小为200 is

消息格式为XML,每个消息大小约为1K-3K字节。

Consumers

除了持久地订阅TOPIC_A_ALL_XXXXXX之外,每个使用者还根据自己的需求可持续地订阅其他主题,但我不知道使用者可以多快地使用这些数据。我也不知道他们是否真的订阅了所有的主题。我只知道有时候有些消费者因为调试代码而停止接收数据,然后再连接回去。

生产者

制片人计划每30分钟运行一次。每当生产者工作时,它只将数据放到一个MQ服务器上。目标MQ服务器依赖于故障转移连接协议。

每次生产者将超过800 K数量的XML消息放到主题中,从TOPIC_A开始。XML包含标记sit_id (00-23)和标记direction (DELIVEREDRECEIVED),因此生产者将根据XML中的标记site_iddirection将每个XML放到相应的主题上。同时,生产者根据每个TOPIC_A_ALL_XXXXX中的direction标记将每个XML放入XML中。

根据上述数据,每天平均信息总量约为76,800,000

症状

一开始没有任何信息在卡哈德语中。生产者将队列放入具有一个连接的单个MQ的速度高达600~700 msgs/秒。随着数据量的增加,发送消息的速度会减慢。它会慢到每秒3毫希,有时会被卡住。无论何时发生这种情况,都可以观察到以下情况:

  1. 在htop中,一个活动进程持续消耗100%的CPU (有时高达200%)。

  1. 空闲内存正常(至少4~8GB)
  2. 消费者从两台MQ服务器上都得不到任何信息。

上述情况几乎每天都会发生。

如果我在那里等4-6小时,MQ服务器就会回来。生产者可以发送4xx~5xx msgs/秒,然后消费者可以接收数据。

这种循环每天都在进行。我真的不知道如何改善这种情况。有什么建议吗?

activemq.xml

代码语言:javascript
运行
复制
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-mdmcs02p-node1" dataDirectory="${activemq.data}" persistent="true" useJmx="true" populateJMSXUserID="true">
        <destinations>

        
        <topic physicalName="TOPIC_A.00.Delivered" />
        <topic physicalName="TOPIC_A.00.Received" />
        <topic physicalName="TOPIC_A.01.Delivered" />
        <topic physicalName="TOPIC_A.01.Received" />
        <topic physicalName="TOPIC_A.02.Delivered" />
        <topic physicalName="TOPIC_A.02.Received" />
        <topic physicalName="TOPIC_A.03.Delivered" />
        <topic physicalName="TOPIC_A.03.Received" />
        <topic physicalName="TOPIC_A.04.Delivered" />
        <topic physicalName="TOPIC_A.04.Received" />
        <topic physicalName="TOPIC_A.05.Delivered" />
        <topic physicalName="TOPIC_A.05.Received" />
        <topic physicalName="TOPIC_A.06.Delivered" />
        <topic physicalName="TOPIC_A.06.Received" />
        <topic physicalName="TOPIC_A.07.Delivered" />
        <topic physicalName="TOPIC_A.07.Received" />
        <topic physicalName="TOPIC_A.08.Delivered" />
        <topic physicalName="TOPIC_A.08.Received" />
        <topic physicalName="TOPIC_A.09.Delivered" />
        <topic physicalName="TOPIC_A.09.Received" />
        <topic physicalName="TOPIC_A.10.Delivered" />
        <topic physicalName="TOPIC_A.10.Received" />
        <topic physicalName="TOPIC_A.11.Delivered" />
        <topic physicalName="TOPIC_A.11.Received" />
        <topic physicalName="TOPIC_A.12.Delivered" />
        <topic physicalName="TOPIC_A.12.Received" />
        <topic physicalName="TOPIC_A.13.Delivered" />
        <topic physicalName="TOPIC_A.13.Received" />
        <topic physicalName="TOPIC_A.14.Delivered" />
        <topic physicalName="TOPIC_A.14.Received" />
        <topic physicalName="TOPIC_A.15.Delivered" />
        <topic physicalName="TOPIC_A.15.Received" />
        <topic physicalName="TOPIC_A.16.Delivered" />
        <topic physicalName="TOPIC_A.16.Received" />
        <topic physicalName="TOPIC_A.17.Delivered" />
        <topic physicalName="TOPIC_A.17.Received" />
        <topic physicalName="TOPIC_A.18.Delivered" />
        <topic physicalName="TOPIC_A.18.Received" />
        <topic physicalName="TOPIC_A.19.Delivered" />
        <topic physicalName="TOPIC_A.19.Received" />
        <topic physicalName="TOPIC_A.20.Delivered" />
        <topic physicalName="TOPIC_A.20.Received" />
        <topic physicalName="TOPIC_A.21.Delivered" />
        <topic physicalName="TOPIC_A.21.Received" />
        <topic physicalName="TOPIC_A.22.Delivered" />
        <topic physicalName="TOPIC_A.22.Received" />
        <topic physicalName="TOPIC_A.23.Delivered" />
        <topic physicalName="TOPIC_A.23.Received" />
        <topic physicalName="TOPIC_A_ALL.Delivered" />
        <topic physicalName="TOPIC_A_ALL.Received" />
        
        </destinations>

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="false" memoryLimit="4096mb" enableAudit="false" expireMessagesPeriod="60000" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->

                    <deadLetterStrategy>
                        <sharedDeadLetterStrategy processExpired="false"/>
                    </deadLetterStrategy>
 
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>

                    <networkBridgeFilterFactory> 
                        <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> 
                    </networkBridgeFilterFactory>

                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

    <!--
        <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                    <virtualTopic name=">" prefix="TOPIC_A.*" selectorAware="false"/>
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>
     -->

    <networkConnectors>
        <networkConnector uri="static:(tcp://192.168.11.68:61616)" userName="admin" password="XXXXX" dynamicOnly="true" prefetchSize="1" />
    </networkConnectors>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="true"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="/home/activemq/kahadb"
                    ignoreMissingJournalfiles="true"
                    checkForCorruptJournalFiles="true"
                    checksumJournalFiles="true"
                    enableJournalDiskSyncs="false"
             />
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="80 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="40 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
        <sslContext>
            <sslContext 
                keyStore="file:${activemq.base}conf/broker1.ks" keyStorePassword="P@ssw0rd"
             />
        </sslContext>
         -->

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576000"/>
            <transportConnector name="nio" uri="nio://0.0.0.0:61617?trace=true"/>
            <!-- <transportConnector name="ssl" uri="ssl://0.0.0.0:61618?trace=true&amp;transport.enabledProtocols=TLSv1.2"/> -->

            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/>
            <!-- <transportConnector name="amqp+ssl" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.enabledProtocols=TLSv1.2"/> -->
            <!-- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
            <!-- <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
            <!-- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

        <plugins>
            <!-- 86,400,000 ms = 1 day -->
            <timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>

            <jaasAuthenticationPlugin configuration="activemq" />

            <authorizationPlugin>
                <map>
                    <authorizationMap>
                        <authorizationEntries>
                            <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                            <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />

                            <authorizationEntry topic="TOPIC_A.>" read="mdmsusers" write="mdmsusers" />
                            <authorizationEntry topic="TOPIC_A.Delivered" read="pwsusers" />
                            <authorizationEntry topic="ActiveMQ.Advisory.>" read="mdmsusers,pwsusers" write="mdmsusers,pwsusers" admin="mdmsusers,pwsusers"/>
                        </authorizationEntries>
                        <tempDestinationAuthorizationEntry> 
                            <tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/> 
                        </tempDestinationAuthorizationEntry> 
                    </authorizationMap>
                </map>
            </authorizationPlugin>
        </plugins>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
EN

回答 1

Stack Overflow用户

发布于 2021-02-05 15:01:50

我怀疑您正在遇到某种类型的组合流控制,快速生产者,慢消费者,未决消息限制或无效的客户注册问题,需要解决。甚至可能是ActiveMQ版本中的bug或优化。

建议的步骤:

  1. 升级到最新的5.15.x。有很多修复程序,并且对那么旧的构建进行故障排除是没有意义的。
  2. 在目的地上启用用于咨询信息advisoryForFastProducer和advisoryForSlowConsumer。这将为您提供一个ActiveMQ.Advisory主题,以显示这些场景何时发生。
  3. 调查客户端和连接,确保它们都在clientId和suscriptionName中正确注册,以获得持久的订阅。记住:两个连接不能共享clientId+subscriptioName
  4. 考虑搬到虚拟主题。这是将消息发送到主题和使用者从队列读取信息的地方。对于流动所发生的事情,有更多的灵活性和可见度。此外,它还能让多个经纪公司直接分享订阅。
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66037625

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档