Reporting Spring Boot 2 metrics to StatsD

This article looks at how Spring Boot 2 reports metrics to StatsD.

maven

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-statsd</artifactId>
        </dependency>

Configuration file

# Whether exporting of metrics to StatsD is enabled.
management.metrics.export.statsd.enabled=true
# StatsD line protocol to use: datadog, etsy, or telegraf.
management.metrics.export.statsd.flavor=etsy
# Host of the StatsD server to receive exported metrics.
management.metrics.export.statsd.host=192.168.99.100
# Port of the StatsD server to receive exported metrics.
management.metrics.export.statsd.port=8125
# Total length of a single payload should be kept within your network's MTU.
management.metrics.export.statsd.max-packet-length=1400
# How often gauges will be polled. When a gauge is polled, its value is recalculated and if the value has changed (or publishUnchangedMeters is true), it is sent to the StatsD server.
management.metrics.export.statsd.polling-frequency=10s
# Whether to send unchanged meters to the StatsD server.
management.metrics.export.statsd.publish-unchanged-meters=true
# Maximum size of the queue of items waiting to be sent to the StatsD server.
management.metrics.export.statsd.queue-size=2147483647
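
To make the max-packet-length setting more concrete, here is a minimal sketch of how several StatsD lines could be packed into newline-separated payloads that stay within a byte limit. It illustrates the idea only and is not Micrometer's implementation; the class name PacketBatchSketch and the method pack are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PacketBatchSketch {

    /**
     * Packs lines into newline-separated payloads of at most maxPacketLength bytes.
     * A single line longer than the limit is still emitted, alone, in its own payload.
     */
    static List<String> pack(List<String> lines, int maxPacketLength) {
        List<String> packets = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int currentBytes = 0;
        for (String line : lines) {
            int lineBytes = line.getBytes(StandardCharsets.UTF_8).length;
            // +1 accounts for the newline separator when the payload already has content
            int needed = currentBytes == 0 ? lineBytes : currentBytes + 1 + lineBytes;
            if (currentBytes > 0 && needed > maxPacketLength) {
                // the next line would not fit: flush and start a new payload
                packets.add(current.toString());
                current.setLength(0);
                currentBytes = 0;
                needed = lineBytes;
            }
            if (currentBytes > 0) {
                current.append('\n');
            }
            current.append(line);
            currentBytes = needed;
        }
        if (currentBytes > 0) {
            packets.add(current.toString());
        }
        return packets;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a:1|c", "b:2|c", "c:3|c");
        // With a 12-byte limit the first two lines share a payload and the third gets its own
        System.out.println(pack(lines, 12).size());
    }
}
```

With the default of 1400 bytes, all three example lines would fit into a single payload.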

flavor

micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdFlavor.java

public enum StatsdFlavor {
    /**
     * https://github.com/etsy/statsd/blob/master/docs/metric_types.md
     */
    ETSY,

    /**
     * https://docs.datadoghq.com/guides/dogstatsd/#datagram-format
     */
    DATADOG,

    /**
     * https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/
     *
     * For gauges to work as expected, you should set `delete_gauges = false` in your input options as documented here:
     * https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd
     */
    TELEGRAF
}

There are several flavors. The default is DATADOG; this article uses ETSY.

StatsdProperties

spring-boot-actuator-autoconfigure-2.0.0.RELEASE-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/export/statsd/StatsdProperties.java

@ConfigurationProperties(prefix = "management.metrics.export.statsd")
public class StatsdProperties {

    /**
     * Whether exporting of metrics to StatsD is enabled.
     */
    private boolean enabled = true;

    /**
     * StatsD line protocol to use.
     */
    private StatsdFlavor flavor = StatsdFlavor.DATADOG;

    /**
     * Host of the StatsD server to receive exported metrics.
     */
    private String host = "localhost";

    /**
     * Port of the StatsD server to receive exported metrics.
     */
    private Integer port = 8125;

    /**
     * Total length of a single payload should be kept within your network's MTU.
     */
    private Integer maxPacketLength = 1400;

    /**
     * How often gauges will be polled. When a gauge is polled, its value is recalculated
     * and if the value has changed (or publishUnchangedMeters is true), it is sent to the
     * StatsD server.
     */
    private Duration pollingFrequency = Duration.ofSeconds(10);

    /**
     * Maximum size of the queue of items waiting to be sent to the StatsD server.
     */
    private Integer queueSize = Integer.MAX_VALUE;

    /**
     * Whether to send unchanged meters to the StatsD server.
     */
    private boolean publishUnchangedMeters = true;

    //......
}

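Note that queueSize defaults to Integer.MAX_VALUE, which is effectively unbounded. Reading the source closely, though, I could not find any place where it is actually used.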

Example

View heap usage

curl -i 'http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap'

Response

{
  "name": "jvm.memory.used",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 442224240
    }
  ],
  "availableTags": [
    {
      "tag": "id",
      "values": [
        "PS Eden Space",
        "PS Old Gen",
        "PS Survivor Space"
      ]
    }
  ]
}

View Eden space usage

curl -i 'http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap&tag=id:PS%20Eden%20Space'

Response

{
  "name": "jvm.memory.used",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 388454976
    }
  ],
  "availableTags": []
}

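Grafana display

Source code analysis

Converting tags to names

Because Etsy's StatsD does not support tags, the tags are converted into part of the metric name on output.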

micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/internal/FlavorStatsdLineBuilder.java

/**
 * A Statsd serializer for a particular {@link Meter} that formats the line in different
 * ways depending on the prevailing {@link StatsdFlavor}.
 *
 * @author Jon Schneider
 */
public class FlavorStatsdLineBuilder implements StatsdLineBuilder {
    private final Meter.Id id;
    private final StatsdFlavor flavor;
    private final HierarchicalNameMapper nameMapper;
    private final MeterRegistry.Config config;

    private final Function<NamingConvention, String> datadogTagString;
    private final Function<NamingConvention, String> telegrafTagString;

    public FlavorStatsdLineBuilder(Meter.Id id, StatsdFlavor flavor, HierarchicalNameMapper nameMapper, MeterRegistry.Config config) {
        this.id = id;
        this.flavor = flavor;
        this.nameMapper = nameMapper;
        this.config = config;

        // service:payroll,region:us-west
        this.datadogTagString = memoize(convention ->
                id.getTags().iterator().hasNext() ?
                        id.getConventionTags(convention).stream()
                                .map(t -> t.getKey() + ":" + t.getValue())
                                .collect(Collectors.joining(","))
                        : null
        );

        // service=payroll,region=us-west
        this.telegrafTagString = memoize(convention ->
                id.getTags().iterator().hasNext() ?
                        id.getConventionTags(convention).stream()
                                .map(t -> t.getKey() + "=" + t.getValue())
                                .collect(Collectors.joining(","))
                        : null
        );
    }

    @Override
    public String count(long amount, Statistic stat) {
        return line(Long.toString(amount), stat, "c");
    }

    @Override
    public String gauge(double amount, Statistic stat) {
        return line(DoubleFormat.decimalOrNan(amount), stat, "g");
    }

    @Override
    public String histogram(double amount) {
        return line(DoubleFormat.decimalOrNan(amount), null, "h");
    }

    @Override
    public String timing(double timeMs) {
        return line(DoubleFormat.decimalOrNan(timeMs), null, "ms");
    }

    private String line(String amount, @Nullable Statistic stat, String type) {
        switch (flavor) {
            case ETSY:
                return metricName(stat) + ":" + amount + "|" + type;
            case DATADOG:
                return metricName(stat) + ":" + amount + "|" + type + tags(stat, datadogTagString.apply(config.namingConvention()),":", "|#");
            case TELEGRAF:
            default:
                return metricName(stat) + tags(stat, telegrafTagString.apply(config.namingConvention()),"=", ",") + ":" + amount + "|" + type;
        }
    }

    private String tags(@Nullable Statistic stat, String otherTags, String keyValueSeparator, String preamble) {
        String tags = of(stat == null ? null : "statistic" + keyValueSeparator + stat.getTagValueRepresentation(), otherTags)
                .filter(Objects::nonNull)
                .collect(Collectors.joining(","));

        if(!tags.isEmpty())
            tags = preamble + tags;
        return tags;
    }

    private String metricName(@Nullable Statistic stat) {
        switch (flavor) {
            case ETSY:
                return nameMapper.toHierarchicalName(stat != null ? id.withTag(stat) : id, config.namingConvention());
            case DATADOG:
            case TELEGRAF:
            default:
                return config.namingConvention().name(id.getName(), id.getType(), id.getBaseUnit());
        }
    }
}

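Focus on the tags and metricName methods: for ETSY, metricName passes the id, including its tags, to nameMapper.toHierarchicalName, so the tags end up inside the name, while tags builds the tag string only for DATADOG and TELEGRAF.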
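
The difference between the three line formats can be sketched with plain string handling. This is a simplified illustration that mirrors the shape of the line method above, under these assumptions: the statistic tag and naming conventions are left out, and for ETSY the tags are simply appended to the name as extra dot-separated segments. The class FlavorLineSketch and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class FlavorLineSketch {

    /** Joins tags as key/value pairs separated by commas, e.g. "service:payroll,region:us-west". */
    static String joinTags(Map<String, String> tags, String keyValueSeparator) {
        return tags.entrySet().stream()
                .map(e -> e.getKey() + keyValueSeparator + e.getValue())
                .collect(Collectors.joining(","));
    }

    /** ETSY has no tag syntax, so tags become extra name segments (simplified assumption). */
    static String etsy(String name, Map<String, String> tags, String amount, String type) {
        String suffix = tags.entrySet().stream()
                .map(e -> "." + e.getKey() + "." + e.getValue())
                .collect(Collectors.joining());
        return name + suffix + ":" + amount + "|" + type;
    }

    /** DATADOG appends tags after the type, introduced by "|#". */
    static String datadog(String name, Map<String, String> tags, String amount, String type) {
        String t = joinTags(tags, ":");
        return name + ":" + amount + "|" + type + (t.isEmpty() ? "" : "|#" + t);
    }

    /** TELEGRAF puts tags right after the name, introduced by ",". */
    static String telegraf(String name, Map<String, String> tags, String amount, String type) {
        String t = joinTags(tags, "=");
        return name + (t.isEmpty() ? "" : "," + t) + ":" + amount + "|" + type;
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("service", "payroll");
        tags.put("region", "us-west");
        System.out.println(etsy("jvm.memory.used", tags, "42", "g"));
        System.out.println(datadog("jvm.memory.used", tags, "42", "g"));
        System.out.println(telegraf("jvm.memory.used", tags, "42", "g"));
    }
}
```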

StatsdMetricsExportAutoConfiguration

spring-boot-actuator-autoconfigure-2.0.0.RELEASE-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/export/statsd/StatsdMetricsExportAutoConfiguration.java

@Configuration
@AutoConfigureBefore({ CompositeMeterRegistryAutoConfiguration.class,
        SimpleMetricsExportAutoConfiguration.class })
@AutoConfigureAfter(MetricsAutoConfiguration.class)
@ConditionalOnBean(Clock.class)
@ConditionalOnClass(StatsdMeterRegistry.class)
@ConditionalOnProperty(prefix = "management.metrics.export.statsd", name = "enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(StatsdProperties.class)
public class StatsdMetricsExportAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(StatsdConfig.class)
    public StatsdConfig statsdConfig(StatsdProperties statsdProperties) {
        return new StatsdPropertiesConfigAdapter(statsdProperties);
    }

    @Bean
    @ConditionalOnMissingBean
    public StatsdMeterRegistry statsdMeterRegistry(StatsdConfig statsdConfig,
            HierarchicalNameMapper hierarchicalNameMapper, Clock clock) {
        return new StatsdMeterRegistry(statsdConfig, hierarchicalNameMapper, clock);
    }

    @Bean
    @ConditionalOnMissingBean
    public HierarchicalNameMapper hierarchicalNameMapper() {
        return HierarchicalNameMapper.DEFAULT;
    }

    @Bean
    public StatsdMetrics statsdMetrics() {
        return new StatsdMetrics();
    }

}

Note that StatsdPropertiesConfigAdapter is used here to adapt statsdProperties into a StatsdConfig. The StatsdMeterRegistry is also created here.

StatsdMeterRegistry

micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdMeterRegistry.java

public class StatsdMeterRegistry extends MeterRegistry {
    //......
    private StatsdMeterRegistry(StatsdConfig config,
                                HierarchicalNameMapper nameMapper,
                                NamingConvention namingConvention,
                                Clock clock,
                                @Nullable Function<Meter.Id, StatsdLineBuilder> lineBuilderFunction,
                                @Nullable Consumer<String> lineSink) {
        super(clock);

        this.statsdConfig = config;
        this.nameMapper = nameMapper;
        this.lineBuilderFunction = lineBuilderFunction;
        this.lineSink = lineSink;
        config().namingConvention(namingConvention);

        UnicastProcessor<String> processor = UnicastProcessor.create(Queues.<String>unboundedMultiproducer().get());

        try {
            Class.forName("ch.qos.logback.classic.turbo.TurboFilter", false, getClass().getClassLoader());
            this.publisher = new LogbackMetricsSuppressingUnicastProcessor(processor);
        } catch (ClassNotFoundException e) {
            this.publisher = processor;
        }

        if (lineSink != null) {
            publisher.subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String line) {
                    if (started.get()) {
                        lineSink.accept(line);
                    }
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                    meterPoller.dispose();
                }
            });

            // now that we're connected, start polling gauges and other pollable meter types
            meterPoller.replace(Flux.interval(statsdConfig.pollingFrequency())
                    .doOnEach(n -> pollableMeters.forEach(StatsdPollable::poll))
                    .subscribe());
        }

        if (config.enabled())
            start();
    }

    public void start() {
        if (started.compareAndSet(false, true) && lineSink == null) {
            UdpClient.create(statsdConfig.host(), statsdConfig.port())
                    .newHandler((in, out) -> out
                            .options(NettyPipeline.SendOptions::flushOnEach)
                            .sendString(publisher)
                            .neverComplete()
                    )
                    .subscribe(client -> {
                        this.udpClient.replace(client);

                        // now that we're connected, start polling gauges and other pollable meter types
                        meterPoller.replace(Flux.interval(statsdConfig.pollingFrequency())
                                .doOnEach(n -> pollableMeters.forEach(StatsdPollable::poll))
                                .subscribe());
                    });
        }
    }

    public void stop() {
        if (started.compareAndSet(true, false)) {
            udpClient.dispose();
            meterPoller.dispose();
        }
    }

    @Override
    public void close() {
        stop();
        super.close();
    }
    //......
}

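1. The underlying transport is Reactor's UdpClient. The processor is a UnicastProcessor backed by an unbounded MpscLinkedQueue.
2. Even after reading the code for a long time, I could not find where the queueSize set in the configuration file takes effect.
3. The data is relayed through this processor: the UdpClient subscribes to the processor, and the individual meters push data into it.

For example, micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdGauge.java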
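
At the wire level, each StatsD line is just text carried in a UDP datagram. The following minimal sketch uses the JDK's DatagramSocket instead of Reactor's UdpClient, so it is a stand-in for the transport and not what the registry does. The class StatsdUdpSketch and the roundTrip helper are hypothetical; the helper only exists so the example can be run without a StatsD server.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

class StatsdUdpSketch {

    /** Sends one StatsD line as a single UDP datagram (fire and forget). */
    static void send(String host, int port, String line) {
        byte[] payload = line.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length, InetAddress.getByName(host), port));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Loopback check: binds a receiver on an ephemeral port, sends to it, returns what arrived. */
    static String roundTrip(String line) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback)) {
            receiver.setSoTimeout(2000); // fail instead of blocking forever if nothing arrives
            send(loopback.getHostAddress(), receiver.getLocalPort(), line);
            DatagramPacket packet = new DatagramPacket(new byte[1400], 1400);
            receiver.receive(packet);
            return new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The metric name here is only an example value
        System.out.println(roundTrip("demo.gauge:442224240|g"));
    }
}
```

To send to a real server, call send with the configured host and port, for example send("192.168.99.100", 8125, line).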

public class StatsdGauge<T> extends AbstractMeter implements Gauge, StatsdPollable {
    private final StatsdLineBuilder lineBuilder;
    private final Subscriber<String> publisher;

    private final WeakReference<T> ref;
    private final ToDoubleFunction<T> value;
    private final AtomicReference<Double> lastValue = new AtomicReference<>(Double.NaN);
    private final boolean alwaysPublish;

    StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Subscriber<String> publisher, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
        super(id);
        this.lineBuilder = lineBuilder;
        this.publisher = publisher;
        this.ref = new WeakReference<>(obj);
        this.value = value;
        this.alwaysPublish = alwaysPublish;
    }

    @Override
    public double value() {
        T obj = ref.get();
        return obj != null ? value.applyAsDouble(ref.get()) : 0;
    }

    @Override
    public void poll() {
        double val = value();
        if (alwaysPublish || lastValue.getAndSet(val) != val) {
            publisher.onNext(lineBuilder.gauge(val));
        }
    }

    @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
    @Override
    public boolean equals(Object o) {
        return MeterEquivalence.equals(this, o);
    }

    @Override
    public int hashCode() {
        return MeterEquivalence.hashCode(this);
    }
}

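As shown, the poll method pushes data to the publisher by calling onNext.

Summary

Micrometer in Spring Boot 2 does not currently seem to support defining a prefix for StatsD. As a result, when several application services report metrics, their metrics cannot be told apart.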
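
The publish-on-change behavior of poll can be sketched without Micrometer or Reactor. In this minimal sketch a Consumer<String> stands in for the publisher and a DoubleSupplier for the gauge's value function; GaugePollSketch and the line format it emits are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.DoubleSupplier;

class GaugePollSketch {
    private final String name;
    private final DoubleSupplier value;
    private final Consumer<String> sink;
    private final boolean alwaysPublish;
    // Starts as NaN so that the first poll always counts as a change
    private final AtomicReference<Double> lastValue = new AtomicReference<>(Double.NaN);

    GaugePollSketch(String name, DoubleSupplier value, Consumer<String> sink, boolean alwaysPublish) {
        this.name = name;
        this.value = value;
        this.sink = sink;
        this.alwaysPublish = alwaysPublish;
    }

    /** Recomputes the value and emits a gauge line if it changed (or if alwaysPublish is set). */
    void poll() {
        double val = value.getAsDouble();
        if (alwaysPublish || lastValue.getAndSet(val) != val) {
            sink.accept(name + ":" + val + "|g");
        }
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        double[] current = {1.0};
        GaugePollSketch gauge = new GaugePollSketch("demo.gauge", () -> current[0], lines::add, false);
        gauge.poll();     // first poll: published
        gauge.poll();     // unchanged: skipped
        current[0] = 2.0;
        gauge.poll();     // changed: published
        System.out.println(lines);
    }
}
```

Setting alwaysPublish to true corresponds to publish-unchanged-meters=true in the configuration: every poll emits a line, whether or not the value changed.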

doc

  • production-ready-metrics-export-statsd

This article was shared from the WeChat public account 码匠的流水账 (geek_luandun). Author: go4it

Originally published: 2018-03-20