dubbo监控机制之监控数据上报分析

dubbo对服务运行的监控,是通过从provider和consumer方收集调用信息存盘后,再由监控中心对数据分析绘表的方式完成的。 具体实现是provider和consumer向监控中心推数据。 今天以服务消费方为例,通过源码分析下消费方向监控中心上报数据的过程。 配置监控中心的两种方式:

<!--1,表示从注册中心发现监控中心地址-->
<dubbo:monitor protocol="registry"></dubbo:monitor>
<!--2,直连监控中心服务器地址-->
<dubbo:monitor address="10.47.17.170"></dubbo:monitor>

<!--配置过滤器monitor,dubbo是通过过滤器实现调用信息上报的-->
<dubbo:reference id="demoService"  interface="demo.dubbo.api.DemoService"   timeout="6000" filter="monitor"/>

以上spring配置里的<dubbo:monitor>标签的解析,在ReferenceBean的afterPropertiesSet方法中,逻辑如下

   public void afterPropertiesSet() throws Exception {
   
   //....其他代码略
   if (getMonitor() == null
                && (getConsumer() == null || getConsumer().getMonitor() == null)
                && (getApplication() == null || getApplication().getMonitor() == null)) {
		//解析MonitorConfig类,从容器中获取monitorConfig对象
            Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
            if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
                MonitorConfig monitorConfig = null;
                for (MonitorConfig config : monitorConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (monitorConfig != null) {
                            throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
                        }
                        monitorConfig = config;
                    }
                }
		//把解析后对象赋值给monitor属性,后面构造代理会用到
                if (monitorConfig != null) {
                    setMonitor(monitorConfig);
                }
            }
        }
   }

在构造代理逻辑在ReferenceConfig类的createProxy方法中,因为我们这里走注册中心发现监控中心,所以看下面一段逻辑:

	//构造注册中心url
	List<URL> us = loadRegistries(false);
	if (us != null && us.size() > 0) {
	    for (URL u : us) {
		//通过注册中心的url构造monitor Url(***跟踪下loadMonitor***)
		URL monitorUrl = loadMonitor(u);
		if (monitorUrl != null) {
		    //放置监控url到map key为“monitor”(***重点在这里***)
		    map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
		}
		urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
	    }
	}
	if (urls == null || urls.size() == 0) {
	    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
	}

跟到AbstractInterfaceConfig类的loadMonitor方法:

/***
     * 构造监控中心URL
     * @param registryURL
     * @return
     */
    protected URL loadMonitor(URL registryURL) {
        if (monitor == null) {
            //没有配置监控中心,从dubbo.monitor.address属性中获取
            String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address");
            //获取监控中心服务发现协议,比如通过注册中心
            String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol");
            if (monitorAddress != null && monitorAddress.length() > 0
                    || monitorProtocol != null && monitorProtocol.length() > 0) {
                //都没有配置,new一个对象
                monitor = new MonitorConfig();
            } else {
                //没有注册中心
                return null;
            }
        }
        //把属性文件中的的值,填充到monitor对象里
        appendProperties(monitor);
        Map<String, String> map = new HashMap<String, String>();
        //
        //这里接口固定是MonitorService.class.getName(),就是固定通过这个接口提供服务上报服务
	//这里的MonitorService服务是由监控中心实现并注册的到注册中心。
        map.put(Constants.INTERFACE_KEY, MonitorService.class.getName());
        map.put("dubbo", Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        //把monitor对象里的属性,放到map里去,key是对象属性名
        appendParameters(map, monitor);
        String address = monitor.getAddress();
        String sysaddress = System.getProperty("dubbo.monitor.address");
        if (sysaddress != null && sysaddress.length() > 0) {
            address = sysaddress;
        }
        //设置监控protocal
        if (ConfigUtils.isNotEmpty(address)) {
            if (!map.containsKey(Constants.PROTOCOL_KEY)) {
                if (ExtensionLoader.getExtensionLoader(MonitorFactory.class).hasExtension("logstat")) {
                    map.put(Constants.PROTOCOL_KEY, "logstat");
                } else {//没有logstat spi扩展,就用dubbo协议
                    map.put(Constants.PROTOCOL_KEY, "dubbo");
                }
            }
            //构造通过address和map,构造url
            return UrlUtils.parseURL(address, map);
        } else if (Constants.REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) {
            //如果monitor配置是通过注册中心发现,监控服务,设置protocol是dubbo, 添加参数 protocol=registry,refer=StringUtils.toQueryString(map)
            return registryURL.setProtocol("dubbo").addParameter(Constants.PROTOCOL_KEY, "registry").addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map));
        }
        return null;
    }

