利用动态代理&反射&socket实现简单的RPC通信

摘 要

利用动态代理&反射&socket实现简单的RPC通信

概述

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC跨越了传输层和应用层,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

现在RPC通信在内部分布式集群环境中已经很常见了。现在的开源的分布式框架已经提供了相应的实现,但仅停留在用的层面是远远不够的,这不符合一只码畜的追求。所以为了弄清RPC到底是个啥玩意,就查阅了部分资料,并针对其所述实现了一版最基础的RPC。

实现思路

大体思路是这样的:

首先Consumer通过JDK动态代理的机制去创建socket,让socket连接Producer的SocketServer,内部利用ObjectOutputStream将请求信息(接口信息,方法,参数)封装,通过socket传输。

其次Producer接到ObjectInputStream,将信息拆包(接口信息,方法,参数)。利用反射将接口实现类实例化(这就是为什么RPC框架客户端和服务端都需要有一致的接口类)。

最后Producer利用反射将业务处理完毕后,用ObjectOutputStream将结果封装,通过socket返回数据。Consumer接收到返回数据。

具体代码实现

//RPCServer实现
RPCServer.java
package com.itunic.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 
 * RPC服务端<br>
 * 基于SocketServer&反射实现
 * 
 * @ClassName RPCServer
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:50
 * @version 1.0.0
 */
public class RPCServer {
 private static final Map<String, Class<?>> serviceMap = new ConcurrentHashMap<String, Class<?>>();
 private static final ExecutorService exec = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 private Integer port;
 private Boolean isRunning = true;

 public RPCServer(int port) {
 this.port = port;
    }

 public void register(Class<?> serviceInterface, Class<?> impl) {
        serviceMap.put(serviceInterface.getName(), impl);
    }

 public void start() throws IOException {
        ServerSocket ss = new ServerSocket();
 try {
            ss.bind(new InetSocketAddress(port));
 while (isRunning) {
                exec.execute(new ServiceTask(ss.accept()));
            }
        } finally {
            ss.close();
        }
    }

 private class ServiceTask implements Runnable {
        Socket socket = null;

 public ServiceTask(Socket socket) {
 this.socket = socket;
        }

 @Override
 public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
 try {
                input = new ObjectInputStream(socket.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class<?> serviceClass = serviceMap.get(serviceName);
 if (serviceClass == null) {
 throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);
                output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
                output.flush();
            } catch (Exception e) {
 // TODO 自动生成的 catch 块
                e.printStackTrace();
            } finally {
 if (null != output) {
 try {
                        output.close();
                    } catch (IOException e) {
 // TODO 自动生成的 catch 块
                        e.printStackTrace();
                    }
                }
 if (input != null) {
 try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
 if (socket != null) {
 try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }
}
//RPCClient实现
RPCClient.java
package com.itunic.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * 
 * RPC客户端 基于动态代理&socket实现
 * 
 * @ClassName RPCClient
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:04
 * @version 1.0.0
 */
public class RPCClient {
 @SuppressWarnings("unchecked")
 public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
 return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },
 new InvocationHandler() {

 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
 try {
                            socket = new Socket();
                            socket.connect(addr);
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);
                            input = new ObjectInputStream(socket.getInputStream());

 return input.readObject();
                        } finally {
 if (socket != null)
                                socket.close();
 if (output != null)
                                output.close();
 if (input != null)
                                input.close();
                        }
                    }
                });
    }
}
package com.itunic.rpc;

import java.io.IOException;

/**
 * 
 *  RPC服务端启动
 * @ClassName RPCServerAction
 * @author c
 * @Date 2017年6月23日 下午5:12:27
 * @version 1.0.0
 */
public class RPCServerAction {
 public static void main(String[] args) throws IOException {
        RPCServer server = new RPCServer(8888);
 /**
         * 注册相关的接口实现类
         */
        server.register(TestHello.class, TestHelloImpl.class);
        server.start();
    }
}

总结

RPC原理还是挺简单的,但可以有更好的实现。例如:socket用高并发框架netty替代,支持更多协议等。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大内老A

ASP.NET Core中的依赖注入(5):ServicePrvider实现揭秘【补充漏掉的细节】

到目前为止,我们定义的ServiceProvider已经实现了基本的服务提供和回收功能,但是依然漏掉了一些必需的细节特性。这些特性包括如何针对IServiceP...

1927
来自专栏c#开发者

LightSwitch 2011 数据字段唯一性验证方案

LightSwitch 2011 数据字段唯一性验证方案 ? 验证单表数据的某个字段不能输入重复值 设置实体字段唯一索引 ? 如果不写代码,那么验证只会在...

3375
来自专栏進无尽的文章

编码篇-学会小用宏和条件编译

宏定义在C系开发中可以说占有举足轻重的作用。底层框架自不必说,为了编译优化和方便,以及跨平台能力,宏被大量使用,可以说底层开发离开define将寸步难行。而在更...

1341
来自专栏Spark生态圈

[spark] 数据本地化及延迟调度

Spark数据本地化即移动计算而不是移动数据,而现实又是残酷的,不是想要在数据块的地方计算就有足够的资源提供,为了让task能尽可能的以最优本地化级别(Loca...

1562
来自专栏Android 开发学习

JsBridge 源码分析

1433
来自专栏芋道源码1024

【RPC 专栏】简单了解RPC实现原理

2946
来自专栏开发 & 算法杂谈

Intel Pin-JIT模式和Probe模式下库函数的替换

这篇文章主要介绍一下Intel Pin在JIT模式和Probe模式下对库换数的替换,以及实现中有哪写需要注意的地方。

2376
来自专栏屈定‘s Blog

造轮子--Excel报表工具

由于公司内部之前对于excel封装操作并不是很方便,而且对于特殊的需求不是很容易满足,这个月的任务是迁移部分业务小报表顺便重构下,因此这里造个轮子,便于导入和导...

1493
来自专栏微服务那些事儿

关键数据变更监控

在经过了对mybatis的一番检索之后,没有发现对该需求的解决方式.在认知范围内,想到了使用mabatis拦截器解决该问题。

73519
来自专栏GuZhenYin

自己封装了一个EF的上下文类.,分享一下,顺便求大神指点

using System; using System.Collections.Generic; using System.Configuration; usin...

1826

扫码关注云+社区