专栏首页DDDmotan客户端

motan客户端

很早之前学习motan框架(新浪微博开源RPC)看的源码,最近在看我司框架,温顾一下,代码部分排版不太好看,在浏览器里看才会正常显示

RPC的本质

方法调用对于程序员来讲是再正常不过的事了,object.method(),RPC的使用也一样,但底层对这一过程又切分开,有client和server两端,也就是调用者与实现者

因为他们不再在同一进程中,需要通过网络跨JVM实现这一调用过程

在java中的实现手法:动态代理+socket通信;这就是个套路,上层怎么封装实现,但底层就是这样,概莫能外

请求过程

motan的调用实现

先画个简单的序列图,理清一下调用过程

motan与spring的结合,后面再写了,spring的扩展也很简单。

基于对RPC本质的认识,可以先找到InvocationHandler的实现类RefererInvocationHandler

这个接口就一个方法

public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;

在这个方法里面就是去构造socket传输的request对象,request主要就是方法的签名信息与参数,传输到server端,去执行对应的实现方法

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if(isLocalMethod(method)){
            if("toString".equals(method.getName())){
                return clustersToString();
            }
            throw new MotanServiceException("can not invoke local method:" + method.getName());
        }
        DefaultRequest request = new DefaultRequest();

        request.setRequestId(RequestIdGenerator.getRequestId());
        request.setArguments(args);
        request.setMethodName(method.getName());
        request.setParamtersDesc(ReflectUtil.getMethodParamDesc(method));
        request.setInterfaceName(clz.getName());
        request.setAttachment(URLParamType.requestIdFromClient.getName(), String.valueOf(RequestIdGenerator.getRequestIdFromClient()));

        // 当 referer配置多个protocol的时候,比如A,B,C,
        // 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C
        for (Cluster<T> cluster : clusters) {
            String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();

            Switcher switcher = switcherService.getSwitcher(protocolSwitcher);

            if (switcher != null && !switcher.isOn()) {
                continue;
            }

            request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
            request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
            // 带上client的application和module
            request.setAttachment(URLParamType.application.getName(), ApplicationInfo.getApplication(cluster.getUrl()).getApplication());
            request.setAttachment(URLParamType.module.getName(), ApplicationInfo.getApplication(cluster.getUrl()).getModule());
            Response response = null;
            boolean throwException =
                    Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(),
                            URLParamType.throwException.getValue()));
            try {
                //真正执行
                response = cluster.call(request);
                return response.getValue();
            }

invoke交给了Cluster.call,而Cluster又给了HAStragy.call,HA策略通过loadbalance选择负载均衡策略得到Referer

Cluster是什么

在InvocationHandler里面,调用了Cluster的call方法,从代码上看,它的本质就是Referer的集合,并且提供了HA服务以及负载均衡。而Referer是提供服务的一个抽象

HA与LoadBalance

HA

HA策略,就提供了两种,

fail-fast

fail-fast很简单,调用失败就抛异常;

fail-over

fail-over相对fail-fast多了重试次数,如果失败,就重试一个referer

LoadBalance

这倒提供了不少

Round-Robin

这个很简单,一个一个往下轮询就行了, 但需要记住上一次的位置

random

随机

Least Load

这个motan实现有点意思

由于Referer List可能很多,比如上百台,如果每次都要从这上百个Referer或者最低并发的几个,性能有些损耗,因此 random.nextInt(list.size())获取一个起始的index,然后获取最多不超过MAXREFERERCOUNT的 状态是isAvailable的referer进行判断activeCount.

localFirst

本地服务优先获取策略:对referers根据ip顺序查找本地服务,多存在多个本地服务,获取Active最小的本地服务进行服务。当不存在本地服务,但是存在远程RPC服务,则根据ActivWeight获取远程RPC服务;当两者都存在,所有本地服务都应优先于远程服务,本地RPC服务与远程RPC服务内部则根据ActiveWeight进行

NettyClient

上层不管怎么选择服务,最后都需要传输层去传输,nettyclient就是传输作用。

在DefaultRpcReferer中创建了一个nettyClient。向server发送远程调用

