首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot2上报metrics到statsd

springboot2上报metrics到statsd

作者头像
code4it
发布2018-09-17 16:05:19
1.2K0
发布2018-09-17 16:05:19
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下springboot2如何上报metrics到statsd

maven

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-statsd</artifactId>
        </dependency>

配置文件

# Whether exporting of metrics to StatsD is enabled.
management.metrics.export.statsd.enabled=true
# StatsD line protocol to use. datalog or esty
management.metrics.export.statsd.flavor=etsy
# Host of the StatsD server to receive exported metrics.
management.metrics.export.statsd.host=192.168.99.100
# Port of the StatsD server to receive exported metrics.
management.metrics.export.statsd.port=8125
# Total length of a single payload should be kept within your network's MTU.
management.metrics.export.statsd.max-packet-length=1400
# How often gauges will be polled. When a gauge is polled, its value is recalculated and if the value has changed (or publishUnchangedMeters is true), it is sent to the StatsD server.
management.metrics.export.statsd.polling-frequency=10s
# Whether to send unchanged meters to the StatsD server.
management.metrics.export.statsd.publish-unchanged-meters=true
# Maximum size of the queue of items waiting to be sent to the StatsD server.
management.metrics.export.statsd.queue-size=2147483647

flavor

micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdFlavor.java

public enum StatsdFlavor {
    /**
     * https://github.com/etsy/statsd/blob/master/docs/metric_types.md
     */
    ETSY,

    /**
     * https://docs.datadoghq.com/guides/dogstatsd/#datagram-format
     */
    DATADOG,

    /**
     * https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/
     *
     * For gauges to work as expected, you should set `delete_gauges = false` in your input options as documented here:
     * https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd
     */
    TELEGRAF
}

flavor有好几种,默认是DATALOG,这里使用ETSY

StatsdProperties

spring-boot-actuator-autoconfigure-2.0.0.RELEASE-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/export/statsd/StatsdProperties.java

@ConfigurationProperties(prefix = "management.metrics.export.statsd")
public class StatsdProperties {

    /**
     * Whether exporting of metrics to StatsD is enabled.
     */
    private boolean enabled = true;

    /**
     * StatsD line protocol to use.
     */
    private StatsdFlavor flavor = StatsdFlavor.DATADOG;

    /**
     * Host of the StatsD server to receive exported metrics.
     */
    private String host = "localhost";

    /**
     * Port of the StatsD server to receive exported metrics.
     */
    private Integer port = 8125;

    /**
     * Total length of a single payload should be kept within your network's MTU.
     */
    private Integer maxPacketLength = 1400;

    /**
     * How often gauges will be polled. When a gauge is polled, its value is recalculated
     * and if the value has changed (or publishUnchangedMeters is true), it is sent to the
     * StatsD server.
     */
    private Duration pollingFrequency = Duration.ofSeconds(10);

    /**
     * Maximum size of the queue of items waiting to be sent to the StatsD server.
     */
    private Integer queueSize = Integer.MAX_VALUE;

    /**
     * Whether to send unchanged meters to the StatsD server.
     */
    private boolean publishUnchangedMeters = true;

    //......
}

注意这里的queueSize默认是无限大。不过仔细看源码貌似没看到调用的地方。

实例

查看heap

curl -i http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap

返回

{
  "name": "jvm.memory.used",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 442224240
    }
  ],
  "availableTags": [
    {
      "tag": "id",
      "values": [
        "PS Eden Space",
        "PS Old Gen",
        "PS Survivor Space"
      ]
    }
  ]
}

查看eden

curl -i http://localhost:8080/actuator/metrics/jvm.memory.used?tag=area:heap&tag=id:PS Eden Space

返回

{
  "name": "jvm.memory.used",
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 388454976
    }
  ],
  "availableTags": []
}

grafana展示

源码解析

tag转name

由于esty的statsd不支持tag,因此输出时将tag转为了name的一部分

micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/internal/FlavorStatsdLineBuilder.java

/**
 * A Statsd serializer for a particular {@link Meter} that formats the line in different
 * ways depending on the prevailing {@link StatsdFlavor}.
 *
 * @author Jon Schneider
 */
public class FlavorStatsdLineBuilder implements StatsdLineBuilder {
    private final Meter.Id id;
    private final StatsdFlavor flavor;
    private final HierarchicalNameMapper nameMapper;
    private final MeterRegistry.Config config;

    private final Function<NamingConvention, String> datadogTagString;
    private final Function<NamingConvention, String> telegrafTagString;

