




术语说明:
在RPC中, Client端叫做服务消费者, Server端叫做服务提供者
小结: RPC的目标就是将2-8这些步骤都封装起来, 用户无需关心这些细节, 可以像调用本地方法一样即可完成远程服务的服务调用

package com.dance.netty.netty.dubbo.common.api;
/**
* 对外提供服务的Service接口
*/
public interface HelloService {
String printHello(String msg);
}package com.dance.netty.netty.dubbo.provider.service.impl;
import com.dance.netty.netty.dubbo.common.api.HelloService;
public class HelloServiceImpl implements HelloService {
private static final HelloService helloService = new HelloServiceImpl();
@Override
public String printHello(String msg) {
System.out.println("接收到参数: " + msg);
return "hello msg!";
}
private HelloServiceImpl(){}
public static HelloService getInstance(){
return helloService;
}
}package com.dance.netty.netty.dubbo.common.netty;
import com.dance.netty.netty.dubbo.provider.service.impl.HelloServiceImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.nio.charset.StandardCharsets;
public class NettyServer {
public static void startServer0() {
startServer0("127.0.0.1", 7000);
}
public static void startServer0(String hostname, int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 使用字符串编解码器
ch.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new NettyServerHandler());
}
});
ChannelFuture sync = serverBootstrap.bind(hostname, port).sync();
System.out.println("netty server is starting, ip: " + hostname + ", port: " + port);
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}package com.dance.netty.netty.dubbo.common.netty;
import com.dance.netty.netty.dubbo.provider.service.impl.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("有人注册");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String msgStr = msg.toString();
// 获取客户端发送的数据
System.out.println("msg is " + msgStr);
// 客户端在调用服务器的API的时候, 我们可以自定义一个协议
// 比如我们要求 每次发送消息都必须以固定格式开头 比如
// 服务名称#方法名称#参数
// 例如: HelloService#printHello#this is msg
if(msgStr.startsWith("HelloService#printHello#")){
// 传入参数的时候 去除协议头
ctx.writeAndFlush(HelloServiceImpl.getInstance().printHello(msgStr.substring(msgStr.lastIndexOf('#') + 1)));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}package com.dance.netty.netty.dubbo.provider.server;
import com.dance.netty.netty.dubbo.common.netty.NettyServer;
/**
* 服务启动类
*/
public class ServerBootstrap {
public static void main(String[] args) {
NettyServer.startServer0();
}
}package com.dance.netty.netty.dubbo.common.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.concurrent.Callable;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> {
/**
* 上下文对象
*/
private ChannelHandlerContext context;
/**
* 远程调用的返回结果
*/
private String result;
public void setParams(String params) {
this.params = params;
}
/**
* 客户端调用方法时传入的参数
*/
private String params;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 在其他方法会使用
context = ctx;
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
// 唤醒在这个方法上等待的线程
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/**
* 被代理对象调用, 发送数据给服务器 => wait => 等待被读取唤醒(也就是服务器有数据回送时) => 返回结果
* 其实就是将异步通过 wait 和 notify 变成了同步等待
* @return 返回结果
* @throws Exception 线程异常
*/
@Override
public synchronized String call() throws Exception {
// 发送参数到 服务器
context.writeAndFlush(params);
// 等待read读取到数据
wait();
// 返回响应的数据
return result;
}
}package com.dance.netty.netty.dubbo.common.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
/**
* 创建线程池
*/
private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 用于执行远程调用的处理器
*/
private static NettyClientHandler nettyClientHandler;
// 编写方法使用代理模式, 获取一个代理对象
public Object getBean(final Class<?> serviceClass, final String protocol){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> {
if (nettyClientHandler == null) {
initClient();
}
// 设置要发给服务器端的信息
// 采用 协议头 + 参数[0] 格式
nettyClientHandler.setParams(protocol + args[0]);
// 线程池提交一个任务
return executorService.submit(nettyClientHandler).get();
});
}
private static void initClient(){
nettyClientHandler = new NettyClientHandler();
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("stringDecoder", new StringDecoder())
.addLast("stringEncoder", new StringEncoder())
.addLast("nettyClientHandler", nettyClientHandler);
}
});
bootstrap.connect("127.0.0.1", 7000).sync();
// 注意 Client端不能阻塞
// sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 重点 千万不要关闭
// eventExecutors.shutdownGracefully();
}
}
}package com.dance.netty.netty.dubbo.consumer.client;
import com.dance.netty.netty.dubbo.common.api.HelloService;
import com.dance.netty.netty.dubbo.common.netty.NettyClient;
public class ClientBootstrap {
private static final String PROTOCOL = "HelloService#printHello#";
public static void main(String[] args) {
// 创建一个消费者
NettyClient nettyClient = new NettyClient();
// 创建代理对象
HelloService helloService = (HelloService) nettyClient.getBean(HelloService.class, PROTOCOL);
// 通过代理对象调用服务提供者的方法(服务)
String result = helloService.printHello("hi dubbo rpc");
System.out.println("调用方法执行返回结果: " + result);
}
}Server端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
netty server is starting, ip: 127.0.0.1, port: 7000
msg is HelloService#printHello#hi dubbo rpc
接收到参数: hi dubbo rpcClient端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
调用方法执行返回结果: hello msg!我们只通过接口就调用到了提供者的提供的接口,实现了通过Netty完成了RPC的远程调用
到这里Netty就写完了, 源码分析的话, 如果后面找到有好的视频讲解Netty的源码,再回来写吧, 接下来去看看数据结构与算法吧