小组内使用 MQTT 协议搭建了一个聊天服务器,前天在测大消息(超过5000汉字)时,连接直接变得不可用,后续发送的消息全部都收不到回复。
服务器环境:
Netty :4.1.32.Final
使用的是 Netty 包中自带的 MqttDecoder
客户端: Android
tcpdump
抓了包,发现客户端正常发送,并且所有的包服务端都已经 ack,但是后续服务端没有发回响应,猜测是服务端在大消息的情况下处理失败了。 tcpdump
使用 -nn
打印出ip和端口,-X
打印网络包的内容,也可以使用-w
选项保存到文件里,然后使用 tcpdump
或 wireshark
来分析PUBLISH
类型的消息,但是消息的 class 不为 MqttPublishMessage
, 且 payload 中无数据,但在 Message 中有一个报错消息 too large message: 56234 bytes
MqttDecoder
, 发现 decoder 有最长 payload 限制(以下为部分代码),启动代码里调用的是默认构造函数,因此默认最长数据为 8092
字节。public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
public MqttDecoder() {
this(DEFAULT_MAX_BYTES_IN_MESSAGE);
}
public MqttDecoder(int maxBytesInMessage) {
super(DecoderState.READ_FIXED_HEADER);
this.maxBytesInMessage = maxBytesInMessage;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
switch (state()) {
case READ_FIXED_HEADER: try {
mqttFixedHeader = decodeFixedHeader(buffer);
bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
checkpoint(DecoderState.READ_VARIABLE_HEADER);
// fall through
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
case READ_VARIABLE_HEADER: try {
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
checkpoint(DecoderState.READ_PAYLOAD);
// fall through
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
case READ_PAYLOAD: try {
final Result<?> decodedPayload =
decodePayload(
buffer,
mqttFixedHeader.messageType(),
bytesRemainingInVariablePart,
variableHeader);
bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
if (bytesRemainingInVariablePart != 0) {
throw new DecoderException(
"non-zero remaining payload bytes: " +
bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
}
checkpoint(DecoderState.READ_FIXED_HEADER);
MqttMessage message = MqttMessageFactory.newMessage(
mqttFixedHeader, variableHeader, decodedPayload.value);
mqttFixedHeader = null;
variableHeader = null;
out.add(message);
break;
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
case BAD_MESSAGE:
// Keep discarding until disconnection.
buffer.skipBytes(actualReadableBytes());
break;
default:
// Shouldn't reach here.
throw new Error();
}
}
private MqttMessage invalidMessage(Throwable cause) {
checkpoint(DecoderState.BAD_MESSAGE);
return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
}
}
MqttDecoder
的父类 ReplayingDecoder
有关系,查看源码有详尽的类说明, 在读取可变长度头部时,如果payload 超过了最大限制,那么直接抛出异常。摘出代码如下:case READ_VARIABLE_HEADER: try {
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
checkpoint(DecoderState.READ_PAYLOAD);
// fall through
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
在异常处理中,调用了 invalidMessage
方法,这个方法将 状态设为 DecoderState.BAD_MESSAGE
, 在这个状态下,所有的字节都直接被丢弃。
case BAD_MESSAGE:
// Keep discarding until disconnection.
buffer.skipBytes(actualReadableBytes());
break;
也就是说此后的消息都不会进入到业务处理逻辑,这条长连接废掉了。
MqttDecoder
提供了构造函数(不建议使用,这样会增大服务器处理时间和内存负担)