限流

最近更新时间:2025-02-20 16:38:42

我的收藏

概述

腾讯云消息队列 RocketMQ 版为各类大规模、低延时、高可用性要求的在线业务提供消息服务,客户端通过 SDK 与 RocketMQ 集群建立长连接并进行消息收发,同时消耗集群机器节点的计算、存储、网络带宽等资源。为了提供高质量的消息服务,我们需要控制集群在高并发、大流量情况下的负载水位,以保障系统的稳定性与可靠性。因此,服务端会根据集群规格限制客户端每秒能够发送和消费的最大折算消息条数 (Transaction Per Second/TPS),具体计算规则可参见 计费概述 - 计算规格。为了兼具隔离性和灵活性,发送消息与消费消息的 TPS 配额不共享,同时支持自定义配额比例(默认配额比例为1 : 1 )。


限流行为说明

腾讯云消息队列 RocketMQ 版采用基于快速失败 (Fail-Fast) 限流策略,即当客户端请求速率达到上限后,服务端会立即响应错误。通常在线业务都对响应时间敏感,快速失败可以让客户端感知限流事件并及时介入处理,避免业务消息端到端耗时恶化。
以1000TPS 规格的基础集群为例,假设配额收发 TPS 比例为1:1,相关的触发限流行为说明如下:
说明
发送消息限流
消费消息限流
触发限流情景
所有连接该集群的发送客户端每秒最多可发送折算消息的总和为 500 条,发送速率达到限制后,超限的发送请求会失败。
所有连接该集群的消费客户端每秒最多可消费折算消息的总和为 500 条,消费速率达到限制后,消息的消费延迟会增加。
触发限流时 SDK 日志关键词
Rate of message sending reaches limit, please take a control or upgrade the resource specification。
Rate of message receiving reaches limit, please take a control or upgrade the resource specification。
触发限流时 SDK 重试机制
不同协议的 SDK 处理有差异:
5.x SDK 会根据指数退避策略进行重试发送,最大重试次数可在初始化 Producer 时自定义,默认值为 2 次;达到最大重试次数仍未成功的发送请求会抛出异常。
4.x SDK 直接抛出异常,不会进行重试。
SDK 拉消息线程会自动退避重试。

客户端实践教程

规划集群

腾讯云消息队列 RocketMQ 版集群限流的目的是保障服务稳定可靠,防止在集群高负载时出现服务响应时间变长、请求成功率下降等问题,从而避免业务受损。因此,在您接入腾讯云消息队列 RocketMQ 版时,合理规划集群非常重要,建议您:
依据当前规模和未来趋势预测来充分评估业务 TPS, 如果业务流量具有波动特性,应以峰值 TPS 为准。此外,评估时建议您预留一部分 TPS 配额(例如 30%)来应对可能出现的突发流量。
对稳定性要求较高的业务,建议您使用多套 RocketMQ 集群加强隔离性。例如,将核心链路(如交易系统)与非核心链路(如日志系统)隔离,以及生产环境与开发测试环境进行隔离等。

监控负载

您可以利用腾讯云消息队列 RocketMQ 版控制台的监控告警能力实现对集群负载的实时观测,提前发现 TPS 水位风险并及时操作升配,保证资源充足,避免触发限流。具体操作可参见 监控告警,告警策略建议如下:
发送和消费 TPS 水位超过容量的 70% 时触发告警,提醒进行升配评估。
出现发送限流时触发告警,警告业务发送消息可能失败风险。

示例

以 1000TPS 规格的基础集群为例,TPS 告警策略如下:
TPS 告警配置
TPS 告警配置


代码异常处理

业务代码通过 RocketMQ SDK 发送消息时,需要捕获包括限流错误在内的异常,并保存必要的上下文信息,以便人工介入恢复业务。不同协议的 SDK 重试机制有差异,相关处理示例代码如下:
4.x SDK 不会对限流错误进行自动重试,因此业务代码需要捕获异常并进行处理,示例代码如下:
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

