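A look at Storm's LoggingMetricsConsumer

This article examines Storm's LoggingMetricsConsumer: what it logs, where the output goes, how to register it, and how metric data reaches it.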

LoggingMetricsConsumer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/LoggingMetricsConsumer.java

public class LoggingMetricsConsumer implements IMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class);
    static private String padding = "                       ";

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                                      taskInfo.timestamp,
                                      taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                                      taskInfo.srcTaskId,
                                      taskInfo.srcComponentId);
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.name)
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.value);
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }
}
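  • LoggingMetricsConsumer implements the IMetricsConsumer interface. Its handleDataPoints method writes the taskInfo and each data point to a logger, one line per data point. Which file those lines end up in depends on Storm's log4j2 configuration.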
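
To make the line layout concrete, here is a minimal standalone sketch that mirrors the formatting logic of handleDataPoints without depending on Storm. The class name MetricLineFormatSketch, the method formatLine and the sample values are assumptions made for illustration; only the format string and the 23-character padding come from the listing above.

```java
final class MetricLineFormatSketch {
    // 23 spaces, matching the "+ 23" used when trimming the padded metric name.
    private static final String PADDING = new String(new char[23]).replace('\0', ' ');

    // Builds one log line the same way handleDataPoints does for a single data point.
    static String formatLine(long timestamp, String host, int port, int taskId,
                             String componentId, String metricName, Object value) {
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                                      timestamp, host, port, taskId, componentId);
        StringBuilder sb = new StringBuilder(header);
        sb.append(metricName)
          .append(PADDING)
          // Keep exactly 23 characters after the header: short names are
          // space-padded, names longer than 23 characters are cut off.
          .delete(header.length() + 23, sb.length())
          .append("\t")
          .append(value);
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not taken from a real topology.
        System.out.println(formatLine(1541070680L, "192.168.1.5", 6700, 3,
                                      "count", "__emit-count", 42));
    }
}
```

Each line therefore has five tab-separated columns: timestamp, host:port, taskId:componentId, the metric name in a fixed 23-character field, and the value. Note that metric names longer than 23 characters are truncated in the log.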

log4j2/worker.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<configuration monitorInterval="60" shutdownHook="disable">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternNoTime">%msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1"
        fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
        filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="STDOUT"
        fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
        filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="STDERR"
        fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
        filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="METRICS"
        fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics"
        filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
        protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
        facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
        messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true" immediateFlush="true"/>
</appenders>
<loggers>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
    </root>
    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <Logger name="STDERR" level="INFO">
        <appender-ref ref="STDERR"/>
        <appender-ref ref="syslog"/>
    </Logger>
    <Logger name="STDOUT" level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="syslog"/>
    </Logger>
</loggers>
</configuration>
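  • Taking worker.xml as the example: the logger named org.apache.storm.metric.LoggingMetricsConsumer is configured at info level with additivity set to false, so its output goes only to the METRICS appender and is not propagated to the root logger.
  • The METRICS appender writes to ${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics, for example workers-artifacts/tickDemo-1-1541070680/6700/worker.log.metrics.
  • METRICS is a RollingFile appender. Its SizeBasedTriggeringPolicy rolls the file at 2 MB, and DefaultRolloverStrategy keeps at most 9 archived files.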

Configuration

Topology configuration

conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
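  • LoggingMetricsConsumer can be registered on the conf when the topology is submitted. This registration applies only to the workers of that topology: when metric data is produced, it is written to that topology's worker.log.metrics file.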
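
The registration ends up as a list of maps under the topology.metrics.consumer.register key, which is what metricsConsumerBoltSpecs later reads. The following plain-Java sketch shows that shape. It is an assumption about what registerMetricsConsumer does internally, not a copy of Storm's code; the class ConsumerRegistrationSketch and its register method are hypothetical, and the entry keys are the ones used in the storm.yaml form of the same setting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ConsumerRegistrationSketch {
    static final String REGISTER_KEY = "topology.metrics.consumer.register";

    // Appends one consumer entry to the list stored under REGISTER_KEY
    // (assumed behaviour, modelled on the storm.yaml structure).
    @SuppressWarnings("unchecked")
    static void register(Map<String, Object> conf, String className,
                         Object argument, long parallelismHint) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("class", className);
        entry.put("parallelism.hint", parallelismHint);
        entry.put("argument", argument);

        List<Map<String, Object>> consumers =
            (List<Map<String, Object>>) conf.get(REGISTER_KEY);
        if (consumers == null) {
            consumers = new ArrayList<>();
            conf.put(REGISTER_KEY, consumers);
        }
        consumers.add(entry);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        register(conf, "org.apache.storm.metric.LoggingMetricsConsumer", null, 1L);
        System.out.println(conf);
    }
}
```

Registering the same class twice simply adds a second entry; the bolt IDs are disambiguated later when the system topology is built.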

storm.yaml configuration

topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    max.retain.metric.tuples: 100
    parallelism.hint: 1
  - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer"
    parallelism.hint: 1
    argument: "http://example.com:8080/metrics/my-topology/"
  • A storm.yaml setting applies to all topologies. Note that the key used here is topology.metrics.consumer.register, which is topology-level, so the data is still written to the worker.log.metrics file.
  • For cluster-level metrics the key is storm.cluster.metrics.consumer.register, and it can only be set through storm.yaml. With it enabled, metric data is written to the nimbus.log.metrics and supervisor.log.metrics files.
  • Nimbus and the supervisor are started with -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, while workers are started with -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml. The -Dlogfile.name values for the three components are nimbus.log, supervisor.log and worker.log respectively.

MetricsConsumerBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java

public class MetricsConsumerBolt implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
    private final int _maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
    private final DataPointExpander _expander;
    private final BlockingQueue<MetricsTask> _taskQueue;
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;
    private Thread _taskExecuteThread;
    private volatile boolean _running = true;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
                               Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {

        _consumerClassName = consumerClassName;
        _registrationArgument = registrationArgument;
        _maxRetainMetricTuples = maxRetainMetricTuples;
        _filterPredicate = filterPredicate;
        _expander = expander;

        if (_maxRetainMetricTuples > 0) {
            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
        } else {
            _taskQueue = new LinkedBlockingDeque<>();
        }
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        try {
            _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section " +
                                       Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
        }
        _metricsConsumer.prepare(topoConf, _registrationArgument, context, collector);
        _collector = collector;
        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        _taskExecuteThread.setDaemon(true);
        _taskExecuteThread.start();
    }

    @Override
    public void execute(Tuple input) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = _expander.expandDataPoints(dataPoints);
        List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(expandedDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);

        while (!_taskQueue.offer(metricsTask)) {
            _taskQueue.poll();
        }

        _collector.ack(input);
    }

    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
    }

    @Override
    public void cleanup() {
        _running = false;
        _metricsConsumer.cleanup();
        _taskExecuteThread.interrupt();
    }

    static class MetricsTask {
        private IMetricsConsumer.TaskInfo taskInfo;
        private Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return dataPoints;
        }
    }

    class MetricsHandlerRunnable implements Runnable {

        @Override
        public void run() {
            while (_running) {
                try {
                    MetricsTask task = _taskQueue.take();
                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                } catch (InterruptedException e) {
                    break;
                } catch (Throwable t) {
                    LOG.error("Exception occurred during handle metrics", t);
                }
            }
        }
    }

}
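  • The MetricsConsumerBolt constructor creates _taskQueue. If _maxRetainMetricTuples is greater than 0 the queue is bounded with that capacity; otherwise it is unbounded. The value comes from max.retain.metric.tuples under topology.metrics.consumer.register and defaults to 100 when not set.
  • In prepare, MetricsConsumerBolt starts a daemon thread running MetricsHandlerRunnable. That thread takes MetricsTask objects from _taskQueue and calls _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints()).
  • In execute, MetricsConsumerBolt expands and filters the data points of each incoming tuple and offers the result to _taskQueue. If the queue is full, it polls (discards) the oldest entry and retries until the offer succeeds, so under back pressure the oldest metrics are dropped first.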
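
The drop-oldest behaviour of the queue can be reproduced in isolation. The sketch below uses the same LinkedBlockingDeque and the same offer/poll loop as the listing; the class DropOldestQueueSketch and its helper methods are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

final class DropOldestQueueSketch {
    // Bounded when maxRetain > 0, unbounded otherwise (same rule as the constructor).
    static <T> BlockingQueue<T> newQueue(int maxRetain) {
        if (maxRetain > 0) {
            return new LinkedBlockingDeque<T>(maxRetain);
        }
        return new LinkedBlockingDeque<T>();
    }

    // Same loop as execute(): when the queue is full, discard the head and retry.
    static <T> void offerDroppingOldest(BlockingQueue<T> queue, T item) {
        while (!queue.offer(item)) {
            queue.poll();
        }
    }

    // Offers 1..count with no consumer running and returns what is still queued.
    static List<Integer> retainedAfter(int maxRetain, int count) {
        BlockingQueue<Integer> queue = newQueue(maxRetain);
        for (int i = 1; i <= count; i++) {
            offerDroppingOldest(queue, i);
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(retainedAfter(3, 5)); // bounded: the oldest entries are gone
        System.out.println(retainedAfter(0, 5)); // unbounded: everything is kept
    }
}
```

With a slow consumer, a small max.retain.metric.tuples therefore loses old data points silently, while a value of 0 trades that loss for unbounded memory growth.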

StormCommon.systemTopologyImpl

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        validateBasic(topology);

        StormTopology ret = topology.deepCopy();
        addAcker(topoConf, ret);
        if (hasEventLoggers(topoConf)) {
            addEventLogger(topoConf, ret);
        }
        addMetricComponents(topoConf, ret);
        addSystemComponents(topoConf, ret);
        addMetricStreams(ret);
        addSystemStreams(ret);

        validateStructure(ret);

        return ret;
    }

    public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
        for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
            topology.put_to_bolts(entry.getKey(), entry.getValue());
        }
    }

    public static void addMetricStreams(StormTopology topology) {
        for (Object component : allComponents(topology).values()) {
            ComponentCommon common = getComponentCommon(component);
            StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
            common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
        }
    }

    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = new HashMap<>();

        Set<String> componentIdsEmitMetrics = new HashSet<>();
        componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
        componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);

        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
        for (String componentId : componentIdsEmitMetrics) {
            inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
        }

        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
        if (registerInfo != null) {
            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
            for (Map<String, Object> info : registerInfo) {
                String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
                Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
                Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
                    TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
                Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
                metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
                List<String> whitelist = (List<String>) info.get(
                    TOPOLOGY_METRICS_CONSUMER_WHITELIST);
                List<String> blacklist = (List<String>) info.get(
                    TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
                FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
                Boolean expandMapType = ObjectReader.getBoolean(info.get(
                    TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
                String metricNameSeparator = ObjectReader.getString(info.get(
                    TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
                DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
                MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
                                                                           maxRetainMetricTuples, filterPredicate, expander);
                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
                                                                               boltInstance, null, phintNum, metricsConsumerConf);

                String id = className;
                if (classOccurrencesMap.containsKey(className)) {
                    // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
                    int occurrenceNum = classOccurrencesMap.get(className);
                    occurrenceNum++;
                    classOccurrencesMap.put(className, occurrenceNum);
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
                } else {
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className;
                    classOccurrencesMap.put(className, 1);
                }
                metricsConsumerBolts.put(id, metricsConsumerBolt);
            }
        }
        return metricsConsumerBolts;
    }
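  • When StormCommon builds the system topology in systemTopologyImpl, it adds several system components. Among those steps are the calls to addMetricComponents and addMetricStreams.
  • addMetricComponents creates one MetricsConsumerBolt per entry registered in the conf. Each bolt subscribes, with shuffle grouping, to Constants.METRICS_STREAM_ID of every component (including the system component). If the same consumer class is registered more than once, the later bolt IDs get a "#N" suffix.
  • addMetricStreams declares an output stream with ID Constants.METRICS_STREAM_ID on every component, with the output fields Arrays.asList("task-info", "data-points").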
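
The ID assignment for repeated consumer classes is easy to misread in the listing, so here is a small standalone sketch of just that rule. The class MetricsBoltIdSketch and the method componentIds are hypothetical; the prefix is passed in as a parameter because the actual value of Constants.METRICS_COMPONENT_ID_PREFIX is not shown in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MetricsBoltIdSketch {
    // First occurrence of a class: prefix + className.
    // N-th occurrence (N >= 2): prefix + className + "#" + N.
    static List<String> componentIds(String prefix, List<String> classNames) {
        Map<String, Integer> occurrences = new HashMap<>();
        List<String> ids = new ArrayList<>();
        for (String className : classNames) {
            int n = occurrences.merge(className, 1, Integer::sum);
            ids.add(n == 1 ? prefix + className : prefix + className + "#" + n);
        }
        return ids;
    }

    public static void main(String[] args) {
        // "PREFIX_" is a placeholder, not the real constant value.
        System.out.println(componentIds("PREFIX_", Arrays.asList("a", "b", "a")));
    }
}
```

This matches the comment in the source: registering ["a", "b", "a"] yields IDs for "a", "b" and "a#2", each with the prefix in front.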

Executor.setupMetrics

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;

    protected void setupMetrics() {
        for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
            StormTimer timerTask = workerData.getUserTimer();
            timerTask.scheduleRecurring(interval, interval,
                                        () -> {
                                            TupleImpl tuple =
                                                new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
                                                              (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
                                            AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                                            try {
                                                receiveQueue.publish(metricsTickTuple);
                                                receiveQueue.flush();  // avoid buffering
                                            } catch (InterruptedException e) {
                                                LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
                                                Thread.currentThread().interrupt();
                                                return;
                                            }
                                        }
            );
        }
    }

    public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
        return intervalToTaskToMetricToRegistry;
    }
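  • In setupMetrics, the Executor schedules a recurring timer task per interval. Each run builds a metricsTickTuple on METRICS_TICK_STREAM_ID, addresses it to BROADCAST_DEST, and publishes it to the executor's own receiveQueue.
  • The timers are driven by intervalToTaskToMetricToRegistry, whose keys are the intervals (in seconds).
  • intervalToTaskToMetricToRegistry is initialized in the Executor constructor: intervalToTaskToMetricToRegistry = new HashMap<>()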

Task.mkTopologyContext

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

    private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
        Map<String, Object> conf = workerData.getConf();
        return new TopologyContext(
            topology,
            workerData.getTopologyConf(),
            workerData.getTaskToComponent(),
            workerData.getComponentToSortedTasks(),
            workerData.getComponentToStreamToFields(),
            // This is updated by the Worker and the topology has shared access to it
            workerData.getBlobToLastKnownVersion(),
            workerData.getTopologyId(),
            ConfigUtils.supervisorStormResourcesPath(
                ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
            ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
            taskId,
            workerData.getPort(), workerData.getLocalTaskIds(),
            workerData.getDefaultSharedResources(),
            workerData.getUserSharedResources(),
            executor.getSharedExecutorData(),
            executor.getIntervalToTaskToMetricToRegistry(),
            executor.getOpenOrPrepareWasCalled());
    }
  • When mkTopologyContext creates the TopologyContext, it passes in executor.getIntervalToTaskToMetricToRegistry(), so the context and the executor share the same registry map.

TopologyContext

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java

public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<>();
    private List<ITaskHook> _hooks = new ArrayList<>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;

    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        if (_openOrPrepareWasCalled.get()) {
            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                       "IBolt::prepare() or ISpout::open() method.");
        }

        if (metric == null) {
            throw new IllegalArgumentException("Cannot register a null metric");
        }

        if (timeBucketSizeInSecs <= 0) {
            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
                                               "greater than or equal to 1 second.");
        }

        if (getRegisteredMetricByName(name) != null) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }

        Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics;
        if (!m1.containsKey(timeBucketSizeInSecs)) {
            m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>());
        }

        Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
        if (!m2.containsKey(_taskId)) {
            m2.put(_taskId, new HashMap<String, IMetric>());
        }

        Map<String, IMetric> m3 = m2.get(_taskId);
        if (m3.containsKey(name)) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        } else {
            m3.put(name, metric);
        }

        return metric;
    }

    //......
}
  • The Executor's intervalToTaskToMetricToRegistry ends up as the _registeredMetrics field of TopologyContext.
  • registerMetric adds entries to _registeredMetrics, keyed first by timeBucketSizeInSecs, then by task ID, then by metric name.
  • For the built-in metrics, timeBucketSizeInSecs is read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60 in defaults.yaml. The Executor therefore emits a metricsTickTuple for them every 60 seconds, on the stream Constants.METRICS_TICK_STREAM_ID.
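
The three-level registry and its relation to the tick timers can be sketched without Storm. The code below is a simplified model under stated assumptions: MetricRegistrySketch is a hypothetical class, metrics are plain Objects, and the duplicate check is reduced to "same name for the same task in any interval". It is not TopologyContext's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

final class MetricRegistrySketch {
    // interval (secs) -> task id -> metric name -> metric
    private final Map<Integer, Map<Integer, Map<String, Object>>> registry = new HashMap<>();

    void register(int intervalSecs, int taskId, String name, Object metric) {
        if (metric == null) {
            throw new IllegalArgumentException("Cannot register a null metric");
        }
        if (intervalSecs <= 0) {
            throw new IllegalArgumentException("interval must be at least 1 second");
        }
        // Reject a name already registered for this task under any interval.
        for (Map<Integer, Map<String, Object>> byTask : registry.values()) {
            Map<String, Object> byName = byTask.get(taskId);
            if (byName != null && byName.containsKey(name)) {
                throw new IllegalStateException("Metric `" + name + "` was registered twice.");
            }
        }
        registry.computeIfAbsent(intervalSecs, k -> new HashMap<>())
                .computeIfAbsent(taskId, k -> new HashMap<>())
                .put(name, metric);
    }

    // The distinct intervals; one recurring tick timer would be scheduled per entry.
    SortedSet<Integer> intervals() {
        return new TreeSet<>(registry.keySet());
    }

    public static void main(String[] args) {
        MetricRegistrySketch r = new MetricRegistrySketch();
        r.register(60, 1, "builtin", new Object());
        r.register(10, 1, "custom", new Object());
        System.out.println(r.intervals());
    }
}
```

The point of the outer key is that every distinct bucket size results in its own timer: registering a custom metric with a 10-second bucket adds a second tick stream alongside the 60-second built-in one.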

Executor.metricsTick

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    public void metricsTick(Task task, TupleImpl tuple) {
        try {
            Integer interval = tuple.getInteger(0);
            int taskId = task.getTaskId();
            Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
            Map<String, IMetric> nameToRegistry = null;
            if (taskToMetricToRegistry != null) {
                nameToRegistry = taskToMetricToRegistry.get(taskId);
            }
            if (nameToRegistry != null) {
                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
                    hostname, workerTopologyContext.getThisWorkerPort(),
                    componentId, taskId, Time.currentTimeSecs(), interval);
                List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
                for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                    IMetric metric = entry.getValue();
                    Object value = metric.getValueAndReset();
                    if (value != null) {
                        IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
                        dataPoints.add(dataPoint);
                    }
                }
                if (!dataPoints.isEmpty()) {
                    task.sendUnanchored(Constants.METRICS_STREAM_ID,
                                        new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
                    executorTransfer.flush();
                }
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
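  • When SpoutExecutor or BoltExecutor receives, in tupleActionFn, a tuple whose stream ID is Constants.METRICS_TICK_STREAM_ID, it calls the inherited Executor.metricsTick method.
  • metricsTick emits with task.sendUnanchored(Constants.METRICS_STREAM_ID, new Values(taskInfo, dataPoints), executorTransfer, pendingEmits). The tuple goes to Constants.METRICS_STREAM_ID and carries taskInfo and dataPoints. The data points are read from the registry shared with TopologyContext's _registeredMetrics by calling getValueAndReset on each metric; null values are skipped, and nothing is emitted if no data point remains. This is the original metrics API, not metrics V2.
  • When MetricsConsumerBolt receives the tuple it puts the data into _taskQueue. Meanwhile the MetricsHandlerRunnable thread blocks on _taskQueue, takes each task, and calls _metricsConsumer.handleDataPoints to consume it.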
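
The collection step inside metricsTick follows a read-and-reset pattern. Here is a minimal sketch of that pattern in plain Java. The Metric interface below only imitates the single method of Storm's IMetric that the listing uses, and MetricsTickSketch, Counter and collect are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MetricsTickSketch {
    // Stand-in for the one IMetric method used by metricsTick.
    interface Metric {
        Object getValueAndReset();
    }

    // A simple counter that reports its count and starts again from zero.
    static final class Counter implements Metric {
        private long count;

        void incr() {
            count++;
        }

        @Override
        public Object getValueAndReset() {
            long value = count;
            count = 0;
            return value;
        }
    }

    // Reads and resets every metric; null values produce no data point.
    // An empty result corresponds to the case where no tuple is emitted.
    static Map<String, Object> collect(Map<String, Metric> nameToMetric) {
        Map<String, Object> dataPoints = new LinkedHashMap<>();
        for (Map.Entry<String, Metric> entry : nameToMetric.entrySet()) {
            Object value = entry.getValue().getValueAndReset();
            if (value != null) {
                dataPoints.put(entry.getKey(), value);
            }
        }
        return dataPoints;
    }

    public static void main(String[] args) {
        Counter hits = new Counter();
        hits.incr();
        hits.incr();
        Map<String, Metric> metrics = new LinkedHashMap<>();
        metrics.put("hits", hits);
        metrics.put("idle", () -> null); // reports nothing this interval
        System.out.println(collect(metrics));
        System.out.println(collect(metrics));
    }
}
```

Because each tick resets the metric, every logged value describes one interval only, not a running total.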

Summary

  • LoggingMetricsConsumer is part of Storm's original metrics API (org.apache.storm.metric); metrics2 has no equivalent. Nimbus and the supervisor use -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, and workers use -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml. The -Dlogfile.name values are nimbus.log, supervisor.log and worker.log respectively.
  • When Storm builds a topology it adds system components, including the MetricsConsumerBolt instances and the metric streams. The Executor calls setupMetrics during init and emits a metricsTickTuple on a timer. When SpoutExecutor or BoltExecutor receives that tuple in tupleActionFn, it calls metricsTick, which produces the data and emits it to Constants.METRICS_STREAM_ID. MetricsConsumerBolt then receives it and calls _metricsConsumer.handleDataPoints to consume it.
  • Two parameters deserve attention. The first is max.retain.metric.tuples, set under topology.metrics.consumer.register and defaulting to 100. It is the capacity of _taskQueue in MetricsConsumerBolt, and a value of 0 (or less) makes the queue unbounded. The second is the interval of the built-in metrics, read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60, meaning a metricsTickTuple is emitted every 60 seconds.