private Response request(Request request, boolean async) throws TransportException {
        Channel channel = null;

        Response response = null;

        try {
            // return channel or throw exception(timeout or connection_fail)
            channel = borrowObject();

            if (channel == null) {
                LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " "
                        + MotanFrameworkUtil.toString(request));
                return null;
            }

            // async request
            response = channel.request(request);
            // return channel to pool
            returnObject(channel);

使用了common-pool连接池

在这儿是委托给了nettychannel.request(),nettyclient与nettychannel是什么关系呢? client有server地址,channel就是这个地址连接的通道。 在nettychannel中

public Response request(Request request) throws TransportException {
        int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(),
                URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
        if (timeout <= 0) {
               throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
                       MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
           }
        NettyResponseFuture response = new NettyResponseFuture(request, timeout, this.nettyClient);
        this.nettyClient.registerCallback(request.getRequestId(), response);

        ChannelFuture writeFuture = this.channel.write(request);

        boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);

        if (result && writeFuture.isSuccess()) {
            response.addListener(new FutureListener() {
                @Override
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
                        // 成功的调用 
                        nettyClient.resetErrorCount();
                    } else {
                        // 失败的调用 
                        nettyClient.incrErrorCount();
                    }
                }
            });
            return response;
        }

到此,整个请求过程已经完成。

返回处理

调用完成之后,总得得到结果才行

motan返回过程

在上面nettychannel.request方法,会返回一个response,NettyResponseFuture这个类名就说明了一切,使用了Future模式。

在返回response时,构造真实response

private Response asyncResponse(Response response, boolean async) {
        if (async || !(response instanceof NettyResponseFuture)) {
            return response;
        }

        return new DefaultResponse(response);
    }

真实response里面,使用futureresponse去取值

public DefaultResponse(Response response) {
        this.value = response.getValue();
        this.exception = response.getException();
        this.requestId = response.getRequestId();
        this.processTime = response.getProcessTime();
        this.timeout = response.getTimeout();
    }

在futureresponse里面:

public Object getValue() {
        synchronized (lock) {
            if (!isDoing()) {
                return getValueOrThrowable();
            }

            if (timeout <= 0) {
                try {
                    lock.wait();
                } catch (Exception e) {
                    cancel(new MotanServiceException("NettyResponseFuture getValue InterruptedException : "
                            + MotanFrameworkUtil.toString(request) + " cost="
                            + (System.currentTimeMillis() - createTime), e));
                }

                // don't need to notifylisteners, because onSuccess or
                // onFailure or cancel method already call notifylisteners
                return getValueOrThrowable();
            } else {
                long waitTime = timeout - (System.currentTimeMillis() - createTime);

                if (waitTime > 0) {
                    for (;;) {
                        try {
                            lock.wait(waitTime);
                        } catch (InterruptedException e) {
                        }

                        if (!isDoing()) {
                            break;
                        } else {
                            waitTime = timeout - (System.currentTimeMillis() - createTime);
                            if (waitTime <= 0) {
                                break;
                            }
                        }
                    }
                }

                if (isDoing()) {
                    timeoutSoCancel();
                }
            }
            return getValueOrThrowable();
        }
    }

没有使用java.util.concurrent包中Condition,CountDownLatch之类的工具类,而是使用原始的wait,notify组合

在NettyClient中,得到返回对象后,对responsefuter进行赋值

pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() {
                    @Override
                    public Object handle(Channel channel, Object message) {
                        //得到返回对象
                        Response response = (Response) message;
                        //得到对应request的future
                        NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());

                        if (responseFuture == null) {
                            LoggerUtil.warn(
                                    "NettyClient has response from server, but resonseFuture not exist,  requestId={}",
                                    response.getRequestId());
                            return null;
                        }

                        if (response.getException() != null) {
                            responseFuture.onFailure(response);
                        } else {
                            responseFuture.onSuccess(response);
                        }

                        return null;
                    }
                }));

responseFuture的onsuccess方法,进行赋值并notify

public void onSuccess(Response response) {
        this.result = response.getValue();
        this.processTime = response.getProcessTime();

        done();
    }
private boolean done() {
        synchronized (lock) {
            if (!isDoing()) {
                return false;
            }

            state = FutureState.DONE;
            lock.notifyAll();
        }

        notifyListeners();
        return true;
    }

总结

到此,客户端部分已经完成,主要就是两方面

  1. 调用请求
  2. 返回处理

