前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RMI源码分析

RMI源码分析

作者头像
歪歪梯
发布2020-06-19 16:17:57
7610
发布2020-06-19 16:17:57
举报
文章被收录于专栏:歪歪梯Club歪歪梯Club

相关核心类

sun.rmi.server.UnicastServerRef sun.rmi.server.UnicastRef sun.rmi.server.Util sun.rmi.transport.tcp.TCPEndpoint sun.rmi.transport.LiveRef java.rmi.Naming sun.rmi.registry.RegistryImpl

rmi

RMI是Java的一组开发分布式应用程序的API。 RMI使用Java语言接口定义了远程对象,它集合了Java序列化和Java远程方法协议(Java Remote Method Protocol)。 简单地说,这样使原先的程序在同一操作系统的方法调用,变成了不同操作系统之间程序的方法调用,由于J2EE是分布式程序平台,它以RMI机制实现程序组件在不同操作系统之间的通信。 比如,一个EJB可以通过RMI调用Web上另一台机器上的EJB远程方法。

简单使用rmi

要发布的服务接口

代码语言:javascript
复制
public interface HelloService extends Remote{
    public String sayHello() throws RemoteException;
    //要发布的服务类的方法必须都throws RemoteException
    //在Util中,创建代理对象时会checkMethod,存在没有throws RemoteException的则抛出IllegalArgumentException
}

util中的checkMethod

代码语言:javascript
复制
   public static Remote createProxy(Class<?> implClass,
                                     RemoteRef clientRef,
                                     boolean forceStubUse)
        throws StubNotFoundException
    {
        .....
        final Class<?>[] interfaces = getRemoteInterfaces(implClass);
        .....
    }
    private static Class<?>[] getRemoteInterfaces(Class<?> remoteClass) {
        ArrayList<Class<?>> list = new ArrayList<>();
        getRemoteInterfaces(list, remoteClass);
        return list.toArray(new Class<?>[list.size()]);
    }

    private static void getRemoteInterfaces(ArrayList<Class<?>> list, Class<?> cl) {
        .....
        for (int i = 0; i < interfaces.length; i++) {
            Class<?> intf = interfaces[i];
            if (Remote.class.isAssignableFrom(intf)) {
                if (!(list.contains(intf))) {
                    Method[] methods = intf.getMethods();
                    for (int j = 0; j < methods.length; j++) {
                        checkMethod(methods[j]);
                    }
                    list.add(intf);
                }
            }
        }
    }

    private static void checkMethod(Method m) {
        Class<?>[] ex = m.getExceptionTypes();
        for (int i = 0; i < ex.length; i++) {
            if (ex[i].isAssignableFrom(RemoteException.class))
                return;
        }
        throw new IllegalArgumentException(
            "illegal remote method encountered: " + m);
    }

要发布的服务接口的实现类

代码语言:javascript
复制
public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {
    //需要继承UnicastRemoteObject,因为Remote只是接口,UnicastRemoteObject是Remote的实现类

    protected HelloServiceImpl() throws RemoteException {
        super();
    }

    @Override
    public String sayHello() throws RemoteException{
        return "hello";
    }

}

服务发布端与客户消费端

代码语言:javascript
复制
public class Server {
    final static String host = "127.0.0.1";
    final static int port = 8080;
    public static void main(String[] args) throws RemoteException, MalformedURLException {
        HelloService helloService = new HelloServiceImpl();
        LocateRegistry.createRegistry(port);
        Naming.rebind("rmi://"+host+":"+port+"/hello", helloService);//不写port默认是1099
        System.out.println("服务启动...");
    }

}
public class Client {

    public static void main(String[] args) throws MalformedURLException, RemoteException, NotBoundException {
        HelloService helloService = (HelloService) Naming.lookup("rmi://"+Server.host+":"+Server.port+"/hello");//默认端口1099
        System.out.println(helloService.sayHello());
    }

}

追踪源码

启动服务前,创建服务对象会调用父类无参构造 HelloServiceImpl

代码语言:javascript
复制
protected HelloServiceImpl() throws RemoteException {
        super();
    }

UnicastRemoteObject构造

代码语言:javascript
复制
    protected UnicastRemoteObject() throws RemoteException
    {
        this(0);
    }
     protected UnicastRemoteObject(int port) throws RemoteException
    {
        this.port = port;
        exportObject((Remote) this, port);
    }