以上逻辑构造了monitorUrl并通过 monitor key放入url的参数中。 由于dubbo是通过过滤器上报监控数据的,(关于dubbo使用过滤器机制,还要从dubbo aop实现入手),下面分析下具体过滤器如何使用monitorUrl的,可以看懂文章开始我们配置的过滤器是“monitor” 所以这里,看下Filter的monitor spi实现,MonitorFilter类,具体在invoke方法里:

   //调用过程拦截
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
            RpcContext context = RpcContext.getContext(); // 提供方必须在invoke()之前获取context信息
            String remoteHost = context.getRemoteHost();
            long start = System.currentTimeMillis(); // 记录起始时间戮
            getConcurrent(invoker, invocation).incrementAndGet(); // 并发计数++
            try {
                Result result = invoker.invoke(invocation); // 让调用链往下执行
                //上报调用统计信息(***看这里**)
                collect(invoker, invocation, result, remoteHost, start, false);
                return result;
            } catch (RpcException e) {
                collect(invoker, invocation, null, remoteHost, start, true);
                throw e;
            } finally {
                getConcurrent(invoker, invocation).decrementAndGet(); // 并发计数++
            }
        } else {
            return invoker.invoke(invocation);
        }
    }

//具体
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        try {
            // ---- 服务信息获取 ----
            long elapsed = System.currentTimeMillis() - start; // 计算调用耗时
            int concurrent = getConcurrent(invoker, invocation).get(); // 当前并发数
            String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
            String service = invoker.getInterface().getName(); // 获取服务名称
            String method = RpcUtils.getMethodName(invocation); // 获取方法名
           
            URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);
	     //通过 monitor key 获取监控url (***看这里**),这里monitorFactory是spi机制生成的MonitorFactory$Adaptive
	     //这里实际是走的DubboMonitorFactroy类的getMonitor方法
            Monitor monitor = monitorFactory.getMonitor(url);
            int localPort;
            String remoteKey;
            String remoteValue;
            if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {
                // ---- 服务消费方监控 ----
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- 服务提供方监控 ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = remoteHost;
            }
            String input = "", output = "";
            if (invocation.getAttachment(Constants.INPUT_KEY) != null) {
                input = invocation.getAttachment(Constants.INPUT_KEY);
            }
            if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {
                output = result.getAttachment(Constants.OUTPUT_KEY);
            }
            //通过上面构造的监控上报工具,上报数据(***看这里**)
            monitor.collect(new URL(Constants.COUNT_PROTOCOL,
                    NetUtils.getLocalHost(), localPort,
                    service + "/" + method,
                    MonitorService.APPLICATION, application,
                    MonitorService.INTERFACE, service,
                    MonitorService.METHOD, method,
                    remoteKey, remoteValue,
                    error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
                    MonitorService.ELAPSED, String.valueOf(elapsed),
                    MonitorService.CONCURRENT, String.valueOf(concurrent),
                    Constants.INPUT_KEY, input,
                    Constants.OUTPUT_KEY, output));
        } catch (Throwable t) {
            logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
        }
    }

看下DubboMonitorFactroy类的getMonitor方法,实现在其父类AbstractMonitorFactory中:

public Monitor getMonitor(URL url) {
        //这里设置上报服务接口MonitorService
        url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName());
        String key = url.toServiceStringWithoutResolving();
        LOCK.lock();
        try {
            //从缓存中获取
            Monitor monitor = MONITORS.get(key);
            if (monitor != null) {
                return monitor;
            }
            //通过url创建monitor,在子类DubboMonitorFactroy中实现
            monitor = createMonitor(url);
            if (monitor == null) {
                throw new IllegalStateException("Can not create monitor " + url);
            }
            MONITORS.put(key, monitor);
            return monitor;
        } finally {
            // 释放锁
            LOCK.unlock();
        }
    }

