利用动态代理&反射&socket实现简单的RPC通信

摘 要

利用动态代理&反射&socket实现简单的RPC通信

概述

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC跨越了传输层和应用层,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

现在RPC通信在内部分布式集群环境中已经很常见了。现在的开源的分布式框架已经提供了相应的实现,但仅停留在用的层面是远远不够的,这不符合一只码畜的追求。所以为了弄清RPC到底是个啥玩意,就查阅了部分资料,并针对其所述实现了一版最基础的RPC。

实现思路

大体思路是这样的:

首先Consumer通过JDK动态代理的机制去创建socket,让socket连接Producer的SocketServer,内部利用ObjectOutputStream将请求信息(接口信息,方法,参数)封装,通过socket传输。

其次Producer接到ObjectInputStream,将信息拆包(接口信息,方法,参数)。利用反射将接口实现类实例化(这就是为什么RPC框架客户端和服务端都需要有一致的接口类)。

最后Producer利用反射将业务处理完毕后,用ObjectOutputStream将结果封装,通过socket返回数据。Consumer接收到返回数据。

具体代码实现

//RPCServer实现
RPCServer.java
package com.itunic.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 
 * RPC服务端<br>
 * 基于SocketServer&反射实现
 * 
 * @ClassName RPCServer
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:50
 * @version 1.0.0
 */
public class RPCServer {
 private static final Map<String, Class<?>> serviceMap = new ConcurrentHashMap<String, Class<?>>();
 private static final ExecutorService exec = Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 private Integer port;
 private Boolean isRunning = true;

 public RPCServer(int port) {
 this.port = port;
    }

 public void register(Class<?> serviceInterface, Class<?> impl) {
        serviceMap.put(serviceInterface.getName(), impl);
    }

 public void start() throws IOException {
        ServerSocket ss = new ServerSocket();
 try {
            ss.bind(new InetSocketAddress(port));
 while (isRunning) {
                exec.execute(new ServiceTask(ss.accept()));
            }
        } finally {
            ss.close();
        }
    }

 private class ServiceTask implements Runnable {
        Socket socket = null;

 public ServiceTask(Socket socket) {
 this.socket = socket;
        }

 @Override
 public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
 try {
                input = new ObjectInputStream(socket.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class<?> serviceClass = serviceMap.get(serviceName);
 if (serviceClass == null) {
 throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);
                output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
                output.flush();
            } catch (Exception e) {
 // TODO 自动生成的 catch 块
                e.printStackTrace();
            } finally {
 if (null != output) {
 try {
                        output.close();
                    } catch (IOException e) {
 // TODO 自动生成的 catch 块
                        e.printStackTrace();
                    }
                }
 if (input != null) {
 try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
 if (socket != null) {
 try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }
}
//RPCClient实现
RPCClient.java
package com.itunic.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * 
 * RPC客户端 基于动态代理&socket实现
 * 
 * @ClassName RPCClient
 * @author yinbin
 * @website https://itunic.com
 * @Date 2017年6月23日 上午10:59:04
 * @version 1.0.0
 */
public class RPCClient {
 @SuppressWarnings("unchecked")
 public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
 return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },
 new InvocationHandler() {

 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
 try {
                            socket = new Socket();
                            socket.connect(addr);
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);
                            input = new ObjectInputStream(socket.getInputStream());

 return input.readObject();
                        } finally {
 if (socket != null)
                                socket.close();
 if (output != null)
                                output.close();
 if (input != null)
                                input.close();
                        }
                    }
                });
    }
}
package com.itunic.rpc;

import java.io.IOException;

/**
 * 
 *  RPC服务端启动
 * @ClassName RPCServerAction
 * @author c
 * @Date 2017年6月23日 下午5:12:27
 * @version 1.0.0
 */
public class RPCServerAction {
 public static void main(String[] args) throws IOException {
        RPCServer server = new RPCServer(8888);
 /**
         * 注册相关的接口实现类
         */
        server.register(TestHello.class, TestHelloImpl.class);
        server.start();
    }
}

总结

RPC原理还是挺简单的,但可以有更好的实现。例如:socket用高并发框架netty替代,支持更多协议等。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏张善友的专栏

Mix 10 上的asp.net mvc 2的相关Session

Beyond File | New Company: From Cheesy Sample to Social Platform Scott Hansel...

2517
来自专栏一个爱瞎折腾的程序猿

sqlserver使用存储过程跟踪SQL

USE [master] GO /****** Object: StoredProcedure [dbo].[sp_perfworkload_trace_s...

1990
来自专栏张善友的专栏

Miguel de Icaza 细说 Mix 07大会上的Silverlight和DLR

Mono之父Miguel de Icaza 详细报道微软Mix 07大会上的Silverlight和DLR ,上面还谈到了Mono and Silverligh...

2667
来自专栏我和未来有约会

Silverlight第三方控件专题

这里我收集整理了目前网上silverlight第三方控件的专题,若果有所遗漏请告知我一下。 名称 简介 截图 telerik 商 RadC...

3955
来自专栏一个会写诗的程序员的博客

Spring Reactor 项目核心库Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactiv...

2102
来自专栏大内老A

The .NET of Tomorrow

Ed Charbeneau(http://developer.telerik.com/featured/the-net-of-tomorrow/) Exciti...

30810
来自专栏我和未来有约会

Kit 3D 更新

Kit3D is a 3D graphics engine written for Microsoft Silverlight. Kit3D was inita...

2506
来自专栏闻道于事

js登录滑动验证,不滑动无法登陆

js的判断这里是根据滑块的位置进行判断,应该是用一个flag判断 <%@ page language="java" contentType="text/html...

6588
来自专栏陈仁松博客

ASP.NET Core 'Microsoft.Win32.Registry' 错误修复

今天在发布Asp.net Core应用到Azure的时候出现错误InvalidOperationException: Cannot find compilati...

4798
来自专栏魂祭心

原 canvas绘制clock

4004

扫码关注云+社区