首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >基于 Netty 实现的 TCP 长连接服务详解

基于 Netty 实现的 TCP 长连接服务详解

原创
作者头像
用户11821718
发布2025-09-09 13:48:57
发布2025-09-09 13:48:57
3400
举报

基于 Netty 实现的 TCP 长连接服务详解

📌 请关注开源社区Geek-XD

在实际项目开发中,我们经常需要实现服务端与客户端之间的长连接通信,比如用于实时消息推送、设备监控、物联网(IoT)数据上报等场景。相比于 HTTP 短连接,TCP 长连接能显著降低连接建立开销,提升通信效率。

本文将带你深入解析一个使用 Netty 框架 实现的 轻量级 TCP 长连接服务器示例,涵盖:客户端认证、心跳保活、空闲检测、消息回显、广播发送 等核心功能,并提供完整的可运行 Java 代码。


✅ 功能概览

本示例实现了以下核心功能:

功能

说明

🔐 客户端认证

连接后需发送 AUTH:clientId 认证,否则断开

🫀 心跳机制

客户端发 PING,服务端回复 PONG

⏱️ 空闲超时关闭

超过设定时间无读操作,自动断开连接

💬 消息回显

收到普通消息,服务端原样返回前加 ECHO:

📣 广播支持

可向所有在线客户端发送消息

🔄 单点登录

同一 clientId 再次连接时踢掉旧连接


🧰 技术栈与依赖

  • Netty 4.1+(NIO 网络框架)
  • Spring Boot(用于配置注入和生命周期管理)
  • Java 8+
  • 使用 DelimiterBasedFrameDecoder 处理粘包/拆包问题
  • 日志使用 SLF4J + Logback

📦 核心代码结构说明

整个类 TcpLongConnectionServer 是一个 Spring Bean,负责启动 Netty 服务并管理客户端连接。

1. 配置项(通过 @Value 注入)

代码语言:java
复制
@Value("${netty.tcp.enable:true}")
private boolean enable;

@Value("${netty.tcp.port:18888}")
private int port;

@Value("${netty.tcp.bossThreads:1}")
private int bossThreads;

@Value("${netty.tcp.workerThreads:4}")
private int workerThreads;

@Value("${netty.tcp.readerIdleSeconds:120}")
private int readerIdleSeconds;

这些配置允许你在 application.yml 中灵活控制:

代码语言:yml
复制
netty:
  tcp:
    enable: true
    port: 18888
    bossThreads: 1
    workerThreads: 4
    readerIdleSeconds: 120

2. 启动服务( @PostConstruct 方法)

代码语言:java
复制
@PostConstruct
public void start() throws InterruptedException {
    if (!enable) return;

    bossGroup = new NioEventLoopGroup(bossThreads);
    workerGroup = new NioEventLoopGroup(workerThreads);

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                     p.addLast(new StringDecoder(CharsetUtil.UTF_8));
                     p.addLast(new StringEncoder(CharsetUtil.UTF_8));
                     p.addLast(new IdleStateHandler(readerIdleSeconds, 0, 0));
                     p.addLast(new TcpLongConnectionHandler());
                 }
             });

    ChannelFuture future = bootstrap.bind(port).sync();
    serverChannel = future.channel();
    log.info("TCP LongConnection Server started at port {}", port);
}

🔍 关键组件解释:

组件

作用

bossGroup

接收新连接(通常1个线程即可)

workerGroup

处理已连接的 IO 读写

DelimiterBasedFrameDecoder

基于换行符 \n 分隔消息帧,解决粘包问题

StringDecoder/StringEncoder

字符串编解码器(UTF-8)

IdleStateHandler

检测读空闲,超过 readerIdleSeconds 触发超时


🧠 核心处理器:TcpLongConnectionHandler

这是真正处理客户端消息的核心逻辑。

🔄 消息处理流程图

代码语言:txt
复制
收到消息
   ↓
是否为空? → 忽略
   ↓
是否以 AUTH: 开头?
   ├─ 是 → 提取 clientId,绑定并返回 AUTH_OK
   └─ 否 →
       是否为 PING?
         ├─ 是 → 回复 PONG
         └─ 否 →
              是否已认证?
                ├─ 否 → 返回 ERR:UNAUTH 并关闭
                └─ 是 → 回显 ECHO:消息

✅ 认证逻辑(AUTH:clientId)

代码语言:java
复制
if (trim.startsWith("AUTH:")) {
    String clientId = trim.substring(5).trim();
    if (clientId.isEmpty()) {
        ctx.writeAndFlush("ERR:EMPTY_CLIENT_ID\n");
        return;
    }
    bind(clientId, ctx);
    ctx.writeAndFlush("AUTH_OK\n");
}
  • 协议格式:AUTH:your_client_id\n undefined
  • 成功后调用 bind() 方法注册客户端

🫀 心跳机制(PING/PONG)

代码语言:java
复制
if ("PING".equalsIgnoreCase(trim)) {
    ctx.writeAndFlush("PONG\n");
    return;
}
  • 客户端每间隔一段时间发送 PINGundefined
  • 服务端立即响应 PONG,保持连接活跃
⏱️ 空闲超时检测
代码语言:java
复制
p.addLast(new IdleStateHandler(readerIdleSeconds, 0, 0));

