它们的常用方法有:
- group:设置线程组
- channel:指定通道的实现类
- option:给channel添加配置
- childOption:给接收到的channel添加配置
- handler:设置bossGroup的handler
- childHandler:设置workGroup的handler
- addFirst:把handler添加到链表第一个位置
- addLast:把handler添加到链表的最后一个位置
之前说过用NIO实现聊天室,现在来看看用netty如何实现聊天室。这里我将新建两个maven项目,一个服务端,一个客户端,最后可以打成jar包,服务端jar包运行在你电脑上,客户端jar包自己跑一份,还可以发给你的同事,然后就可以愉快的聊天了。
1、服务端:
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.zhusl.study.chatroom.NettyChatRoomClient</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
客户端的pom.xml唯一的区别就是 <mainClass>换成了客户端启动类。
public class NettyChatRoomServer {
public void run () throws Exception {
// 创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new NettyChatRoomServerInitializer());
// 监听端口
ChannelFuture cf = bootstrap.bind(6666).sync();
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
NettyChatRoomServer ncrs = new NettyChatRoomServer();
ncrs.run();
}
}
public class NettyChatRoomServerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decode",new StringDecoder());//解码器
pipeline.addLast("encode",new StringEncoder());//编码器
pipeline.addLast("handler",new NettyChatRoomServerHandler());
}
}
public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String>{
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 当有channel加入时执行该方法(即当有客户端连接时)
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
System.out.println("【" + ip + "】" + "进入聊天室");
for (Channel channel : channelGroup) {
// 给别的客户端提醒:xxx加入群聊
if (ctx.channel() != channel) {
channel.writeAndFlush("【" + ip + "】" + "进入聊天室");
}
}
// 将当前channel加入到channelGroup中
channelGroup.add(ctx.channel());
}
/**
* 当有channel删除时执行该方法(即客户端断开连接)
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
System.out.println("【" + ip + "】" + "离开聊天室");
for (Channel channel : channelGroup) {
// 给别的客户端提醒:xxx加入群聊
if (ctx.channel() != channel) {
channel.writeAndFlush("【" + ip + "】" + "离开聊天室");
}
}
// 这里不需要channelGroup.remove,会自动remove
}
/**
* 当有数据读取时执行该方法
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
System.out.println("【" + ip + "】" + ":" + msg);
for (Channel channel : channelGroup) {
// 将消息转发给别的客户端
if (ctx.channel() != channel) {
channel.writeAndFlush("【" + ip + "】" + ":" + msg);
} else {
channel.writeAndFlush("【我】:" + msg);
}
}
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
2、客户端:
public class NettyChatRoomClient {
@SuppressWarnings("resource")
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new NettyChatRoomClientInitializer());
// 连接服务器
ChannelFuture channelFuture = bootstrap.connect("192.168.2.36", 7777).sync();
Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
channel.writeAndFlush(msg);
}
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
NettyChatRoomClient ncrc = new NettyChatRoomClient();
ncrc.run();
}
}
public class NettyChatRoomClientInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decode",new StringDecoder());//解码器
pipeline.addLast("encode",new StringEncoder());
pipeline.addLast("handler",new NettyChatRoomClientHandler());
}
}
public class NettyChatRoomClientHandler extends SimpleChannelInboundHandler<String>{
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 将从服务端接收到的消息打印出来
System.out.println(msg);
}
}
客户端与服务端连接是否正常,需要有一个机制来检测,Netty提供了心跳检测机制。 1、Netty心跳检测案例:
客户端和以前一样,没有变换,主要是服务端加了日志handler以及childHandler重写了一个用于检测心跳的方法userEventTriggered,服务端代码如下:
public class HeartBeatServer {
public static void main(String[] args) throws Exception {
// 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 2. 创建work group
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 3. 创建服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 4. 配置启动参数
bootstrap.group(bossGroup, workGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
.handler(new LoggingHandler(LogLevel.INFO)) // 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
sc.pipeline().addLast(new HeartBeatServerHandler());
}
});
// 5. 启动服务器并绑定端口
ChannelFuture cf = bootstrap.bind(6666).sync();
// 6. 对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String info = null;
switch (event.state()) {
case READER_IDLE:
info = "读空闲";
break;
case WRITER_IDLE:
info = "写空闲";
break;
case ALL_IDLE:
info = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + ":" + info);
}
}
}
1、http协议和websocket协议的区别:
2、案例代码:
public class WebSocketServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
.handler(new LoggingHandler(LogLevel.INFO)) // 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new HttpServerCodec()); // 使用http的编码解码器
sc.pipeline().addLast(new ChunkedWriteHandler()); // 是以块方式写,添加ChunkedWriteHandler处理器
sc.pipeline().addLast(new HttpObjectAggregator(8192)); // http数据在传输的时候是分段的,用这个处理器就可聚集分段
// 请求的url就是:ws://localhost:6666/hello
sc.pipeline().addLast(new WebSocketServerProtocolHandler("/hello"));
sc.pipeline().addLast(new WebSocketServerHandler());
}
});
ChannelFuture cf = bootstrap.bind(80).sync();
System.out.println("服务端准备好了");
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器收到消息:" + msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now()) + ",msg:" + msg.text());
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded被调用:" + ctx.channel().id().asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved被调用:" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught被调用:" + ctx.channel().id().asLongText());
ctx.close();
}
}
然后编写一个页面,用来发送websocket请求:
<body>
<script type="text/javascript">
var socket;
if(window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1/hello");
// 接收服务器端返回的消息,显示在responseText中
socket.onmessage = function(ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
// 相当于开启连接
socket.onopen = function(ev){
var rt = document.getElementById("responseText");
rt.value = "连接开启了";
}
// 连接关闭
socket.onclose = function(ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接关闭了";
}
} else {
alert("浏览器不支持websocket");
}
function send(message){
if (!window.socket){
return;
}
if (socket.readyState == WebSocket.OPEN){
socket.send(message);
} else {
alert("连接没有开启");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height:300px;width: 300px"></textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height:300px;width: 300px"></textarea>
</form>
</body>
访问这个页面,服务端启动或者关闭会在框框中显示出来,同样,如果客户端关闭,服务端也会在控制台打印出来。
1、编解码问题: 数据在网络中是以二进制字节码传输的,发送的数据需要编码,服务端收到后需要解码。Netty提供了StringDecoder、ObjectDecoder,底层采用的是java序列化技术,java序列化本身效率较低,而且无法跨语言,所以就有了protobuf。
2、protobuf简介: 它是Google的开源项目,轻便高效的结构化数据存储格式,可用于数据的序列化,且支持跨平台跨语言,很适合做数据存储和RPC数据交换格式。
3、protobuf的使用:
下面开始编码:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 外部类名
message Student { // 内部类名
int32 id = 1; // 1不是值,而是序号
string name = 2;
}
protoc.exe --java_out=. Student.proto
执行完后会在protoc.exe所在目录生成一个StudentPOJO.java文件,这就是我们要的文件,复制到项目中。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client:" + ctx);
// 发送student对象到服务端
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(666).setName("张三").build();
ctx.writeAndFlush(student);
}
NettyClient中添加protobuf的编码器:
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 加入protobuf的编码器
sc.pipeline().addLast("encoder", new ProtobufEncoder());
sc.pipeline().addLast(new NettyClientHandler());
}
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 加入解码器,指定解码对象
sc.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
// 传入自定义的handler
sc.pipeline().addLast(new NettyServerHandler());
// 在这里,可以将SocketChannel sc保存到集合中,别的线程拿到集合就可以调用channel的方法了
}
NettyServerHandler中读取student对象:
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读取客户端发送的student
StudentPOJO.Student student = (Student) msg;
System.out.println("客户端发送的数据是:id=" + student.getId() + ", name=" + student.getName());
}
启动服务端,再启动客户端,就可以看到服务端后台打印出了如下信息:
客户端发送的数据是:id=666, name=张三