同步阻塞bio:链接数目较少
public static void main(String args[]) throws IOException {
ExecutorService pool = ThreadPool.getCachedThreadPool();
ServerSocket socket = new ServerSocket(6666);
System.out.println("服务器启动....");
while(true){
final Socket client = socket.accept();
System.out.println("this is a client");
pool.execute(new Runnable() {
@Override
public void run() {
handler(client);
}
});
}
}
public static void handler(Socket socket){
System.out.print(String.format("线程id: %s 线程name: %s",Thread.currentThread().getId(),Thread.currentThread().getName()));
byte[] bytes = new byte[1024];
try (InputStream inputStream = socket.getInputStream()) {
while(true){
int read = inputStream.read(bytes);
if(read!=-1){
System.out.print(new String(bytes,0,read));
}else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
buffer的读写都依赖这四个属性,
String str = "hello world";
try (FileOutputStream stream = new FileOutputStream("d:/desktop/1.txt")) {
FileChannel channel = stream.getChannel();
ByteBuffer buffer1 = ByteBuffer.allocate(1024);
buffer1.put(str.getBytes());
buffer1.flip();
channel.write(buffer1);
stream.close();
} catch (IOException e) {
e.printStackTrace();
}
文件复制:同一个buffer
文件复制:使用通道的transform(),或transto()方法,更方便
文件直接修改,文件内容不再进入jvm,buffer直接映射到文件
通信示例
public static void server() throws Exception{
// 创建ServerSocketChannel ---》ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress socketAddress = new InetSocketAddress(7000);
serverSocketChannel.socket().bind(socketAddress);
serverSocketChannel.configureBlocking(false);
// 创建Selector
Selector selector = Selector.open();
// serverSocketChannel注册到seletcor,事件为OP——ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
// 没有事件
if(selector.select(1000) == 0){ // selector 等待1秒钟
System.out.println("waiting for 1 seconds");
continue;
}
// 有事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while(keyIterator.hasNext()){
// 获取key
SelectionKey key = keyIterator.next();
// 获取对应通道
if(key.isAcceptable()){
// 生成socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// socketChannel注册到selector,关注事件为OP_read
socketChannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
if (key.isReadable()){
// 反向获取到channel
SocketChannel channel = (SocketChannel) key.channel();
// 会获取到buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
channel.read(buffer);
System.out.println(String.format("当前线程:%s-%s,form客户端:%s",Thread.currentThread().getId(),Thread.currentThread().getName(),new String(buffer.array())));
}
// 删除selectionkey,防止重复操作
keyIterator.remove();
}
}
}
public static void client() throws Exception{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// 未连接上或正在连接
if(!socketChannel.connect(new InetSocketAddress("127.0.0.1",7000))){
while(!socketChannel.finishConnect()){
System.out.println("正在连接,但客户端没有阻塞");
}
}
// 已连接
String str = "hello world";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
// 发送数据
socketChannel.write(buffer);
}
示例:服务器读取文件,发送给客户
sendFile:2次切换,3次copy linux2.1
sendFile:2次切换,2次copy linux2.4
异步非阻塞aio:链接数目多,并且链接时间长
单Rector单线程
单Rector多线程
// netty 聊天
public class Server{
public static void main(String[] args){
// 连个线程组,
// 处理链接,默认线程数=cup核数*2
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 处理业务
EventLoopGroup workGroup = new NioEventLoopGroup();
// 服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class) // 通道类型
.option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
}); // 给workgroup的eventloop设置处理器
// 绑定端口,并启动
ChannelFuture cf = bootstrap.bind("127.0.0.1",6668).sync();
System.out.println("服务端启动....");
// 关闭,当有关闭事件时关闭
cf.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 读取客户端消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class Client {
public static void main(String[] args) {
// 事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
// 客户端对象
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(eventExecutors) // 设置线程组
.channel(NioSocketChannel.class) // 设置通道实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); // 加入处理器
}
});
// 启动
ChannelFuture cf = bootstrap.connect("127.0.0.1", 6668).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ctx"+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("客户端发来的信息",CharsetUtil.UTF_8));
}
// 读取客户端消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
// 用户自定义的普通任务
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
// 若此时是耗时任务,客户端都要等待服务器执行完毕,
// 通过下面方式可以异步执行,就是将任务交给了netty的task
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
System.out.println(ctx.channel().remoteAddress());
System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
}
});
// 任务放在scheduleTaskQueue
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
System.out.println(ctx.channel().remoteAddress());
System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));
}
},5, TimeUnit.SECONDS);
}
public class Main{
public static void main(String[] args){
// 连个线程组,
// 处理链,默认线程数=cup核数*2
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 处理业务
EventLoopGroup workGroup = new NioEventLoopGroup();
// 服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class) // 通道类型
.option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
.childHandler(new ChannelInitializer<SocketChannel>() {// 给workgroup添加handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {// 每个请求pipline与handler是不共享的,
// 得到管道
ChannelPipeline pipeline = ch.pipeline();
// http 解码器
pipeline.addLast(new HttpServerCodec());
// 响应
pipeline.addLast(new NettyServerHandler());
}
}); // 给workgroup的eventloop设置处理器
// 绑定端口,并启动
ChannelFuture cf = bootstrap.bind("127.0.0.1",8080).sync();
System.out.println("服务端启动....");
// 关闭,当有关闭事件时关闭
cf.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
// SimpleChannelInboundHandler<HttpObject>是ChannelInboundHandlerAdapter的子类
// 通信数据被封装httpobject
class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println(ctx.channel().pipeline().hashCode()+" "+this.hashCode());
HttpRequest request = (HttpRequest) msg;
URI uri = new URI(request.uri());
if("/favicon.ico".equals(uri.getPath())){
System.out.println("不作响应");
return;
}
ByteBuf content = Unpooled.copiedBuffer("我是服务器,", CharsetUtil.UTF_16);
DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,content);
defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
ctx.writeAndFlush(defaultFullHttpResponse);
}
}
// 该对象包含数组,读取的时候不同flip进行翻转,底层维护了readerIndex,writerIndex
ByteBuf a = Unpooled.buffer(100);
ByteBuf b = Unpooled.copiedBuffer("helloworld",CharsetUtil.UTF_8);
// 心跳检测处理器,触发器
// 3s没读取,发送检测包
// 5s没有写,发送检测包
// 7s没有读,也没有写,发送检测包
// 事件传递到下一个handler处理(自定义)
pipline.addLast(new IdleStateHandler(3,5,7TimeUnit.SECONDS))
pipline.addLast(new ChannelInboundHandlerAdapter(){
public void userEventTriggered(ChannelHandlerContext ctx,Object evt){
if(evt instance of IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
swicth(event.state()){
case READER_IDLE:
XXX
case WRITER_IDLE:
XXX
case ALL_IDLE:
XXX
}
}
}
})
class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
ctx.channel().writeAndFlush(new TextWebSocketFrame("xxxx"));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
}
var socket;
if(windows.WebSocket){
socket = new WebSocket("ws://localhost:7000/hello");
socket.onmessage = function(ev){
收到消息
}
socket.onopen = function(ev){
连接开启
}
socket.onclose = function(ev){
连接关闭
}
}
function send(message){
if(socket.readyState==WebSocket.OPEN){
socket.send(message);
}
}
Netty编码解码
序列化的缺点
syntax = "protobuf4";// 版本号,
option optimize_for = SPEED; // 加快解析
option java_package = "top.deanxxx"; // 指定包名
option java_outer_classname = "MyData";//文件名,外部类名
message MyMessage{
enum DataType{
StudentType = 0;
UserType = 1;
}
DataType data_Type = 1; // MyMessage 的第一个属性
oneof dataBody{
Stduent s = 2; // MyMessage 的另一个属性
User u = 3;// MyMessage 的另一个属性
}
}
message User{
int32 id = 1; // id属性 ,1 表示属性的序号
string name = 2;
}
message Student{
int32 id = 1; // id属性 ,1 表示属性的序号
string name = 2;
}
// protoc.exe --java_out=. xxx.proto
粘包:间隔时间短的包合并为一个
拆包:数据过大拆分
自定义协议+编解码器来解决
关键是解决服务器每次读取数据长度问题。
// 公共接口
public interface{
String hello(String msg);
}
// provider
public class HelloService implement HelloService{
String hello(String msg){
}
}
public class ServerBootstrap{
public static void main(String args){
}
}
public class NettyServer{
// netty初始化
private static void startserver0(String hostname,int port){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class) // 通道类型
.option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ServerHandler());
}
});
ChannelFuture cf = bootstrap.bind(hostname,port).sync();
cf.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class ServerHandler{
channelRead:
xxxxx
}
public class NettyClient{
private static ExecutorService executor = Executors.newFixedThreadPool(4);
private static NettyClient client;
// 编写方法,代理模式
public Object getBean(final Class<?>serviceclass,final String providerName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},(proxy,method,args)->{
// 每调用一次hello,
if(cliet ==null){
initial();
}
client.setPara(prividerName+args[0]);
return executor.submit(client).get()
})
}
private static void initial(){
client = new NettyClient();
EventLoopGroup eventExecutors = new NioEventLoopGroup();
// 客户端对象
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
}
});
// 启动
ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
}
public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable{
private ChannelHandlerContext ctx;
private String result;
private para;
channelActivate(ctx){
ctx = ctx
}
synchronzied channelRead(msg){
result = msg.toString();
notify();
}
synchronized call(){
ctx.writeAndFlush(para);
wait();
retutn result;
}
}