利用动态代理&反射&socket实现简单的RPC通信

摘 要

利用动态代理&反射&socket实现简单的RPC通信

概述

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC跨越了传输层和应用层,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

现在RPC通信在内部分布式集群环境中已经很常见了。现在的开源的分布式框架已经提供了相应的实现,但仅停留在用的层面是远远不够的,这不符合一只码畜的追求。所以为了弄清RPC到底是个啥玩意,就查阅了部分资料,并针对其所述实现了一版最基础的RPC。

实现思路

大体思路是这样的:

首先Consumer通过JDK动态代理的机制去创建socket,让socket连接Producer的SocketServer,内部利用ObjectOutputStream将请求信息(接口信息,方法,参数)封装,通过socket传输。

其次Producer接到ObjectInputStream,将信息拆包(接口信息,方法,参数)。利用反射将接口实现类实例化(这就是为什么RPC框架客户端和服务端都需要有一致的接口类)。

最后Producer利用反射将业务处理完毕后,用ObjectOutputStream将结果封装,通过socket返回数据。Consumer接收到返回数据。

具体代码实现

//RPCServer实现
RPCServer.java
package com.itunic.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 
 * RPC服务端<br>
 * 基于SocketServer&反射实现
 * 
 * @ClassName RPCServer
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:50
 * @version 1.0.0
 */
public class RPCServer {
 private static final Map<String, Class<?>> serviceMap = new ConcurrentHashMap<String, Class<?>>();
 private static final ExecutorService exec = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 private Integer port;
 private Boolean isRunning = true;

 public RPCServer(int port) {
 this.port = port;
    }

 public void register(Class<?> serviceInterface, Class<?> impl) {
        serviceMap.put(serviceInterface.getName(), impl);
    }

 public void start() throws IOException {
        ServerSocket ss = new ServerSocket();
 try {
            ss.bind(new InetSocketAddress(port));
 while (isRunning) {
                exec.execute(new ServiceTask(ss.accept()));
            }
        } finally {
            ss.close();
        }
    }

 private class ServiceTask implements Runnable {
        Socket socket = null;

 public ServiceTask(Socket socket) {
 this.socket = socket;
        }

 @Override
 public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
 try {
                input = new ObjectInputStream(socket.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class<?> serviceClass = serviceMap.get(serviceName);
 if (serviceClass == null) {
 throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);
                output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
                output.flush();
            } catch (Exception e) {
 // TODO 自动生成的 catch 块
                e.printStackTrace();
            } finally {
 if (null != output) {
 try {
                        output.close();
                    } catch (IOException e) {
 // TODO 自动生成的 catch 块
                        e.printStackTrace();
                    }
                }
 if (input != null) {
 try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
 if (socket != null) {
 try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }
}
//RPCClient实现
RPCClient.java
package com.itunic.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * 
 * RPC客户端 基于动态代理&socket实现
 * 
 * @ClassName RPCClient
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:04
 * @version 1.0.0
 */
public class RPCClient {
 @SuppressWarnings("unchecked")
 public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
 return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },
 new InvocationHandler() {

 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
 try {
                            socket = new Socket();
                            socket.connect(addr);
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);
                            input = new ObjectInputStream(socket.getInputStream());

 return input.readObject();
                        } finally {
 if (socket != null)
                                socket.close();
 if (output != null)
                                output.close();
 if (input != null)
                                input.close();
                        }
                    }
                });
    }
}
package com.itunic.rpc;

import java.io.IOException;

/**
 * 
 *  RPC服务端启动
 * @ClassName RPCServerAction
 * @author c
 * @Date 2017年6月23日 下午5:12:27
 * @version 1.0.0
 */
public class RPCServerAction {
 public static void main(String[] args) throws IOException {
        RPCServer server = new RPCServer(8888);
 /**
         * 注册相关的接口实现类
         */
        server.register(TestHello.class, TestHelloImpl.class);
        server.start();
    }
}

总结

RPC原理还是挺简单的,但可以有更好的实现。例如:socket用高并发框架netty替代,支持更多协议等。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大大的微笑

JAVA中使用Jedis操作Redis

redis安装看这里:https://my.oschina.net/u/2486137/blog/1541190 需要的jar:commons-pool2 ,r...

5657
来自专栏码匠的流水账

聊聊spring cloud gateway的PrefixPath及StripPrefix功能

本文主要研究一下spring cloud gateway的PrefixPath及StripPrefix功能

632
来自专栏码匠的流水账

聊聊storm的tickTuple

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

622
来自专栏软件工程师成长笔记

Java判断计算机网络连接是否正常

1022
来自专栏Linyb极客之路

Spring Cloud开发注意事项

如果provider中需要引入其他feign client的接口,需在 provider的启动类添加注解 @EnableFeignClients(basePac...

1203
来自专栏码匠的流水账

聊聊HystrixThreadPool

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixThreadPool.java

501
来自专栏xdecode

Protocol Buffer序列化对比Java序列化.

初识 Protocol Buff是谷歌推出的一种序列化协议. 而Java序列化协议也是一种协议. 两者的目的是, 将对象序列化成字节数组, 或者说是二进制数据,...

1725
来自专栏软件工程师成长笔记

判断监听系统网络状态

913
来自专栏编码小白

ofbiz实体引擎(七) 检查数据源

/** * Check the datasource to make sure the entity definitions are correct,...

2634
来自专栏Java成神之路

JavaUtil_04_验证码生成器

通过一个随机串,一个指定串(如accesskey),和当前时间来进行验证码的生成,期间还经过SHA1加密。如网易云信的短信验证码生成器:

472

扫码关注云+社区