    public FlavorStatsdLineBuilder(Meter.Id id, StatsdFlavor flavor, HierarchicalNameMapper nameMapper, MeterRegistry.Config config) {
        this.id = id;
        this.flavor = flavor;
        this.nameMapper = nameMapper;
        this.config = config;

        // service:payroll,region:us-west
        this.datadogTagString = memoize(convention ->
                id.getTags().iterator().hasNext() ?
                        id.getConventionTags(convention).stream()
                                .map(t -> t.getKey() + ":" + t.getValue())
                                .collect(Collectors.joining(","))
                        : null
        );

        // service=payroll,region=us-west
        this.telegrafTagString = memoize(convention ->
                id.getTags().iterator().hasNext() ?
                        id.getConventionTags(convention).stream()
                                .map(t -> t.getKey() + "=" + t.getValue())
                                .collect(Collectors.joining(","))
                        : null
        );
    }

    @Override
    public String count(long amount, Statistic stat) {
        return line(Long.toString(amount), stat, "c");
    }

    @Override
    public String gauge(double amount, Statistic stat) {
        return line(DoubleFormat.decimalOrNan(amount), stat, "g");
    }

    @Override
    public String histogram(double amount) {
        return line(DoubleFormat.decimalOrNan(amount), null, "h");
    }

    @Override
    public String timing(double timeMs) {
        return line(DoubleFormat.decimalOrNan(timeMs), null, "ms");
    }

    private String line(String amount, @Nullable Statistic stat, String type) {
        switch (flavor) {
            case ETSY:
                return metricName(stat) + ":" + amount + "|" + type;
            case DATADOG:
                return metricName(stat) + ":" + amount + "|" + type + tags(stat, datadogTagString.apply(config.namingConvention()),":", "|#");
            case TELEGRAF:
            default:
                return metricName(stat) + tags(stat, telegrafTagString.apply(config.namingConvention()),"=", ",") + ":" + amount + "|" + type;
        }
    }

    private String tags(@Nullable Statistic stat, String otherTags, String keyValueSeparator, String preamble) {
        String tags = of(stat == null ? null : "statistic" + keyValueSeparator + stat.getTagValueRepresentation(), otherTags)
                .filter(Objects::nonNull)
                .collect(Collectors.joining(","));

        if(!tags.isEmpty())
            tags = preamble + tags;
        return tags;
    }

    private String metricName(@Nullable Statistic stat) {
        switch (flavor) {
            case ETSY:
                return nameMapper.toHierarchicalName(stat != null ? id.withTag(stat) : id, config.namingConvention());
            case DATADOG:
            case TELEGRAF:
            default:
                return config.namingConvention().name(id.getName(), id.getType(), id.getBaseUnit());
        }
    }
}

重点看tags方法

StatsdMetricsExportAutoConfiguration

spring-boot-actuator-autoconfigure-2.0.0.RELEASE-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/export/statsd/StatsdMetricsExportAutoConfiguration.java

@Configuration
@AutoConfigureBefore({ CompositeMeterRegistryAutoConfiguration.class,
        SimpleMetricsExportAutoConfiguration.class })
@AutoConfigureAfter(MetricsAutoConfiguration.class)
@ConditionalOnBean(Clock.class)
@ConditionalOnClass(StatsdMeterRegistry.class)
@ConditionalOnProperty(prefix = "management.metrics.export.statsd", name = "enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(StatsdProperties.class)
public class StatsdMetricsExportAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(StatsdConfig.class)
    public StatsdConfig statsdConfig(StatsdProperties statsdProperties) {
        return new StatsdPropertiesConfigAdapter(statsdProperties);
    }

    @Bean
    @ConditionalOnMissingBean
    public StatsdMeterRegistry statsdMeterRegistry(StatsdConfig statsdConfig,
            HierarchicalNameMapper hierarchicalNameMapper, Clock clock) {
        return new StatsdMeterRegistry(statsdConfig, hierarchicalNameMapper, clock);
    }

    @Bean
    @ConditionalOnMissingBean
    public HierarchicalNameMapper hierarchicalNameMapper() {
        return HierarchicalNameMapper.DEFAULT;
    }

    @Bean
    public StatsdMetrics statsdMetrics() {
        return new StatsdMetrics();
    }

}

注意这里使用StatsdPropertiesConfigAdapter将statsdProperties适配为statsdConfig 这里还创建了StatsdMeterRegistry

StatsdMeterRegistry

micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdMeterRegistry.java