在构造中调用了exportObject,将服务暴露出去

代码语言:javascript
复制
    public static Remote exportObject(Remote obj, int port)
        throws RemoteException
    {
        return exportObject(obj, new UnicastServerRef(port));
    }
    private static Remote exportObject(Remote obj, UnicastServerRef sref)
        throws RemoteException
    {
        if (obj instanceof UnicastRemoteObject) {
            ((UnicastRemoteObject) obj).ref = sref;
        }
        return sref.exportObject(obj, null, false);
    }

构造UnicastServerRef服务器对象来发布服务

代码语言:javascript
复制
    public UnicastServerRef(int port) {
        super(new LiveRef(port));
        this.filter = null;
    }

UnicastServerRef调用父类带参构造

代码语言:javascript
复制
    public UnicastRef(LiveRef liveRef) {
        ref = liveRef;
    }

sref对象发布服务细节

代码语言:javascript
复制
    public Remote exportObject(Remote impl, Object data,
                               boolean permanent)
        throws RemoteException
    {
        Class<?> implClass = impl.getClass();
        Remote stub;

        try {
            stub = Util.createProxy(implClass, getClientRef(), forceStubUse);
        } catch (IllegalArgumentException e) {
            throw new ExportException(
                "remote object implements illegal remote interface", e);
        }
        if (stub instanceof RemoteStub) {
            setSkeleton(impl);
        }

        Target target =
            new Target(impl, this, stub, ref.getObjID(), permanent);
        ref.exportObject(target);
        hashToMethod_Map = hashToMethod_Maps.get(implClass);
        return stub;
    }

创建代理对象,并生成一个可以真正发布的target,调用ref对象(上一步构建以后传递到父类构造函数的LiveRef)发布出去 LiveRef细节

代码语言:javascript
复制
    public LiveRef(int port) {
        this((new ObjID()), port);
    }
    public LiveRef(ObjID objID, int port) {
        this(objID, TCPEndpoint.getLocalEndpoint(port), true);
    }
    public LiveRef(ObjID objID, Endpoint endpoint, boolean isLocal) {
        ep = endpoint;
        id = objID;
        this.isLocal = isLocal;
    }
    public void exportObject(Target target) throws RemoteException {
        ep.exportObject(target);
    }

TCPEndpoint.getLocalEndpoint

代码语言:javascript
复制
    public static TCPEndpoint getLocalEndpoint(int port) {
        return getLocalEndpoint(port, null, null);
    }

    public static TCPEndpoint getLocalEndpoint(int port,
                                               RMIClientSocketFactory csf,
                                               RMIServerSocketFactory ssf)
    {
        /*
         * Find mapping for an endpoint key to the list of local unique
         * endpoints for this client/server socket factory pair (perhaps
         * null) for the specific port.
         */
        TCPEndpoint ep = null;

        synchronized (localEndpoints) {
            TCPEndpoint endpointKey = new TCPEndpoint(null, port, csf, ssf);
            LinkedList<TCPEndpoint> epList = localEndpoints.get(endpointKey);
            String localHost = resampleLocalHost();

            if (epList == null) {
                /*
                 * Create new endpoint list.
                 */
                ep = new TCPEndpoint(localHost, port, csf, ssf);
                epList = new LinkedList<TCPEndpoint>();
                epList.add(ep);
                ep.listenPort = port;
                ep.transport = new TCPTransport(epList);
                localEndpoints.put(endpointKey, epList);

                if (TCPTransport.tcpLog.isLoggable(Log.BRIEF)) {
                    TCPTransport.tcpLog.log(Log.BRIEF,
                        "created local endpoint for socket factory " + ssf +
                        " on port " + port);
                }
            } else {
                synchronized (epList) {
                    ep = epList.getLast();
                    String lastHost = ep.host;
                    int lastPort =  ep.port;
                    TCPTransport lastTransport = ep.transport;
                    // assert (localHost == null ^ lastHost != null)
                    if (localHost != null && !localHost.equals(lastHost)) {
                        /*
                         * Hostname has been updated; add updated endpoint
                         * to list.
                         */
                        if (lastPort != 0) {
                            /*
                             * Remove outdated endpoints only if the
                             * port has already been set on those endpoints.
                             */
                            epList.clear();
                        }
                        ep = new TCPEndpoint(localHost, lastPort, csf, ssf);
                        ep.listenPort = port;
                        ep.transport = lastTransport;
                        epList.add(ep);
                    }
                }
            }
        }

        return ep;
    }

