摘 要
利用动态代理&反射&socket实现简单的RPC通信
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC跨越了传输层和应用层,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
现在RPC通信在内部分布式集群环境中已经很常见了。现在的开源的分布式框架已经提供了相应的实现,但仅停留在用的层面是远远不够的,这不符合一只码畜的追求。所以为了弄清RPC到底是个啥玩意,就查阅了部分资料,并针对其所述实现了一版最基础的RPC。
大体思路是这样的:
首先Consumer通过JDK动态代理的机制去创建socket,让socket连接Producer的SocketServer,内部利用ObjectOutputStream
将请求信息(接口信息,方法,参数)封装,通过socket传输。
其次Producer接到ObjectInputStream
,将信息拆包(接口信息,方法,参数)。利用反射将接口实现类实例化(这就是为什么RPC框架客户端和服务端都需要有一致的接口类)。
最后Producer利用反射将业务处理完毕后,用ObjectOutputStream
将结果封装,通过socket返回数据。Consumer接收到返回数据。
//RPCServer实现
RPCServer.java
package com.itunic.rpc;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* RPC服务端<br>
* 基于SocketServer&反射实现
*
* @ClassName RPCServer
* @author yinbin
* @website https://itunic.com
* @Date 2017年6月23日 上午10:59:50
* @version 1.0.0
*/
public class RPCServer {
private static final Map<String, Class<?>> serviceMap = new ConcurrentHashMap<String, Class<?>>();
private static final ExecutorService exec = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private Integer port;
private Boolean isRunning = true;
public RPCServer(int port) {
this.port = port;
}
public void register(Class<?> serviceInterface, Class<?> impl) {
serviceMap.put(serviceInterface.getName(), impl);
}
public void start() throws IOException {
ServerSocket ss = new ServerSocket();
try {
ss.bind(new InetSocketAddress(port));
while (isRunning) {
exec.execute(new ServiceTask(ss.accept()));
}
} finally {
ss.close();
}
}
private class ServiceTask implements Runnable {
Socket socket = null;
public ServiceTask(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
input = new ObjectInputStream(socket.getInputStream());
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
Class<?> serviceClass = serviceMap.get(serviceName);
if (serviceClass == null) {
throw new ClassNotFoundException(serviceName + " not found");
}
Method method = serviceClass.getMethod(methodName, parameterTypes);
Object result = method.invoke(serviceClass.newInstance(), arguments);
output = new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
output.flush();
} catch (Exception e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
} finally {
if (null != output) {
try {
output.close();
} catch (IOException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
if (input != null) {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
//RPCClient实现
RPCClient.java
package com.itunic.rpc;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
*
* RPC客户端 基于动态代理&socket实现
*
* @ClassName RPCClient
* @author yinbin
* @website https://itunic.com
* @Date 2017年6月23日 上午10:59:04
* @version 1.0.0
*/
public class RPCClient {
@SuppressWarnings("unchecked")
public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream output = null;
ObjectInputStream input = null;
try {
socket = new Socket();
socket.connect(addr);
output = new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(serviceInterface.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
input = new ObjectInputStream(socket.getInputStream());
return input.readObject();
} finally {
if (socket != null)
socket.close();
if (output != null)
output.close();
if (input != null)
input.close();
}
}
});
}
}
package com.itunic.rpc;
import java.io.IOException;
/**
*
* RPC服务端启动
* @ClassName RPCServerAction
* @author c
* @Date 2017年6月23日 下午5:12:27
* @version 1.0.0
*/
public class RPCServerAction {
public static void main(String[] args) throws IOException {
RPCServer server = new RPCServer(8888);
/**
* 注册相关的接口实现类
*/
server.register(TestHello.class, TestHelloImpl.class);
server.start();
}
}
RPC原理还是挺简单的,但可以有更好的实现。例如:socket用高并发框架netty替代,支持更多协议等。