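ActiveMQ architecture
Machines 69 and 68 are set up as a network of brokers. The activemq.xml files on 68 and 69
are identical except for the broker name, host name, IP address, and other machine-specific names.
Topics in this architecture: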
TOPIC_A_ALL_DELIVERED
TOPIC_A_ALL_RECEIVED
TOPIC_A_NN_DELIVERED, NN=00-23
TOPIC_A_NN_RECEIVED, NN=00-23
Retention period: currently 2 days, but the target is 7 days. This is how long a message is kept before it expires if a consumer does not reconnect to retrieve it. I use this config: <timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>.
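Both attributes of timeStampingBrokerPlugin are given in milliseconds. As a quick sanity check on the numbers, here is a minimal Python sketch that converts a retention period in days to milliseconds. The helper name days_to_ms is made up for illustration; only the 2-day value appears in the posted config, and the 7-day value is simply what the stated target would work out to.

```python
# Milliseconds in one day: 86,400,000 (the same figure noted in the activemq.xml comment).
MS_PER_DAY = 24 * 60 * 60 * 1000


def days_to_ms(days):
    """Convert a retention period in whole days to milliseconds (hypothetical helper)."""
    return days * MS_PER_DAY


current_ttl_ms = days_to_ms(2)  # matches ttlCeiling in the posted config
target_ttl_ms = days_to_ms(7)   # what a 7-day target would require

print(current_ttl_ms)  # 172800000
print(target_ttl_ms)   # 604800000
```

So moving to the 7-day target would mean setting both ttlCeiling and zeroExpirationOverride to 604800000.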
Persistent store: KahaDB, with the size limit set to 200 (unit not stated).
Messages are XML; each message is about 1K-3K bytes.
Consumers
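Besides holding a durable subscription to TOPIC_A_ALL_XXXXXX, each consumer also durably subscribes to other topics according to its own needs, but I don't know how fast the consumers can consume the data. I also don't know whether they really subscribe to all the topics. I only know that some consumers occasionally stop receiving data while their code is being debugged, and then reconnect later.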
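Producer
The producer is scheduled to run every 30 minutes. Each time it runs, it puts data onto only one MQ server. Which MQ server it targets is decided by the failover connection protocol.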
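On each run the producer puts more than 800K XML messages onto the topics whose names start with TOPIC_A. Each XML message contains a site_id tag (00-23) and a direction tag (DELIVERED or RECEIVED), so the producer puts each message on the topic that matches its site_id and direction tags. At the same time, the producer also puts each message on the matching TOPIC_A_ALL_XXXXX topic, chosen by its direction tag.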
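The routing rule described here can be sketched in a few lines of Python. This is only an illustration, not the producer's actual code: the function names, the <message> wrapper element, and the flat tag layout are all hypothetical, and the topic names follow the <destinations> list in the activemq.xml posted further down (TOPIC_A.NN.Delivered and so on) rather than the underscore form used in the prose.

```python
import xml.etree.ElementTree as ET

VALID_DIRECTIONS = {"DELIVERED", "RECEIVED"}


def topics_for(site_id, direction):
    """Return the two topic names one message would be published to."""
    if direction not in VALID_DIRECTIONS:
        raise ValueError(f"unexpected direction: {direction!r}")
    if (site_id is None or len(site_id) != 2
            or not site_id.isdigit() or int(site_id) > 23):
        raise ValueError(f"unexpected site_id: {site_id!r}")
    suffix = direction.capitalize()  # "DELIVERED" -> "Delivered"
    # One site-specific topic plus the catch-all topic for that direction.
    return [f"TOPIC_A.{site_id}.{suffix}", f"TOPIC_A_ALL.{suffix}"]


def topics_from_xml(payload):
    """Read site_id and direction from a message and map them to topics."""
    root = ET.fromstring(payload)
    # Assumption: both tags are direct children of the root element.
    return topics_for(root.findtext("site_id"), root.findtext("direction"))


sample = "<message><site_id>07</site_id><direction>DELIVERED</direction></message>"
print(topics_from_xml(sample))
# ['TOPIC_A.07.Delivered', 'TOPIC_A_ALL.Delivered']
```

The point to take from the sketch is that every incoming message results in two publishes, which doubles the load on the broker compared with the raw message count.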
Based on the figures above, the average total is about 76,800,000 messages per day.
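The 76,800,000 figure is consistent with the numbers given earlier, as the following back-of-envelope sketch in Python shows. The assumptions are loud ones: 800,000 is taken as the per-run count even though the post says "more than 800K", each message is counted twice because it goes to a site topic and to a TOPIC_A_ALL topic, and 1K-3K is read as 1,000-3,000 bytes of payload with no broker or journal overhead.

```python
MESSAGES_PER_RUN = 800_000   # "more than 800K" per run, so treat this as a floor
RUNS_PER_DAY = 24 * 2        # one run every 30 minutes
TOPICS_PER_MESSAGE = 2       # site-specific topic + TOPIC_A_ALL topic
SECONDS_PER_DAY = 24 * 60 * 60


def publishes_per_day():
    """Total topic publishes per day under the assumptions above."""
    return MESSAGES_PER_RUN * RUNS_PER_DAY * TOPICS_PER_MESSAGE


def average_rate_per_sec():
    """Sustained rate needed if the daily total were spread evenly."""
    return publishes_per_day() / SECONDS_PER_DAY


def daily_payload_gb(bytes_per_message):
    """Raw payload per day in decimal GB, ignoring any storage overhead."""
    return publishes_per_day() * bytes_per_message / 1e9


print(publishes_per_day())                   # 76800000
print(f"{average_rate_per_sec():.1f}")       # 888.9
print(f"{daily_payload_gb(1000):.1f}")       # 76.8
print(f"{daily_payload_gb(3000):.1f}")       # 230.4
```

Under these assumptions the daily total averages out to roughly 889 publishes per second, and the raw payload alone comes to somewhere between about 77 and 230 GB per day. These are rough estimates meant for comparison against the send rates and limits quoted in this post, not measurements.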
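Symptoms
At the start there are no messages in KahaDB. The producer can put messages onto a single MQ over one connection at up to 600~700 msgs/sec. As the amount of data grows, sending slows down. It drops to as low as about 3 msgs/sec and sometimes gets stuck entirely. Whenever this happens, the following can be observed:
The situation above happens almost every day.
If I wait 4-6 hours, the MQ server recovers. The producer can then send at 4xx~5xx msgs/sec, and the consumers can receive data again.
This cycle repeats every day. I really don't know how to improve the situation. Any suggestions?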
activemq.xml
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-mdmcs02p-node1" dataDirectory="${activemq.data}" persistent="true" useJmx="true" populateJMSXUserID="true">
<destinations>
<topic physicalName="TOPIC_A.00.Delivered" />
<topic physicalName="TOPIC_A.00.Received" />
<topic physicalName="TOPIC_A.01.Delivered" />
<topic physicalName="TOPIC_A.01.Received" />
<topic physicalName="TOPIC_A.02.Delivered" />
<topic physicalName="TOPIC_A.02.Received" />
<topic physicalName="TOPIC_A.03.Delivered" />
<topic physicalName="TOPIC_A.03.Received" />
<topic physicalName="TOPIC_A.04.Delivered" />
<topic physicalName="TOPIC_A.04.Received" />
<topic physicalName="TOPIC_A.05.Delivered" />
<topic physicalName="TOPIC_A.05.Received" />
<topic physicalName="TOPIC_A.06.Delivered" />
<topic physicalName="TOPIC_A.06.Received" />
<topic physicalName="TOPIC_A.07.Delivered" />
<topic physicalName="TOPIC_A.07.Received" />
<topic physicalName="TOPIC_A.08.Delivered" />
<topic physicalName="TOPIC_A.08.Received" />
<topic physicalName="TOPIC_A.09.Delivered" />
<topic physicalName="TOPIC_A.09.Received" />
<topic physicalName="TOPIC_A.10.Delivered" />
<topic physicalName="TOPIC_A.10.Received" />
<topic physicalName="TOPIC_A.11.Delivered" />
<topic physicalName="TOPIC_A.11.Received" />
<topic physicalName="TOPIC_A.12.Delivered" />
<topic physicalName="TOPIC_A.12.Received" />
<topic physicalName="TOPIC_A.13.Delivered" />
<topic physicalName="TOPIC_A.13.Received" />
<topic physicalName="TOPIC_A.14.Delivered" />
<topic physicalName="TOPIC_A.14.Received" />
<topic physicalName="TOPIC_A.15.Delivered" />
<topic physicalName="TOPIC_A.15.Received" />
<topic physicalName="TOPIC_A.16.Delivered" />
<topic physicalName="TOPIC_A.16.Received" />
<topic physicalName="TOPIC_A.17.Delivered" />
<topic physicalName="TOPIC_A.17.Received" />
<topic physicalName="TOPIC_A.18.Delivered" />
<topic physicalName="TOPIC_A.18.Received" />
<topic physicalName="TOPIC_A.19.Delivered" />
<topic physicalName="TOPIC_A.19.Received" />
<topic physicalName="TOPIC_A.20.Delivered" />
<topic physicalName="TOPIC_A.20.Received" />
<topic physicalName="TOPIC_A.21.Delivered" />
<topic physicalName="TOPIC_A.21.Received" />
<topic physicalName="TOPIC_A.22.Delivered" />
<topic physicalName="TOPIC_A.22.Received" />
<topic physicalName="TOPIC_A.23.Delivered" />
<topic physicalName="TOPIC_A.23.Received" />
<topic physicalName="TOPIC_A_ALL.Delivered" />
<topic physicalName="TOPIC_A_ALL.Received" />
</destinations>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false" memoryLimit="4096mb" enableAudit="false" expireMessagesPeriod="60000" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false"/>
</deadLetterStrategy>
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="TOPIC_A.*" selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
-->
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.68:61616)" userName="admin" password="XXXXX" dynamicOnly="true" prefetchSize="1" />
</networkConnectors>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="true"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="/home/activemq/kahadb"
ignoreMissingJournalfiles="true"
checkForCorruptJournalFiles="true"
checksumJournalFiles="true"
enableJournalDiskSyncs="false"
/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="80 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="40 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
<sslContext>
<sslContext
keyStore="file:${activemq.base}conf/broker1.ks" keyStorePassword="P@ssw0rd"
/>
</sslContext>
-->
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576000"/>
<transportConnector name="nio" uri="nio://0.0.0.0:61617?trace=true"/>
<!-- <transportConnector name="ssl" uri="ssl://0.0.0.0:61618?trace=true&amp;transport.enabledProtocols=TLSv1.2"/> -->
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/>
<!-- <transportConnector name="amqp+ssl" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.enabledProtocols=TLSv1.2"/> -->
<!-- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
<!-- <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
<!-- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
<plugins>
<!-- 86,400,000 ms = 1 day -->
<timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>
<jaasAuthenticationPlugin configuration="activemq" />
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="TOPIC_A.>" read="mdmsusers" write="mdmsusers" />
<authorizationEntry topic="TOPIC_A.Delivered" read="pwsusers" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="mdmsusers,pwsusers" write="mdmsusers,pwsusers" admin="mdmsusers,pwsusers"/>
</authorizationEntries>
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/>
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
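Posted on 2021-02-05 15:01:50
I suspect you are running into some combination of flow control, a fast producer, slow consumers, the pending message limit, or invalid client registrations that needs to be sorted out. It may even be a bug or a missing optimization in your ActiveMQ version.
Suggested steps: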
https://stackoverflow.com/questions/66037625