public class ProducerExample {
private static final InternalLogger log = InternalLoggerFactory.getLogger(ProducerExample.class);

public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {

String nameserver = "Your_Nameserver";
String accessKey = "Your_Access_Key";
String secretKey = "Your_Secret_Key";
String topicName = "Your_Topic_Name";
String producerGroupName = "Your_Producer_Group_Name";

// 实例化消息生产者 Producer
DefaultMQProducer producer = new DefaultMQProducer(
producerGroupName, // 生产者组名字
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限, accessKey 和 secretKey 可以在控制台集群权限页面获取
);

// 设置 Nameserver 地址, 可以在控制台集群基本信息页面获取
producer.setNamesrvAddr(nameserver);

// 启动 Producer 实例
producer.start();

// 创建消息实例, 设置 topic 和消息内容
Message message = new Message(topicName, "Your_Biz_Body".getBytes(StandardCharsets.UTF_8));

// 最大尝试发送次数, 请根据业务情况设置
final int maxAttempts = 3;
// 重试间隔时间, 请根据业务情况设置
final int retryIntervalMillis = 200;

// 发送消息
int attempt = 0;
do {
try {
SendResult sendResult = producer.send(message);
log.info("Send message successfully, {}", sendResult);
break;
} catch (Throwable t) {
attempt++;
if (attempt >= maxAttempts) {
// 达到最大次数
log.warn("Failed to send message finally, run out of attempt times, attempt={}, maxAttempts={}, msgId={}",
attempt, maxAttempts, MessageClientIDSetter.getUniqID(message), t);
// 记录发送失败的消息 (或记录到其他业务系统, 比如数据库等)
log.warn(message.toString());
break;
}
int waitMillis;
if (t instanceof MQBrokerException && ((MQBrokerException) t).getResponseCode() == 215 /* FLOW_CONTROL */) {
// 限流异常, 采用退避重试
waitMillis = (int) Math.pow(2, attempt - 1) * retryIntervalMillis; // 重试间隔: 200ms, 400ms, ......
} else {
// 其他异常
waitMillis = retryIntervalMillis;
}
log.warn("Failed to send message, will retry after {}ms, attempt={}, maxAttempts={}, msgId={}",
waitMillis, attempt, maxAttempts, MessageClientIDSetter.getUniqID(message), t);
try {
Thread.sleep(waitMillis);
} catch (InterruptedException ignore) {
}
}
}
while (true);

producer.shutdown();
}
}
5.x SDK 会对发送异常进行自动重试,业务代码可以自定义最大重试次数,示例代码如下:
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerExample {
private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);

public static void main(String[] args) throws ClientException, IOException {

String nameserver = "Your_Nameserver";
String accessKey = "Your_Access_Key";
String secretKey = "Your_Secret_Key";
String topicName = "Your_Topic_Name";

// ACL权限, accessKey 和 secretKey 可以在控制台集群权限页面获取
SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(nameserver) // 设置 NameServer 地址, 可以在控制台集群基本信息页面获取
.setCredentialProvider(sessionCredentialsProvider)
.build();

// 启动 Producer 实例
ClientServiceProvider provider = ClientServiceProvider.loadService();
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topicName) // 预声明消息发送的 topic, 建议设置
.setMaxAttempts(3) // 最大尝试发送次数, 请根据业务情况设置
.build();

// 创建消息实例, 设置 topic 和消息内容
byte[] body = "Your_Biz_Body".getBytes(StandardCharsets.UTF_8);
final Message message = provider.newMessageBuilder()
.setTopic(topicName)
.setBody(body)
.build();

try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.warn("Failed to send message", t);
// 记录发送失败的消息 (或记录到其他业务系统, 比如数据库等)
log.warn(message.toString());
}

producer.close();
}
}

常见问题

触发限流后会不会丢消息?

发送消息触发限流后服务端不会存储该条消息,客户端需要捕获异常并做降级处理;消费触发限流后会出现消费延迟,但已经发送成功的消息不会丢。

为什么监控页面的 TPS 比消息条数大?

TPS 是折算消息数量,如果业务使用了高级消息(顺序、延迟、事务等)或消息体比较大,那么一条业务消息会被统计为多条折算消息,具体的折算逻辑可参见 计算规格。此外,消息条数指标统计的是一分钟内的秒级平均值,而 TPS 指标统计的是一分钟内的秒级峰值。

集群偶尔出现短暂的消费被限流,是否有影响?

一般没有影响。在客户端重启、服务端重启、控制台扩容主题队列等操作期间,都有可能因为消费组瞬间堆积而触发短暂的消费限流,通常稳定后很快会恢复。

如何判断集群是否出现了限流?

除了通过识别 SDK 发送接口抛出的异常或 SDK 日志记录的信息外,您还可以查看 腾讯云 RocketMQ 控制台 > 监控大盘被限流的生产 TPS(Count/s) 被限流的消费 TPS(Count/s)