📌 请关注开源社区Geek-XD
在实际项目开发中,我们经常需要实现服务端与客户端之间的长连接通信,比如用于实时消息推送、设备监控、物联网(IoT)数据上报等场景。相比于 HTTP 短连接,TCP 长连接能显著降低连接建立开销,提升通信效率。
本文将带你深入解析一个使用 Netty 框架 实现的 轻量级 TCP 长连接服务器示例,涵盖:客户端认证、心跳保活、空闲检测、消息回显、广播发送 等核心功能,并提供完整的可运行 Java 代码。
本示例实现了以下核心功能:
功能 | 说明 |
|---|---|
🔐 客户端认证 | 连接后需发送 |
🫀 心跳机制 | 客户端发 |
⏱️ 空闲超时关闭 | 超过设定时间无读操作,自动断开连接 |
💬 消息回显 | 收到普通消息,服务端原样返回前加 |
📣 广播支持 | 可向所有在线客户端发送消息 |
🔄 单点登录 | 同一 |
DelimiterBasedFrameDecoder 处理粘包/拆包问题整个类 TcpLongConnectionServer 是一个 Spring Bean,负责启动 Netty 服务并管理客户端连接。
@Value 注入)@Value("${netty.tcp.enable:true}")
private boolean enable;
@Value("${netty.tcp.port:18888}")
private int port;
@Value("${netty.tcp.bossThreads:1}")
private int bossThreads;
@Value("${netty.tcp.workerThreads:4}")
private int workerThreads;
@Value("${netty.tcp.readerIdleSeconds:120}")
private int readerIdleSeconds;这些配置允许你在 application.yml 中灵活控制:
netty:
tcp:
enable: true
port: 18888
bossThreads: 1
workerThreads: 4
readerIdleSeconds: 120@PostConstruct 方法)@PostConstruct
public void start() throws InterruptedException {
if (!enable) return;
bossGroup = new NioEventLoopGroup(bossThreads);
workerGroup = new NioEventLoopGroup(workerThreads);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
p.addLast(new StringDecoder(CharsetUtil.UTF_8));
p.addLast(new StringEncoder(CharsetUtil.UTF_8));
p.addLast(new IdleStateHandler(readerIdleSeconds, 0, 0));
p.addLast(new TcpLongConnectionHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
serverChannel = future.channel();
log.info("TCP LongConnection Server started at port {}", port);
}组件 | 作用 |
|---|---|
bossGroup | 接收新连接(通常1个线程即可) |
workerGroup | 处理已连接的 IO 读写 |
DelimiterBasedFrameDecoder | 基于换行符 \n 分隔消息帧,解决粘包问题 |
StringDecoder/StringEncoder | 字符串编解码器(UTF-8) |
IdleStateHandler | 检测读空闲,超过 readerIdleSeconds 触发超时 |
TcpLongConnectionHandler这是真正处理客户端消息的核心逻辑。
收到消息
↓
是否为空? → 忽略
↓
是否以 AUTH: 开头?
├─ 是 → 提取 clientId,绑定并返回 AUTH_OK
└─ 否 →
是否为 PING?
├─ 是 → 回复 PONG
└─ 否 →
是否已认证?
├─ 否 → 返回 ERR:UNAUTH 并关闭
└─ 是 → 回显 ECHO:消息if (trim.startsWith("AUTH:")) {
String clientId = trim.substring(5).trim();
if (clientId.isEmpty()) {
ctx.writeAndFlush("ERR:EMPTY_CLIENT_ID\n");
return;
}
bind(clientId, ctx);
ctx.writeAndFlush("AUTH_OK\n");
}if ("PING".equalsIgnoreCase(trim)) {
ctx.writeAndFlush("PONG\n");
return;
}p.addLast(new IdleStateHandler(readerIdleSeconds, 0, 0));配合事件触发:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent idle) {
if (idle.state() == IdleState.READER_IDLE) {
ctx.writeAndFlush("ERR:IDLE_TIMEOUT\n")
.addListener(ChannelFutureListener.CLOSE);
}
}
}@Override
public void channelInactive(ChannelHandlerContext ctx) {
unbind(ctx); // 清理映射关系
}无论正常断开还是异常断开,都会执行清理。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.debug("TCP connection error: {}", cause.getMessage());
ctx.close(); // 发生异常时关闭连接
}防止因未捕获异常导致连接泄露。
为了实现“单点登录”和高效查找,使用两个 ConcurrentHashMap:
private static final Map<String, Channel> CLIENTS = new ConcurrentHashMap<>();
private static final Map<String, String> CHANNEL_BINDINGS = new ConcurrentHashMap<>();@Override
private void bind(String clientId, ChannelHandlerContext ctx) {
Channel old = CLIENTS.put(clientId, ctx.channel());
if (old != null && old != ctx.channel()) {
old.writeAndFlush("ERR:KICKED\n")
.addListener(ChannelFutureListener.CLOSE);
}
CHANNEL_BINDINGS.put(ctx.channel().id().asShortText(), clientId);
}⚠️ 如果同一个 clientId 再次连接,会自动踢掉之前的连接!
提供了几个静态方法供其他服务调用:
public static boolean sendToClient(String clientId, String msg) {
Channel ch = CLIENTS.get(clientId);
if (ch == null || !ch.isActive()) return false;
ch.writeAndFlush(msg + "\n");
return true;
}public static int broadcast(String msg) {
int count = 0;
for (Channel ch : CLIENTS.values()) {
if (ch.isActive()) {
ch.writeAndFlush(msg + "\n");
count++;
}
}
return count;
}public static int onlineSize() {
return CLIENTS.size();
}💡 这些方法可在 Controller 或 Service 中直接调用,实现“服务端主动推送”。
@PreDestroy
public void stop() {
try {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
} finally {
if (bossGroup != null) bossGroup.shutdownGracefully();
if (workerGroup != null) workerGroup.shutdownGracefully();
log.info("TCP LongConnection Server stopped");
}
}功能 | 说明 |
|---|---|
SSL 加密 | 添加 SSLHandler 到 Pipeline |
更复杂协议 | 使用 Protobuf / JSON 替代文本协议 |
集群支持 | 结合Redis 存储 clientId -> server 映射 |
消息确认机制 | 增加 ACK 回执与重试逻辑 |
连接限流 | 使用 ChannelTrafficShapingHandler |
本文详细解析了一个基于 Netty + Spring Boot 的 TCP 长连接服务实现方案。该代码具备:认证机制、心跳保活、空闲超时、单点登录、广播能力和易于集成。非常适合中小型项目作为基础通信模块使用。你只需稍作修改,即可接入自己的业务系统。
📌 请关注开源社区Geek-XD
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。