还有一些问题:

  1. 客户端怎么服务发现的?
  2. 服务降低怎么处理的?

本文分享自微信公众号 - 码农戏码(coder-game),作者:朱兴生

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-10-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • motan-2:motan的简约限流/熔断方式

    https://github.com/weibocom/motan/issues/551

    千里行走
  • spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(Motan)

    继上文 :spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(gRPC)

    逍遥壮士
  • 深入理解RPC之服务注册与发现篇

    在我们之前 RPC 原理的分析中,主要将笔墨集中在 Client 和 Server 端。而成熟的服务治理框架中不止存在这两个角色,一般还会有一个 Registr...

    kirito-moe
  • 【RPC 专题】深入理解 RPC 之服务注册与发现篇

    摘要: 原创出处 https://www.cnkirito.moe/rpc-registry/ 「老徐」欢迎转载,保留摘要,谢谢!

    芋道源码
  • motan使用zk

    叔牙
  • 使用Spring Cloud Sleuth实现链路监控

    在服务比较少的年代,一个系统的接口响应缓慢通常能够迅速被发现,但如今的微服务模块,大多具有规模大,依赖关系复杂等特性,错综复杂的网状结构使得我们不容易定位到某一...

    kirito-moe
  • motan-1:优雅关闭

    1.Motan支持在Consul、ZooKeeper集群环境下优雅的关闭节点,当需要关闭或重启节点时,可以先将待上线节点从集群中摘除,避免直接关闭影响正常请求。

    千里行走
  • Motan源码阅读--工程概述

    负载均衡默认为activeWeight,低并发度优先,refer的某个时刻call的数量越小优先级越高。

    春哥大魔王
  • 【RPC 专栏】从跨语言调用到 dubbo2.js

    摘要: 原创出处 https://www.cnkirito.moe/dubbojs-in-qianmi/ 「老徐」欢迎转载,保留摘要,谢谢!

    芋道源码
  • 【千米网】从跨语言调用到dubbo2.js

    微服务架构已成为目前互联网架构的趋势,关于微服务的讨论,几乎占据了各种技术大会的绝大多数版面。国内使用最多的服务治理框架非阿里开源的 dubbo 莫属,千米网也...

    kirito-moe
  • 深入理解 RPC 之集群篇

    上一篇文章分析了服务的注册与发现,这一篇文章着重分析下 RPC 框架都会用到的集群的相关知识。 集群(Cluster)本身并不具备太多知识点,在分布式系统中,...

    kirito-moe
  • 【RPC 专栏】深入理解 RPC 之集群篇

    摘要: 原创出处 https://www.cnkirito.moe/rpc-cluster/ 「老徐」欢迎转载,保留摘要,谢谢!

    芋道源码
  • 深入理解RPC之动态代理篇

    提到 JAVA 中的动态代理,大多数人都不会对 JDK 动态代理感到陌生,Proxy,InvocationHandler 等类都是 J2SE 中的基础概念。动态...

    kirito-moe
  • motan服务端

    服务端的处理也有套路,不管上层怎么玩,最后还得是通过反射得到Method对象,再调用invoke()

    码农戏码
  • Java 分布式 RPC 框架性能大比拼,Dubbo 排第几?

    其它的一些知名电商如当当、京东、国美维护了自己的分支或者在dubbo的基础开发,但是官方的库缺乏维护,相关的依赖类比如Spring,Netty还是很老的版本(S...

    芋道源码
  • Java微服务RPC选型Dubbo还是SpringCloud?

    国内最早开源的RPC框架,由阿里巴巴公司开发并于2011年末对外开源,仅支持Java

    JavaEdge
  • Java微服务选型Dubbo V.S SpringCloud

    国内最早开源的RPC框架,由阿里巴巴公司开发并于2011年末对外开源,仅支持Java

    JavaEdge
  • 微服务超时与重试

    经常被提起的两种超时:connection timeout、socket timeout

    码农戏码
  • 6 种微服务 RPC 框架,你知道几个?

    Dubbo:国内最早开源的 RPC 框架,由阿里巴巴公司开发并于 2011 年末对外开源,仅支持 Java 语言。

    淡定的蜗牛

扫码关注云+社区

领取腾讯云代金券