DubboMonitorFactroy里实现的createMonitor方法:

protected Monitor createMonitor(URL url) {
        //这里会通过url的protocol参数获取协议值,如果是通过注册中心发现监控中心服务的方式,这里
        //protocol的值是registry,否则就是dubbo
        url = url.setProtocol(url.getParameter(Constants.PROTOCOL_KEY, "dubbo"));
        if (url.getPath() == null || url.getPath().length() == 0) {
            url = url.setPath(MonitorService.class.getName());
        }
        String filter = url.getParameter(Constants.REFERENCE_FILTER_KEY);
        if (filter == null || filter.length() == 0) {
            filter = "";
        } else {
            filter = filter + ",";
        }
        //监控中心服务配置多个的场景,这里默认使用failsafe容错机制
        url = url.addParameters(Constants.CLUSTER_KEY, "failsafe", Constants.CHECK_KEY, String.valueOf(false),
                Constants.REFERENCE_FILTER_KEY, filter + "-monitor");
        //这里protocol也是Protocol$Adpative的,如果协议是registry 要走通过注册中心发现服务那一套逻辑。
        Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, url);
        //创建服务代理代理
        MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
        //最后构造BubboMonitor对象
        return new DubboMonitor(monitorInvoker, monitorService);
    }

这里看下DubboMonitor类继承图,可以看到它实现了MonitorService接口

//构造函数
public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
        this.monitorInvoker = monitorInvoker;
        this.monitorService = monitorService;
        this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
        // 启动统计信息收集定时器,设置上报频率monitorInterval,所以说,上报数据是异步的
        sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // send方法收集统计信息
                try {
		    (***看这里***)
                    send();
                } catch (Throwable t) { // 防御性容错
                    logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
                }
            }
        }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
    }

     //从本地静态变量中获取统计信息,通过远程服务monitorService接口方法上报。
    public void send() {
        if (logger.isInfoEnabled()) {
            logger.info("Send statistics to monitor " + getUrl());
        }
        String timestamp = String.valueOf(System.currentTimeMillis());
        for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) {
            // 获取已统计数据
            Statistics statistics = entry.getKey();
            AtomicReference<long[]> reference = entry.getValue();
            long[] numbers = reference.get();
            long success = numbers[0];
            long failure = numbers[1];
            long input = numbers[2];
            long output = numbers[3];
            long elapsed = numbers[4];
            long concurrent = numbers[5];
            long maxInput = numbers[6];
            long maxOutput = numbers[7];
            long maxElapsed = numbers[8];
            long maxConcurrent = numbers[9];

            // 发送汇总信息
            URL url = statistics.getUrl()
                    .addParameters(MonitorService.TIMESTAMP, timestamp,
                            MonitorService.SUCCESS, String.valueOf(success),
                            MonitorService.FAILURE, String.valueOf(failure),
                            MonitorService.INPUT, String.valueOf(input),
                            MonitorService.OUTPUT, String.valueOf(output),
                            MonitorService.ELAPSED, String.valueOf(elapsed),
                            MonitorService.CONCURRENT, String.valueOf(concurrent),
                            MonitorService.MAX_INPUT, String.valueOf(maxInput),
                            MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),
                            MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),
                            MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent)
                    );
            //调用监控中心发布的MonitorService服务,上报调用统计信息
            monitorService.collect(url);

            // 减掉已统计数据
            long[] current;
            long[] update = new long[LENGTH];
            do {
                current = reference.get();
                if (current == null) {
                    update[0] = 0;
                    update[1] = 0;
                    update[2] = 0;
                    update[3] = 0;
                    update[4] = 0;
                    update[5] = 0;
                } else {
                    update[0] = current[0] - success;
                    update[1] = current[1] - failure;
                    update[2] = current[2] - input;
                    update[3] = current[3] - output;
                    update[4] = current[4] - elapsed;
                    update[5] = current[5] - concurrent;
                }
            } while (!reference.compareAndSet(current, update));
        }
    }

    //而DubboMonitor本身的collect方法,供信息上报处,过滤器中调用
    //每次的调用信息,放入本地静态变量statisticsMap中,
    public void collect(URL url) {
        // 读写统计变量
        int success = url.getParameter(MonitorService.SUCCESS, 0);
        int failure = url.getParameter(MonitorService.FAILURE, 0);
        int input = url.getParameter(MonitorService.INPUT, 0);
        int output = url.getParameter(MonitorService.OUTPUT, 0);
        int elapsed = url.getParameter(MonitorService.ELAPSED, 0);
        int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);
        // 初始化原子引用
        Statistics statistics = new Statistics(url);
        AtomicReference<long[]> reference = statisticsMap.get(statistics);
        if (reference == null) {
            statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>());
            reference = statisticsMap.get(statistics);
        }
        // CompareAndSet并发加入统计数据
        long[] current;
        long[] update = new long[LENGTH];
        do {
            current = reference.get();
            if (current == null) {
                update[0] = success;
                update[1] = failure;
                update[2] = input;
                update[3] = output;
                update[4] = elapsed;
                update[5] = concurrent;
                update[6] = input;
                update[7] = output;
                update[8] = elapsed;
                update[9] = concurrent;
            } else {
                update[0] = current[0] + success;
                update[1] = current[1] + failure;
                update[2] = current[2] + input;
                update[3] = current[3] + output;
                update[4] = current[4] + elapsed;
                update[5] = (current[5] + concurrent) / 2;
                update[6] = current[6] > input ? current[6] : input;
                update[7] = current[7] > output ? current[7] : output;
                update[8] = current[8] > elapsed ? current[8] : elapsed;
                update[9] = current[9] > concurrent ? current[9] : concurrent;
            }
        } while (!reference.compareAndSet(current, update));
    }

