聊聊storm的GraphiteStormReporter

本文主要研究一下storm的GraphiteStormReporter

GraphiteStormReporter

storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java

public class GraphiteStormReporter extends ScheduledStormReporter {
    private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);

    public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
    public static final String GRAPHITE_HOST = "graphite.host";
    public static final String GRAPHITE_PORT = "graphite.port";
    public static final String GRAPHITE_TRANSPORT = "graphite.transport";

    @Override
    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
        LOG.debug("Preparing...");
        GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry);

        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
        if (durationUnit != null) {
            builder.convertDurationsTo(durationUnit);
        }

        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
        if (rateUnit != null) {
            builder.convertRatesTo(rateUnit);
        }

        StormMetricsFilter filter = getMetricsFilter(reporterConf);
        if(filter != null){
            builder.filter(filter);
        }
        String prefix = getMetricsPrefixedWith(reporterConf);
        if (prefix != null) {
            builder.prefixedWith(prefix);
        }

        //defaults to 10
        reportingPeriod = getReportPeriod(reporterConf);

        //defaults to seconds
        reportingPeriodUnit = getReportPeriodUnit(reporterConf);

        // Not exposed:
        // * withClock(Clock)

        String host = getMetricsTargetHost(reporterConf);
        Integer port = getMetricsTargetPort(reporterConf);
        String transport = getMetricsTargetTransport(reporterConf);
        GraphiteSender sender = null;
        if (transport.equalsIgnoreCase("udp")) {
            sender = new GraphiteUDP(host, port);
        } else {
            sender = new Graphite(host, port);
        }
        reporter = builder.build(sender);
    }

    private static String getMetricsPrefixedWith(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null);
    }

    private static String getMetricsTargetHost(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_HOST), null);
    }

    private static Integer getMetricsTargetPort(Map reporterConf) {
        return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null);
    }

    private static String getMetricsTargetTransport(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp");
    }
}
  • 继承了ScheduledStormReporter,实现prepare方法
  • prepare方法根据配置文件创建com.codahale.metrics.graphite.GraphiteSender,然后创建com.codahale.metrics.graphite.GraphiteReporter

ScheduledStormReporter

storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java

public abstract class ScheduledStormReporter implements StormReporter{
    private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class);
    protected ScheduledReporter reporter;
    protected long reportingPeriod;
    protected TimeUnit reportingPeriodUnit;

    @Override
    public void start() {
        if (reporter != null) {
            LOG.debug("Starting...");
            reporter.start(reportingPeriod, reportingPeriodUnit);
        } else {
            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
        }
    }

    @Override
    public void stop() {
        if (reporter != null) {
            LOG.debug("Stopping...");
            reporter.stop();
        } else {
            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
        }
    }

    public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
        TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
        return unit == null ? TimeUnit.SECONDS : unit;
    }

    private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
        String rateUnitString = Utils.getString(reporterConf.get(configName), null);
        if (rateUnitString != null) {
            return TimeUnit.valueOf(rateUnitString);
        }
        return null;
    }

    public static long getReportPeriod(Map reporterConf) {
        return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
    }

    public static StormMetricsFilter getMetricsFilter(Map reporterConf){
        StormMetricsFilter filter = null;
        Map<String, Object> filterConf = (Map)reporterConf.get("filter");
        if(filterConf != null) {
            String clazz = (String) filterConf.get("class");
            if (clazz != null) {
                filter = Utils.newInstance(clazz);
                filter.prepare(filterConf);
            }
        }
        return filter;
    }
}
  • ScheduledStormReporter封装了对reporter的生命周期的控制,启动时调用start,关闭时调用stop

小结

  • storm从1.2版本开始启用了新的metrics,即metrics2,新版的metrics基于Dropwizard Metrics
  • 默认提供了Console Reporter、CSV Reporter、Ganglia Reporter 、Graphite Reporter、JMX Reporter

doc

  • New Metrics Reporting API
  • ubuntu-graphite-grafana

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-10-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏杨熹的专栏

【LEETCODE】模拟面试-134-Gas Station

新生 题目: https://leetcode.com/problems/gas-station/ There are N gas stations alon...

3506
来自专栏向前进

【笔记】实现一个简易的Promise

const PENDING_STATE = "pending"; const FULLFILL_STATE = "fullfilled"; const REJE...

3264
来自专栏cmazxiaoma的架构师之路

你真的了解Spring MVC处理请求流程吗?

3184
来自专栏码匠的流水账

聊聊storm的reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java

1463
来自专栏Hongten

python开发_calendar

如果你用过linux,你可能知道在linux下面的有一个强大的calendar功能,即日历

1272
来自专栏开发与安全

90% of python in 90 minutes

注:本文整理自 http://www.slideshare.net/MattHarrison4/learn-90 -----------------------...

2180
来自专栏java、Spring、技术分享

深入分析Spring MVC中RequestBody与ResponseBody

  在SpringMVC中,可以使用@RequestBody和@ResponseBody两个注解,分别完成请求报文到对象和对象到响应报文的转换。在Sprin...

2581
来自专栏个人分享

Socket与Http方式解析发送xml消息封装中间件jar包

  最近项目代码中太多重复的编写Document,不同的接口需要不同的模板,于是重写提取公共部分打成jar包,方便各个系统统一使用~

1543
来自专栏Lambda

常用Lambda表达式实例

集合操作 从集合中过滤出某一个字段存入到新集合 // 从商品集合中过滤出商品类目id为一个新 List<Integer>集合 List<Integer> c...

2138
来自专栏码匠的流水账

聊聊sentinel的ModifyRulesCommandHandler

本文主要研究一下sentinel的ModifyRulesCommandHandler

1171

扫码关注云+社区

领取腾讯云代金券