配合事件触发:

代码语言:java
复制
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
    if (evt instanceof IdleStateEvent idle) {
        if (idle.state() == IdleState.READER_IDLE) {
            ctx.writeAndFlush("ERR:IDLE_TIMEOUT\n")
               .addListener(ChannelFutureListener.CLOSE);
        }
    }
}
  • 若客户端在 readerIdleSeconds 秒内未发送任何消息,则判定为空闲。undefined
  • 服务端发送超时提示并主动关闭连接。

🚪 客户端断开处理

代码语言:java
复制
@Override
public void channelInactive(ChannelHandlerContext ctx) {
    unbind(ctx); // 清理映射关系
}

无论正常断开还是异常断开,都会执行清理。


❌ 异常处理

代码语言:java
复制
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    log.debug("TCP connection error: {}", cause.getMessage());
    ctx.close(); // 发生异常时关闭连接
}

防止因未捕获异常导致连接泄露。


🔗 客户端绑定管理(双 Map 映射)

为了实现“单点登录”和高效查找,使用两个 ConcurrentHashMap:

代码语言:java
复制
private static final Map<String, Channel> CLIENTS = new ConcurrentHashMap<>();
private static final Map<String, String> CHANNEL_BINDINGS = new ConcurrentHashMap<>();
  • CLIENTS: clientId → Channel,用于快速向指定客户端发消息
  • CHANNEL_BINDINGS: channelId → clientId,用于连接断开时反向查找✅ 绑定逻辑(支持踢人)
代码语言:java
复制
@Override
private void bind(String clientId, ChannelHandlerContext ctx) {
    Channel old = CLIENTS.put(clientId, ctx.channel());
    if (old != null && old != ctx.channel()) {
        old.writeAndFlush("ERR:KICKED\n")
           .addListener(ChannelFutureListener.CLOSE);
    }
    CHANNEL_BINDINGS.put(ctx.channel().id().asShortText(), clientId);
}

⚠️ 如果同一个 clientId 再次连接,会自动踢掉之前的连接!

📢 外部调用接口(静态方法)

提供了几个静态方法供其他服务调用:

1.向指定客户端发送消息
代码语言:java
复制
public static boolean sendToClient(String clientId, String msg) {
    Channel ch = CLIENTS.get(clientId);
    if (ch == null || !ch.isActive()) return false;
    ch.writeAndFlush(msg + "\n");
    return true;
}
2. 广播给所有在线客户端
代码语言:java
复制
public static int broadcast(String msg) {
    int count = 0;
    for (Channel ch : CLIENTS.values()) {
        if (ch.isActive()) {
            ch.writeAndFlush(msg + "\n");
            count++;
        }
    }
    return count;
}
3. 获取当前在线人数
代码语言:java
复制
public static int onlineSize() {
    return CLIENTS.size();
}

💡 这些方法可在 Controller 或 Service 中直接调用,实现“服务端主动推送”。

🛑 优雅关闭(@PreDestroy)

代码语言:java
复制
@PreDestroy
public void stop() {
    try {
        if (serverChannel != null) {
            serverChannel.close().syncUninterruptibly();
        }
    } finally {
        if (bossGroup != null) bossGroup.shutdownGracefully();
        if (workerGroup != null) workerGroup.shutdownGracefully();
        log.info("TCP LongConnection Server stopped");
    }
}

📈 应用场景建议

  • ✅ 物联网设备状态上报与指令下发
  • ✅ 实时聊天系统(可扩展为 WebSocket)
  • ✅ 在线客服系统
  • ✅ 实时监控平台(如车辆、传感器)
  • ✅ 游戏服务器底层通信

🚀 扩展建议

功能

说明

SSL 加密

添加 SSLHandler 到 Pipeline

更复杂协议

使用 Protobuf / JSON 替代文本协议

集群支持

结合Redis 存储 clientId -> server 映射

消息确认机制

增加 ACK 回执与重试逻辑

连接限流

使用 ChannelTrafficShapingHandler

📎 总结

本文详细解析了一个基于 Netty + Spring Boot 的 TCP 长连接服务实现方案。该代码具备:认证机制、心跳保活、空闲超时、单点登录、广播能力和易于集成。非常适合中小型项目作为基础通信模块使用。你只需稍作修改,即可接入自己的业务系统。

📌 请关注开源社区Geek-XD

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基于 Netty 实现的 TCP 长连接服务详解
    • ✅ 功能概览
    • 🧰 技术栈与依赖
    • 📦 核心代码结构说明
      • 1. 配置项(通过 @Value 注入)
      • 2. 启动服务( @PostConstruct 方法)
    • 🔍 关键组件解释:
    • 🧠 核心处理器:TcpLongConnectionHandler
      • 🔄 消息处理流程图
      • ✅ 认证逻辑(AUTH:clientId)
      • 🫀 心跳机制(PING/PONG)
      • 🚪 客户端断开处理
      • ❌ 异常处理
    • 🔗 客户端绑定管理(双 Map 映射)
      • 📢 外部调用接口(静态方法)
      • 🛑 优雅关闭(@PreDestroy)
    • 📈 应用场景建议
    • 🚀 扩展建议
    • 📎 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档