最终获得一个可以发布服务的TCPEndpoint对象,并调用该对象把服务暴露出去

服务注册与拉取源码分析

注册服务 LocateRegistry.createRegistry(port); 创建注册中心时,会创建一个RegistryImpl对象

代码语言:javascript
复制
    public static Registry createRegistry(int port) throws RemoteException {
        return new RegistryImpl(port);
    }

RegistryImpl代表注册中心 RegistryImpl中使用一个HashTable来注册(bind)服务和查找(lookup)服务

代码语言:javascript
复制
    private Hashtable<String, Remote> bindings = new Hashtable<>(101);
    public RegistryImpl(int port)
        throws RemoteException
    {
        if (port == Registry.REGISTRY_PORT && System.getSecurityManager() != null) {
            try {
                AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
                    public Void run() throws RemoteException {
                        LiveRef lref = new LiveRef(id, port);
                        setup(new UnicastServerRef(lref, RegistryImpl::registryFilter));
                        return null;
                    }
                }, null, new SocketPermission("localhost:"+port, "listen,accept"));
            } catch (PrivilegedActionException pae) {
                throw (RemoteException)pae.getException();
            }
        } else {
            LiveRef lref = new LiveRef(id, port);
            setup(new UnicastServerRef(lref, RegistryImpl::registryFilter));
        }
    }
    private void setup(UnicastServerRef uref)
        throws RemoteException
    {
        ref = uref;
        uref.exportObject(this, null, true);
    }
    public Remote lookup(String name)
        throws RemoteException, NotBoundException
    {
        synchronized (bindings) {
            Remote obj = bindings.get(name);
            if (obj == null)
                throw new NotBoundException(name);
            return obj;
        }
    }
    public void bind(String name, Remote obj)
        throws RemoteException, AlreadyBoundException, AccessException
    {
        synchronized (bindings) {
            Remote curr = bindings.get(name);
            if (curr != null)
                throw new AlreadyBoundException(name);
            bindings.put(name, obj);
        }
    }
    public void unbind(String name)
        throws RemoteException, NotBoundException, AccessException
    {
        synchronized (bindings) {
            Remote obj = bindings.get(name);
            if (obj == null)
                throw new NotBoundException(name);
            bindings.remove(name);
        }
    }
    public void rebind(String name, Remote obj)
        throws RemoteException, AccessException
    {
        bindings.put(name, obj);
    }
    .....

最终和服务端暴露服务类似,会把RegistryImpl对象暴露出去

当服务提供方调用Naming.rebind("rmi://"+host+":"+port+"/hello", helloService);注册服务时 会先根据端口获取到暴露的注册中心对象RegistryImpl,然后调用其方法注册 对应的客户端获取服务的过程也是类似(HelloService) Naming.lookup("rmi://"+Server.host+":"+Server.port+"/hello"); 会先根据端口获取到暴露的注册中心对象RegistryImpl,然后调用其方法获取

代码语言:javascript
复制
    public static void rebind(String name, Remote obj)
        throws RemoteException, java.net.MalformedURLException
    {
        ParsedNamingURL parsed = parseURL(name);
        Registry registry = getRegistry(parsed);

        if (obj == null)
            throw new NullPointerException("cannot bind to null");

        registry.rebind(parsed.name, obj);
    }
    public static Remote lookup(String name)
        throws NotBoundException,
            java.net.MalformedURLException,
            RemoteException
    {
        ParsedNamingURL parsed = parseURL(name);
        Registry registry = getRegistry(parsed);

        if (parsed.name == null)
            return registry;
        return registry.lookup(parsed.name);
    }
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 歪歪梯Club 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 相关核心类
  • rmi
  • 简单使用rmi
    • 要发布的服务接口
      • 要发布的服务接口的实现类
      • 服务发布端与客户消费端
      • 追踪源码
      • 服务注册与拉取源码分析
      相关产品与服务
      微服务引擎 TSE
      微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档