首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

基于Netty的简单多人聊天程序

本篇文章,我们一起来实现一个基于Netty的简单多人聊天示例程序,目的是帮助大家了解Netty在Socket通信方面的典型应用场景。

先来简单介绍一下本示例服务端的基本功能点和执行流程:

1.服务端负责记录客户端的上线、下线信息;

2.服务端负责接收多个客户端的消息内容,保存与客户端的链接信息,并将消息内容广播到所有客户端;

3.多个客户端可以同时接收到某一客户端发出的消息内容(类似QQ群);

4.每个客户端可以接收到其他客户端的加入、离开的信息;

了解了程序的大体功能点之后,接下来就需要考虑具体的实现了。还记得上一篇文章中我们编写的HttpServerHandler类吗?它实现了ChannelHandler接口,这个接口是Netty中极为重要的组件,我们的业务逻辑往往要写在ChannelHandler接口的实现类中,本示例中我们的服务端声明一个ChatServerHandler类,用来实现之前提到的基本功能点。

首先,我们定义ChatServerHandler类,此类同样需要继承SimpleChannelInboundHandler(注意范型String表示服务端接收到的消息类型),由于服务端在接收到某一客户端发送过来的消息后,需要将消息向其他在线的客户端广播,因此ChatServerHandler类中需要包含一个实例变量用于存放所有的链接信息,这个变量需要用到Netty提供的组件ChannelGroup,ChannelGroup是一个接口,创建其实例的代码如下:

//服务端用于存放与多个客户端之间的socket链接

private staticChannelGroupchannelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);

还记得上篇文章中我们提到的“事件驱动”吗,当客户端链接到服务端时,会产生handlerAdded事件,我们在ChatServerHandler类中添加handlerAdded的事件回调方法:

//客户端与服务端建立好链接时会触发此方法

@Override

public voidhandlerAdded(ChannelHandlerContext ctx)throwsException {

Channel channel = ctx.channel();

//channelGroup的writeAndFlush方法会调用其所包含的所有的channel的writeAndFlush方法

channelGroup.writeAndFlush("【服务器】- "+ channel.remoteAddress() +" 加入\n");

channelGroup.add(channel);//将客户端链接存放到channelGroup中

}

这段代码的作用是服务端将新接入的链接存放到channelGroup中,向所有客户端广播新链接到的客户端信息并在控制台打印链接数。

当客户端与服务端的链接处于活动状态时,会触发channelActive事件,我们在ChatServerHandler添加channelActive事件的回调方法:

//客户端与服务端建立好链接,链接处于活动状态时触发此方法

@Override

public voidchannelActive(ChannelHandlerContext ctx)throwsException {

Channel channel = ctx.channel();

}

此方法表示当有客户端链接到服务端时,服务端会在控制台打印出远程客户端的ip地址:端口 + "上线"(需要强调的是,回调方法的执行时机是handlerAdded方法早于channelActive方法)。

当服务端接收到客户端发来的聊天消息后,会触发channelRead0方法的执行,我们在这个回调方法中处理接收到的消息:

//参数msg表示客户端的消息内容,此类型由SimpleChannelInboundHandler的范型决定

@Override

protected voidchannelRead0(ChannelHandlerContext ctx, String msg)throwsException {

Channel channel = ctx.channel();

Iterator iterator =channelGroup.iterator();

while(iterator.hasNext()) {

Channel ch = iterator.next();

if(channel != ch) {

ch.writeAndFlush(channel.remoteAddress() +" 发送的消息:"+ msg +"\n");

}else{

ch.writeAndFlush("【自己】"+ msg +"\n");

}

}

}

此方法为本示例的重点方法,请注意,当有多个客户端链接到服务端时,handlerAdded方法将会被调用多次,每次调用都会分别将链接存放在channelGroup中,通过迭代channelGroup可以获取到所有客户端的链接。例如:有一个客户端向服务端发送了“你好”这样一个消息,服务端会读到此消息,并通过迭代channelGroup获取到其他客户端的链接,并通过channel的writeAndFlush方法将“你好”这条消息消息分别写到不同的客户端,从而实现群聊的效果。

客户端与服务端的链接失效时,会触发channelInactive事件,我们在ChatServerHandler中添加channelInactive事件的回调方法,当客户端与服务端的链接断开时,服务端在控制台上打印ip地址:端口 + "下线"。

@Override

public voidchannelInactive(ChannelHandlerContext ctx)throwsException {

Channel channel = ctx.channel();

}

当客户端与服务端的链接断开后,会触发handlerRemoved事件,我们在ChatServerHandler类中添加handlerRemoved事件的回调方法:

//链接彻底断开时触发此方法

@Override

public voidhandlerRemoved(ChannelHandlerContext ctx)throwsException {

Channel channel = ctx.channel();

channelGroup.writeAndFlush("【服务器】- "+ channel.remoteAddress() +" 离开\n");

//channelGroup自动移除断掉的channel

}

另外还有一个事件回调方法需要注意,那就是异常捕获方法,当ChatServerHandler类中的方法产生异常时,exceptionCaught事件将被触发:

@Override

public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throwsException {

cause.printStackTrace();

ctx.close();

}

通常情况下这个方法需要通过ctx.close()方法关闭网络连接。至此,我们ChatServerHandler类就编写完成了(写法可以参考上一篇文章中的HttpServerHandler类的实现方式)。

接下来我们定义ChannelHandler的初始化容器ChannelInitializer类的子类ChatServerInitializer。如果记不清ChannelInitializer的作用,请阅读上一篇文章。

public classChatServerInitializerextendsChannelInitializer {

@Override

protected voidinitChannel(SocketChannel ch)throwsException {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(newDelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));

pipeline.addLast(newStringDecoder(CharsetUtil.UTF_8));

pipeline.addLast(newStringEncoder(CharsetUtil.UTF_8));

pipeline.addLast(newChatServerHandler());

}

}

与我们上一篇定义的HttpServerInitializer不同之处仅在于我们使用了不同的编解码器,DelimiterBasedFrameDecoder解码器会按照指定的分隔符如"\r"或"\n"来判断是否接收了一条完整消息,StringDecoder和StringEncoder理解起来相对容易些,本示例中通过指定utf-8字符集来对消息内容进行编解码(channelRead0方法的第二个参数类型是String,是实际消息内容,这些内容经过了StringDecoder解码器处理,当消息从服务端广播到客户端时,会通过StringEncoder编码器处理),最后将我们定义的ChatServerHandler类也加入到ChannelPipeline中。

public classChatServer {

public static voidmain(String[] args)throwsException {

EventLoopGroup bossGroup =newNioEventLoopGroup();

EventLoopGroup workerGroup =newNioEventLoopGroup();

try{

ServerBootstrap serverBootstrap =newServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).

childHandler(newChatServerInitializer());

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();

channelFuture.channel().closeFuture().sync();

}finally{

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

}

至此,服务端的代码全部编写完毕,总结起来您可能会发现,服务端的启动类ChatServer和ChannelHandler的初始化器ChatServerInitializer同上一篇文章中的对应组件几乎没有变化,变化最大的是我们自定义的处理器类,本示例我们在处理器类ChatServerHandler中引入了若干个回调方法,用于响应不同的网络事件,将这些事件进行有机结合,进而实现了聊天的业务逻辑,在下一篇文章中我们将介绍如何编写聊天程序客户端的功能。也许此刻您依然对Channel,ChannelHandler,ChannelInitializer这些组件的逻辑关系存在迷惑,先不要着急,慢慢来,我们下篇文章见。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180209G01KDP00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券