public class StatsdMeterRegistry extends MeterRegistry {
    //......
    private StatsdMeterRegistry(StatsdConfig config,
                                HierarchicalNameMapper nameMapper,
                                NamingConvention namingConvention,
                                Clock clock,
                                @Nullable Function<Meter.Id, StatsdLineBuilder> lineBuilderFunction,
                                @Nullable Consumer<String> lineSink) {
        super(clock);

        this.statsdConfig = config;
        this.nameMapper = nameMapper;
        this.lineBuilderFunction = lineBuilderFunction;
        this.lineSink = lineSink;
        config().namingConvention(namingConvention);

        UnicastProcessor<String> processor = UnicastProcessor.create(Queues.<String>unboundedMultiproducer().get());

        try {
            Class.forName("ch.qos.logback.classic.turbo.TurboFilter", false, getClass().getClassLoader());
            this.publisher = new LogbackMetricsSuppressingUnicastProcessor(processor);
        } catch (ClassNotFoundException e) {
            this.publisher = processor;
        }

        if (lineSink != null) {
            publisher.subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String line) {
                    if (started.get()) {
                        lineSink.accept(line);
                    }
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                    meterPoller.dispose();
                }
            });

            // now that we're connected, start polling gauges and other pollable meter types
            meterPoller.replace(Flux.interval(statsdConfig.pollingFrequency())
                    .doOnEach(n -> pollableMeters.forEach(StatsdPollable::poll))
                    .subscribe());
        }

        if (config.enabled())
            start();
    }

    public void start() {
        if (started.compareAndSet(false, true) && lineSink == null) {
            UdpClient.create(statsdConfig.host(), statsdConfig.port())
                    .newHandler((in, out) -> out
                            .options(NettyPipeline.SendOptions::flushOnEach)
                            .sendString(publisher)
                            .neverComplete()
                    )
                    .subscribe(client -> {
                        this.udpClient.replace(client);

                        // now that we're connected, start polling gauges and other pollable meter types
                        meterPoller.replace(Flux.interval(statsdConfig.pollingFrequency())
                                .doOnEach(n -> pollableMeters.forEach(StatsdPollable::poll))
                                .subscribe());
                    });
        }
    }

    public void stop() {
        if (started.compareAndSet(true, false)) {
            udpClient.dispose();
            meterPoller.dispose();
        }
    }

    @Override
    public void close() {
        stop();
        super.close();
    }
    //......
}

1.可以看到底层是使用reactor的UdpClient,processor使用的是UnicastProcessor,用的队列是无界的MpscLinkedQueue 2.这里我看半天没看到配置文件设置的queueSize作用在哪里 3.具体的数据中转是通过这个processor来处理,UdpClient这里订阅processor,然后各个metrics往processor产生数据

比如 micrometer-registry-statsd-1.0.1-sources.jar!/io/micrometer/statsd/StatsdGauge.java

public class StatsdGauge<T> extends AbstractMeter implements Gauge, StatsdPollable {
    private final StatsdLineBuilder lineBuilder;
    private final Subscriber<String> publisher;

    private final WeakReference<T> ref;
    private final ToDoubleFunction<T> value;
    private final AtomicReference<Double> lastValue = new AtomicReference<>(Double.NaN);
    private final boolean alwaysPublish;

    StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Subscriber<String> publisher, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
        super(id);
        this.lineBuilder = lineBuilder;
        this.publisher = publisher;
        this.ref = new WeakReference<>(obj);
        this.value = value;
        this.alwaysPublish = alwaysPublish;
    }

    @Override
    public double value() {
        T obj = ref.get();
        return obj != null ? value.applyAsDouble(ref.get()) : 0;
    }

    @Override
    public void poll() {
        double val = value();
        if (alwaysPublish || lastValue.getAndSet(val) != val) {
            publisher.onNext(lineBuilder.gauge(val));
        }
    }

    @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
    @Override
    public boolean equals(Object o) {
        return MeterEquivalence.equals(this, o);
    }

    @Override
    public int hashCode() {
        return MeterEquivalence.hashCode(this);
    }
}

可以看到这里的poll方法往publisheronNext数据

小结

springboot2目前的micrometer貌似不支持statsd的prefix定义,这样会造成多个应用服务上报指标的时候,无法区分开来。

doc

  • production-ready-metrics-export-statsd
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-03-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • maven
  • 配置文件
    • flavor
      • StatsdProperties
      • 实例
        • 查看heap
          • 查看eden
            • grafana展示
            • 源码解析
              • tag转name
                • StatsdMetricsExportAutoConfiguration
                  • StatsdMeterRegistry
                  • 小结
                  • doc
                  相关产品与服务
                  Grafana 服务
                  Grafana 服务(TencentCloud Managed Service for Grafana,TCMG)是腾讯云基于社区广受欢迎的开源可视化项目 Grafana ,并与 Grafana Lab 合作开发的托管服务。TCMG 为您提供安全、免运维 Grafana 的能力,内建腾讯云多种数据源插件,如 Prometheus 监控服务、容器服务、日志服务 、Graphite 和 InfluxDB 等,最终实现数据的统一可视化。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档