A look at Storm's LoggingClusterMetricsConsumer

This article takes a look at Storm's LoggingClusterMetricsConsumer.

LoggingClusterMetricsConsumer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java

public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);

    static private String padding = "                       ";

    @Override
    public void prepare(Object registrationArgument) {
    }

    @Override
    public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
                                      clusterInfo.getTimestamp(), "<cluster>", "<cluster>");
        sb.append(header);
        logDataPoints(dataPoints, sb, header);
    }

    @Override
    public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
                                      supervisorInfo.getTimestamp(),
                                      supervisorInfo.getSrcSupervisorHost(),
                                      supervisorInfo.getSrcSupervisorId());
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }

    private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) {
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }
}
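  • This is a cluster-level metrics consumer; it can only be configured in storm.yaml
  • Its handleDataPoints methods are invoked as callbacks by ClusterMetricsConsumerExecutor
  • handleDataPoints here simply writes the data points to the log file, one line per data point: a header (timestamp, source host, source ID, with <cluster> for cluster-wide data), then the metric name padded or truncated to 23 characters, a tab, and the value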
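To make the line layout concrete, here is a Storm-free sketch that reproduces only the formatting logic of handleDataPoints. The class and method names (MetricsLineFormat, header, line) are hypothetical; the format string and the 23-space padding trick are copied from the source above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Storm-free sketch of the line layout used by LoggingClusterMetricsConsumer.
// Names are hypothetical; the format string and 23-char name column mirror Storm.
class MetricsLineFormat {
    private static final String PADDING = "                       "; // 23 spaces
    private static final int NAME_WIDTH = PADDING.length();

    // Header: timestamp, host right-aligned in 15 chars, id right-aligned in 40 chars.
    static String header(long timestamp, String host, String id) {
        return String.format("%d\t%15s\t%40s\t", timestamp, host, id);
    }

    // Appends the name plus 23 spaces, then cuts everything past header + 23 chars,
    // so short names are padded and long names are truncated to exactly 23 chars.
    static String line(String header, String name, Object value) {
        StringBuilder sb = new StringBuilder(header);
        sb.append(name).append(PADDING).delete(header.length() + NAME_WIDTH, sb.length());
        sb.append('\t').append(value);
        return sb.toString();
    }

    public static void main(String[] args) {
        String header = header(1541490711L, "<cluster>", "<cluster>");
        Map<String, Object> points = new LinkedHashMap<>();
        points.put("supervisors", 1);
        points.put("slotsTotal", 4);
        for (Map.Entry<String, Object> p : points.entrySet()) {
            System.out.println(line(header, p.getKey(), p.getValue()));
        }
    }
}
```

Reusing one StringBuilder and deleting back to the header, as Storm does, avoids rebuilding the header for every data point; the sketch builds a fresh builder per line for clarity.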

storm.yaml configuration

## Cluster Metrics Consumers
storm.cluster.metrics.consumer.register:
   - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
#   - class: "com.example.demo.metric.FixedLoggingClusterMetricsConsumer"
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
#
storm.cluster.metrics.consumer.publish.interval.secs: 5
  • This sets LoggingClusterMetricsConsumer as the consumer class and the publish interval to 5 seconds
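The commented-out entry shows that a consumer can also receive an `argument`. ClusterMetricsConsumerExecutor hands its registrationArgument to the consumer's prepare(Object), and presumably that object is the parsed `argument` value, which for the YAML above would be a list containing one map. A consumer such as the (unshown) FixedLoggingClusterMetricsConsumer might read its endpoint roughly like this; RegistrationArgs.endpoint is a hypothetical helper, not Storm API.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: pulls an "endpoint" entry out of a registration argument,
// assuming YAML was parsed into plain Java collections (List / Map).
class RegistrationArgs {
    static String endpoint(Object registrationArgument) {
        if (registrationArgument instanceof Map) {
            Object v = ((Map<?, ?>) registrationArgument).get("endpoint");
            return v == null ? null : v.toString();
        }
        if (registrationArgument instanceof List) {
            // "argument: - endpoint: ..." becomes a list of maps; take the first hit.
            for (Object item : (List<?>) registrationArgument) {
                String v = endpoint(item);
                if (v != null) {
                    return v;
                }
            }
        }
        return null; // no argument configured, or no "endpoint" key
    }

    public static void main(String[] args) {
        Object arg = List.of(Map.of("endpoint", "metrics-collector.mycompany.org"));
        System.out.println(endpoint(arg));
        System.out.println(endpoint(null));
    }
}
```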

cluster.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<configuration monitorInterval="60" shutdownHook="disable" packages="org.apache.logging.log4j.core,io.sentry.log4j2">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/${sys:logfile.name}"
                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="WEB-ACCESS" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log"
                 filePattern="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="THRIFT-ACCESS" immediateFlush="false"
                 fileName="${sys:storm.log.dir}/access-${sys:logfile.name}"
                 filePattern="${sys:storm.log.dir}/access-${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="METRICS"
                 fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics"
                 filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
            protocol="UDP" appName="[${sys:daemon.name}]" mdcId="mdc" includeMDC="true"
            facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
            messageId="[${sys:user.name}:S0]" id="storm" immediateFlush="true" immediateFail="true"/>
</appenders>
<loggers>

    <Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false">
        <AppenderRef ref="WEB-ACCESS"/>
        <AppenderRef ref="syslog"/>
    </Logger>
    <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="info" additivity="false">
        <AppenderRef ref="THRIFT-ACCESS"/>
        <AppenderRef ref="syslog"/>
    </Logger>
    <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
        <appender-ref ref="Sentry" level="ERROR" />
    </root>
</loggers>
</configuration>
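  • cluster.xml holds the logging configuration for the metrics. The LoggingClusterMetricsConsumer logger (additivity="false") writes only to the METRICS appender, a RollingFile whose fileName is ${sys:storm.log.dir}/${sys:logfile.name}.metrics and which rolls over at 2 MB, keeping up to 9 archives. Nimbus's default logfile.name is nimbus.log and supervisor's is supervisor.log, so the output files are nimbus.log.metrics and supervisor.log.metrics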
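The per-daemon file name comes from log4j2 substituting `${sys:...}` with JVM system properties. The sketch below is a deliberately simplified stand-in for that lookup (real log4j2 also supports defaults, other lookup prefixes and escaping) and only shows how the METRICS fileName expands; the class name and the /var/log/storm directory are illustrative assumptions.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified stand-in for log4j2's ${sys:...} lookup; not the log4j2 implementation.
class SysLookupDemo {
    private static final Pattern SYS = Pattern.compile("\\$\\{sys:([^}]+)\\}");

    // Replaces each ${sys:key} with props[key]; unknown keys are left untouched.
    static String resolve(String template, Map<String, String> props) {
        Matcher m = SYS.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = props.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String fileName = "${sys:storm.log.dir}/${sys:logfile.name}.metrics";
        for (String daemon : new String[] {"nimbus", "supervisor"}) {
            // Hypothetical log directory; logfile.name follows the defaults described above.
            Map<String, String> props = Map.of("storm.log.dir", "/var/log/storm",
                                               "logfile.name", daemon + ".log");
            System.out.println(resolve(fileName, props));
        }
    }
}
```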

Sample log output

2018-11-06 07:51:51,488 18628    1541490711           <cluster>                                <cluster>        supervisors             1
2018-11-06 07:51:51,488 18628    1541490711           <cluster>                                <cluster>        topologies              0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsTotal              4
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsUsed               0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        slotsFree               4
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        executorsTotal          0
2018-11-06 07:51:51,489 18629    1541490711           <cluster>                                <cluster>        tasksTotal              0
2018-11-06 07:51:51,496 18636    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        slotsTotal              4
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        slotsUsed               0
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        totalMem                3072.0
2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        totalCpu                400.0
2018-11-06 07:51:51,498 18638    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        usedMem                 0.0
2018-11-06 07:51:51,498 18638    1541490711      192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29        usedCpu                 0.0
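If these files are consumed by other tooling, a line can be split back into its fields. This is a hypothetical parser that assumes the "%d %-8r %m%n" layout from cluster.xml and that no field contains whitespace, which holds for the sample above but is not guaranteed for every host name or metric name.

```java
// Hypothetical parser for *.log.metrics lines; assumes whitespace-free fields.
class MetricsLogLine {
    final String date;        // %d, date part
    final String time;        // %d, time part
    final long uptimeMillis;  // %r: ms since the JVM started
    final long timestamp;     // ClusterInfo/SupervisorInfo timestamp (epoch seconds in the sample)
    final String host;        // "<cluster>" for cluster-wide points
    final String id;          // supervisor id, or "<cluster>"
    final String name;        // metric name
    final String value;       // kept as text: values may be integers or doubles

    private MetricsLogLine(String[] f) {
        date = f[0];
        time = f[1];
        uptimeMillis = Long.parseLong(f[2]);
        timestamp = Long.parseLong(f[3]);
        host = f[4];
        id = f[5];
        name = f[6];
        value = f[7];
    }

    static MetricsLogLine parse(String line) {
        String[] f = line.trim().split("\\s+");
        if (f.length != 8) {
            throw new IllegalArgumentException("expected 8 fields, got " + f.length + ": " + line);
        }
        return new MetricsLogLine(f);
    }

    public static void main(String[] args) {
        MetricsLogLine l = parse("2018-11-06 07:51:51,497 18637    1541490711      192.168.99.100"
                + "     5bbd576d-218c-4365-ac5e-865b1f6e9b29        totalMem                3072.0");
        System.out.println(l.host + " " + l.name + " = " + l.value);
    }
}
```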

ClusterMetricsConsumerExecutor

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java

public class ClusterMetricsConsumerExecutor {
    public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
    private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED =
        "Preparation of Cluster Metrics Consumer failed. " +
        "Please check your configuration and/or corresponding systems and relaunch Nimbus. " +
        "Skipping handle metrics.";

    private IClusterMetricsConsumer metricsConsumer;
    private String consumerClassName;
    private Object registrationArgument;

    public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    public void prepare() {
        try {
            metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName).newInstance();
            metricsConsumer.prepare(registrationArgument);
        } catch (Exception e) {
            LOG.error("Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name " +
                      consumerClassName, e);

            if (metricsConsumer != null) {
                metricsConsumer.cleanup();
            }
            metricsConsumer = null;
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo, final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo, final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void cleanup() {
        if (metricsConsumer != null) {
            metricsConsumer.cleanup();
        }
    }
}
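  • In prepare, ClusterMetricsConsumerExecutor instantiates the IClusterMetricsConsumer implementation named by consumerClassName via reflection, then calls metricsConsumer.prepare(registrationArgument) so the consumer can initialize itself. If either step fails, the consumer is cleaned up and set to null, and later handleDataPoints calls only log an error
  • ClusterMetricsConsumerExecutor's handleDataPoints methods simply delegate to metricsConsumer's handleDataPoints, catching and logging any Throwable
  • There are two handleDataPoints overloads. Both take the dataPoints collection; the other parameter is a ClusterInfo in one and a SupervisorInfo in the other, used for nimbus (cluster-level) and supervisor metrics respectively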
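The pattern described above (reflective instantiation, disabling the consumer when preparation fails, shielding the caller from handler exceptions) can be sketched without Storm on the classpath. MiniConsumer and the two example consumers are hypothetical stand-ins for IClusterMetricsConsumer. Unlike Storm, the sketch returns a boolean from handleDataPoints so the outcome is easy to check, and it uses getDeclaredConstructor().newInstance() because Class.newInstance() is deprecated since Java 9.

```java
import java.util.Collection;
import java.util.List;

// Storm-free sketch of the ClusterMetricsConsumerExecutor pattern.
class ConsumerExecutorSketch {
    // Hypothetical, simplified stand-in for IClusterMetricsConsumer.
    interface MiniConsumer {
        void prepare(Object registrationArgument);
        void handleDataPoints(Collection<String> dataPoints);
        void cleanup();
    }

    // Example consumer that prints each data point.
    public static class PrintingConsumer implements MiniConsumer {
        public void prepare(Object registrationArgument) { }
        public void handleDataPoints(Collection<String> dataPoints) { dataPoints.forEach(System.out::println); }
        public void cleanup() { }
    }

    // Example consumer whose handler always throws.
    public static class FailingConsumer implements MiniConsumer {
        public void prepare(Object registrationArgument) { }
        public void handleDataPoints(Collection<String> dataPoints) { throw new IllegalStateException("boom"); }
        public void cleanup() { }
    }

    private final String consumerClassName;
    private final Object registrationArgument;
    private MiniConsumer consumer;

    ConsumerExecutorSketch(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    void prepare() {
        try {
            consumer = (MiniConsumer) Class.forName(consumerClassName).getDeclaredConstructor().newInstance();
            consumer.prepare(registrationArgument);
        } catch (Exception e) {
            System.err.println("Could not instantiate or prepare " + consumerClassName + ": " + e);
            if (consumer != null) {
                consumer.cleanup();
            }
            consumer = null; // later calls are skipped instead of failing
        }
    }

    // Returns true if the data points reached the consumer without an error.
    boolean handleDataPoints(Collection<String> dataPoints) {
        if (consumer == null) {
            System.err.println("Consumer " + consumerClassName + " was not prepared; skipping");
            return false;
        }
        try {
            consumer.handleDataPoints(dataPoints);
            return true;
        } catch (Throwable t) {
            System.err.println("Error while handling data points, consumer class: " + consumerClassName + ": " + t);
            return false;
        }
    }

    void cleanup() {
        if (consumer != null) {
            consumer.cleanup();
        }
    }

    public static void main(String[] args) {
        ConsumerExecutorSketch ok = new ConsumerExecutorSketch(PrintingConsumer.class.getName(), null);
        ok.prepare();
        ok.handleDataPoints(List.of("slotsTotal=4", "slotsUsed=0"));
        ok.cleanup();

        ConsumerExecutorSketch missing = new ConsumerExecutorSketch("com.example.NoSuchConsumer", null);
        missing.prepare();                                 // logs the failure
        missing.handleDataPoints(List.of("slotsTotal=4")); // logs and skips
    }
}
```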

Nimbus.launchServer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            //......

            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
                                    () -> {
                                        try {
                                            if (isLeader()) {
                                                sendClusterMetricsToExecutors();
                                            }
                                        } catch (Exception e) {
                                            throw new RuntimeException(e);
                                        }
                                    });

            timer.scheduleRecurring(5, 5, clusterMetricSet);
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }

    private boolean isLeader() throws Exception {
        return leaderElector.isLeader();
    }
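  • Nimbus's launchServer method schedules a recurring task that, if this node is the leader, calls sendClusterMetricsToExecutors to push the metrics to the metrics consumers
  • The task runs every DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS (storm.cluster.metrics.consumer.publish.interval.secs) seconds; the default in defaults.yaml is 60, and it can be overridden in storm.yaml
  • Besides sending metrics to the consumers, launchServer schedules a second recurring task that runs the ClusterSummaryMetricSet Runnable every 5 seconds, after an initial 5-second delay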
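The leader check happens inside the task on every tick, not when scheduling: a node that loses leadership keeps its timer but stops sending. The sketch below illustrates that with a ScheduledExecutorService swapped in for Storm's own timer, purely for demonstration; leaderOnly is a hypothetical helper name and the 100 ms period stands in for the configured interval.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Sketch of a leader-gated recurring task (JDK scheduler in place of Storm's timer).
class LeaderGatedTask {
    // Mirrors "if (isLeader()) sendClusterMetricsToExecutors();" on each tick.
    static Runnable leaderOnly(BooleanSupplier isLeader, Runnable action) {
        return () -> {
            if (isLeader.getAsBoolean()) {
                action.run();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean leader = new AtomicBoolean(true);
        AtomicInteger sends = new AtomicInteger();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // Initial delay 0 and a fixed period, shortened so the demo ends quickly.
        timer.scheduleAtFixedRate(leaderOnly(leader::get, sends::incrementAndGet),
                                  0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        leader.set(false); // losing leadership stops the sends, not the timer
        int whileLeader = sends.get();
        Thread.sleep(300);
        timer.shutdownNow();
        System.out.println("sends while leader: " + whileLeader + ", total: " + sends.get());
    }
}
```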

Nimbus.sendClusterMetricsToExecutors

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private void sendClusterMetricsToExecutors() throws Exception {
        ClusterInfo clusterInfo = mkClusterInfo();
        ClusterSummary clusterSummary = getClusterInfoImpl();
        List<DataPoint> clusterMetrics = extractClusterMetrics(clusterSummary);
        Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> supervisorMetrics = extractSupervisorMetrics(clusterSummary);
        for (ClusterMetricsConsumerExecutor consumerExecutor : clusterConsumerExceutors) {
            consumerExecutor.handleDataPoints(clusterInfo, clusterMetrics);
            for (Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> entry : supervisorMetrics.entrySet()) {
                consumerExecutor.handleDataPoints(entry.getKey(), entry.getValue());
            }
        }
    }
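  • Nimbus's sendClusterMetricsToExecutors extracts the cluster-level metrics with extractClusterMetrics and the per-supervisor metrics with extractSupervisorMetrics, then, for each consumer executor, calls consumerExecutor.handleDataPoints once with the cluster metrics and once per supervisor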
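The fan-out order can be sketched with simplified types: each consumer first gets the cluster-wide points, then one call per supervisor. Sink, the String-keyed supervisor map and the String data points are hypothetical simplifications of ClusterMetricsConsumerExecutor, SupervisorInfo and DataPoint.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Storm-free sketch of the fan-out in sendClusterMetricsToExecutors.
class FanOutSketch {
    interface Sink {
        void cluster(long timestamp, List<String> points);
        void supervisor(String supervisorId, List<String> points);
    }

    static void send(long timestamp, List<String> clusterPoints,
                     Map<String, List<String>> supervisorPoints, List<Sink> sinks) {
        for (Sink sink : sinks) {
            sink.cluster(timestamp, clusterPoints); // one cluster-level call
            for (Map.Entry<String, List<String>> e : supervisorPoints.entrySet()) {
                sink.supervisor(e.getKey(), e.getValue()); // one call per supervisor
            }
        }
    }

    // Records the calls it receives, in order.
    static class RecordingSink implements Sink {
        final List<String> calls = new ArrayList<>();
        public void cluster(long ts, List<String> points) { calls.add("cluster@" + ts + " " + points); }
        public void supervisor(String id, List<String> points) { calls.add("supervisor " + id + " " + points); }
    }

    public static void main(String[] args) {
        Map<String, List<String>> sup = new LinkedHashMap<>();
        sup.put("sup-1", List.of("slotsTotal=4"));
        sup.put("sup-2", List.of("slotsTotal=2"));
        RecordingSink a = new RecordingSink();
        RecordingSink b = new RecordingSink();
        send(1541490711L, List.of("supervisors=2"), sup, List.of(a, b));
        a.calls.forEach(System.out::println);
    }
}
```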

ClusterSummaryMetricSet

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private class ClusterSummaryMetricSet implements Runnable {
        private static final int CACHING_WINDOW = 5;
        
        private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();
        
        private final Function<String, Histogram> registerHistogram = (name) -> {
            //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
            // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
            // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
            clusterSummaryMetrics.put(name, histogram);
            return histogram;
        };
        private volatile boolean active = false;

        //Nimbus metrics distribution
        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");

        //Supervisor metrics distribution
        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");

        //Topology metrics distribution
        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
        private final StormMetricsRegistry metricsRegistry;

        /**
         * Constructor to put all items in ClusterSummary in MetricSet as a metric.
         * All metrics are derived from a cached ClusterSummary object,
         * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
         * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
         * reporting interval to avoid outdated reporting.
         */
        ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
            this.metricsRegistry = metricsRegistry;
            //Break the code if out of sync to thrift protocol
            assert ClusterSummary._Fields.values().length == 3
                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;

            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
                @Override
                protected ClusterSummary loadValue() {
                    try {
                        ClusterSummary newSummary = getClusterInfoImpl();
                        LOG.debug("The new summary is {}", newSummary);
                        /*
                         * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
                         * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
                         * updated before query
                         */
                        updateHistogram(newSummary);
                        return newSummary;
                    } catch (Exception e) {
                        LOG.warn("Get cluster info exception.", e);
                        throw new RuntimeException(e);
                    }
                }
            };

            clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
                @Override
                protected Long transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses().stream()
                            .filter(NimbusSummary::is_isLeader)
                            .count();
                }
            });
            clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_topologies_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            .mapToInt(SupervisorSummary::get_num_workers)
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            .mapToInt(SupervisorSummary::get_num_used_workers)
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
                @Override
                protected Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            //Filtered negative value
                            .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
                            .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
                @Override
                protected Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                            //Filtered negative value
                            .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
                            .sum();
                }
            });
        }

        private void updateHistogram(ClusterSummary newSummary) {
            for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
                nimbusUptime.update(nimbusSummary.get_uptime_secs());
            }
            for (SupervisorSummary summary : newSummary.get_supervisors()) {
                supervisorsUptime.update(summary.get_uptime_secs());
                supervisorsNumWorkers.update(summary.get_num_workers());
                supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
                supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
                supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
                supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
                supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
            }
            for (TopologySummary summary : newSummary.get_topologies()) {
                topologiesNumTasks.update(summary.get_num_tasks());
                topologiesNumExecutors.update(summary.get_num_executors());
                topologiesNumWorker.update(summary.get_num_workers());
                topologiesUptime.update(summary.get_uptime_secs());
                topologiesReplicationCount.update(summary.get_replication_count());
                topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
                topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
                topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
                topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
                topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
                topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
            }
        }

        void setActive(final boolean active) {
            if (this.active != active) {
                this.active = active;
                if (active) {
                    metricsRegistry.registerAll(clusterSummaryMetrics);
                } else {
                    metricsRegistry.removeAll(clusterSummaryMetrics);
                }
            }
        }

        @Override
        public void run() {
            try {
                setActive(isLeader());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
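  • The main job of this Runnable is to call setActive, which keeps checking for a change in this node's leadership: when the node becomes the leader it registers clusterSummaryMetrics with the metrics registry, and when it stops being the leader it removes them
  • clusterSummaryMetrics holds the gauges cluster:num-nimbus-leaders, cluster:num-nimbuses, cluster:num-supervisors, cluster:num-topologies, cluster:num-total-workers, cluster:num-total-used-workers, cluster:total-fragmented-memory-non-negative and cluster:total-fragmented-cpu-non-negative, all derived from a ClusterSummary cached for CACHING_WINDOW (5) seconds; the nimbuses:*, supervisors:* and topologies:* histograms created through registerHistogram live in the same map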
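The register/unregister behavior of setActive can be sketched with a plain Map standing in for StormMetricsRegistry and Supplier values standing in for Dropwizard gauges. LeaderMetricSet and its constructor are hypothetical; only the "act on change of leadership" logic mirrors the source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Storm-free sketch of ClusterSummaryMetricSet.setActive / run.
class LeaderMetricSet implements Runnable {
    private final Map<String, Supplier<Object>> registry; // stand-in for StormMetricsRegistry
    private final Map<String, Supplier<Object>> metrics;  // stand-in for clusterSummaryMetrics
    private final BooleanSupplier isLeader;
    private volatile boolean active = false;

    LeaderMetricSet(Map<String, Supplier<Object>> registry,
                    Map<String, Supplier<Object>> metrics,
                    BooleanSupplier isLeader) {
        this.registry = registry;
        this.metrics = metrics;
        this.isLeader = isLeader;
    }

    // Registers or removes the whole set only when leadership actually changes.
    void setActive(boolean active) {
        if (this.active != active) {
            this.active = active;
            if (active) {
                registry.putAll(metrics);
            } else {
                registry.keySet().removeAll(metrics.keySet());
            }
        }
    }

    @Override
    public void run() {
        setActive(isLeader.getAsBoolean());
    }

    public static void main(String[] args) {
        AtomicBoolean leader = new AtomicBoolean(true);
        Map<String, Supplier<Object>> registry = new HashMap<>();
        Map<String, Supplier<Object>> metrics = new HashMap<>();
        metrics.put("cluster:num-supervisors", () -> 1);
        LeaderMetricSet set = new LeaderMetricSet(registry, metrics, leader::get);

        set.run(); // became leader: metrics registered
        System.out.println("leader:     " + registry.keySet());
        leader.set(false);
        set.run(); // lost leadership: metrics removed
        System.out.println("not leader: " + registry.keySet());
    }
}
```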

Summary

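  • LoggingClusterMetricsConsumer consumes cluster-level metrics and writes them to log files. Its log4j2 configuration is read from cluster.xml, and the output ends up in nimbus.log.metrics and supervisor.log.metrics. LoggingMetricsConsumer, by contrast, works at the topology worker level; its log4j2 configuration is read from worker.xml, and its output goes to worker.log.metrics
  • In launchServer, Nimbus sets up a recurring task that, while the current node is the leader, periodically sends metrics to ClusterMetricsConsumerExecutor, which in turn calls LoggingClusterMetricsConsumer's handleDataPoints to log the data. LoggingMetricsConsumer works differently: each Executor schedules a timer that emits a metricsTickTuple, which triggers SpoutExecutor and BoltExecutor to send their metrics to the topology's built-in MetricsConsumerBolt; MetricsConsumerBolt then calls LoggingMetricsConsumer.handleDataPoints to consume the data and log it
  • handleDataPoints handles two kinds of info: ClusterInfo and SupervisorInfo. Note that sendClusterMetricsToExecutors only runs on the leader nimbus, so both kinds of data points are logged by the nimbus process, and the supervisor metrics also land in nimbus.log.metrics (as in the sample output above); supervisor.log.metrics may therefore be empty
  • LoggingMetricsConsumer's implementation relies on the old IMetric API, whereas LoggingClusterMetricsConsumer does not depend on IMetric; it gets its metrics from IStormClusterState
  • Storm 1.2 introduced a new metrics system based on Dropwizard Metrics, and the registerMetric methods in TopologyContext that return IMetric were marked @Deprecated in 1.2, so LoggingMetricsConsumer may later need to be reworked to obtain its metrics from the metrics2 MetricRegistry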


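Original article: published on the Tencent Cloud+ Community with the author's authorization; do not reproduce without permission.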