以上梳理了下,服务消费方配置监控中心并上报调用数据的流程, 服务提供方上报监控中心的流程是一样的。同样使用这个过滤器完成。 下次再梳理下,监控中心本身的处理逻辑。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏difcareer的技术笔记

Android Linker学习笔记[转]

Linker是Android系统动态库so的加载器/链接器,要想轻松地理解Android linker的运行机制,我们需要先熟悉ELF的文件结构,再了解ELF文...

31540
来自专栏Java学习网

Java Web Response对象的27个方法及状态码

response表示HttpServletResponse对象,主要将JSP容器处理后的结果传回到客户端。 ? 网络配图 1、void addCookie(...

54270
来自专栏阿杜的世界

Spring中的资源加载策略

Resouce接口可以根据资源的不同类型,或者资源位置的不同,给出对应的具体实现,Spring框架提供了一些实现类:

9220
来自专栏逆向技术

内核开发知识第一讲.内核中的数据类型.重要数据结构.常用内核API函数.

  在内核中.程序的编写不能简单的用基本数据类型了. 因为操作系统不同.很有可能造成数据类型的长度不一.而产生重大问题.所以在内核中.

19220
来自专栏coolblog.xyz技术专栏

Dubbo 源码分析 - 服务导出全过程解析

本篇文章,我们来研究一下 Dubbo 导出服务的过程。Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导...

14720
来自专栏Java3y

JDBC【数据库连接池、DbUtils框架、分页】

1.数据库连接池 什么是数据库连接池 简单来说:数据库连接池就是提供连接的。。。 为什么我们要使用数据库连接池 数据库的连接的建立和关闭是非常消耗资源的 频繁地...

39640
来自专栏coolblog.xyz技术专栏

Dubbo 源码分析 - 集群容错之 Directory

前面文章分析了服务的导出与引用过程,从本篇文章开始,我将开始分析 Dubbo 集群容错方面的源码。这部分源码包含四个部分,分别是服务目录 Directory、服...

9020
来自专栏Java与Android技术栈

Retrofit 风格的 RxCache及其多种缓存替换算法

之前的文章《给 Java 和 Android 构建一个简单的响应式Local Cache》、《RxCache 整合 Android 的持久层框架 greenDA...

8020
来自专栏xiaoheike

Spring Application Event Example

https://github.com/xiaoheike/SpringApplicationEventExample.git

18710
来自专栏菩提树下的杨过

mongodb-java-driver基本用法

1、先下载mongodb-java-driver 目前最新版本是2.9.3 2、下面是基本的CRUD示例代码: 1 package com.cnblogs....

22780

扫码关注云+社区

领取腾讯云代金券