前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊storm的messageTimeout

聊聊storm的messageTimeout

作者头像
code4it
修改2018-12-06 15:15:49
5530
修改2018-12-06 15:15:49
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下storm的messageTimeout

TOPOLOGY_MESSAGE_TIMEOUT_SECS

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

代码语言:javascript
复制
    /**
     * True if Storm should timeout messages or not. Defaults to true. This is meant to be used in unit tests to prevent tuples from being
     * accidentally timed out during the test.
     */
    @isBoolean
    public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";

    /**
     * The maximum amount of time given to the topology to fully process a message emitted by a spout. If the message is not acked within
     * this time frame, Storm will fail the message on the spout. Some spouts implementations will then replay the message at a later time.
     */
    @isInteger
    @isPositiveNumber
    @NotNull
    public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

    /**
     * How often a tick tuple from the "__system" component and "__tick" stream should be sent to tasks. Meant to be used as a
     * component-specific configuration.
     */
    @isInteger
    public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";
  • defaults.yaml中topology.enable.message.timeouts默认为true
  • defaults.yaml中topology.message.timeout.secs默认为30
  • defaults.yaml中topology.tick.tuple.freq.secs默认为null,实际是取的topology.message.timeout.secs的值

StormCommon.addAcker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

代码语言:javascript
复制
    public static void addAcker(Map<String, Object> conf, StormTopology topology) {
        int ackerNum =
            ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);

        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
        outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));

        Map<String, Object> ackerConf = new HashMap<>();
        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));

        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);

        for (Bolt bolt : topology.get_bolts().values()) {
            ComponentCommon common = bolt.get_common();
            common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
            common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
            common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        }

        for (SpoutSpec spout : topology.get_spouts().values()) {
            ComponentCommon common = spout.get_common();
            Map<String, Object> spoutConf = componentConf(spout);
            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                          ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
            common.set_json_conf(JSONValue.toJSONString(spoutConf));
            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
                                  Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
        }

        topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
    }
  • storm在addAcker的时候,使用了Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS的值作为Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS

Executor.setupTicks

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

代码语言:javascript
复制
    protected void setupTicks(boolean isSpout) {
        final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
        if (tickTimeSecs != null) {
            boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
            if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
                || (!enableMessageTimeout && isSpout)) {
                LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
            } else {
                StormTimer timerTask = workerData.getUserTimer();
                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
                                            () -> {
                                                TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
                                                                                Constants.SYSTEM_COMPONENT_ID,
                                                                                (int) Constants.SYSTEM_TASK_ID,
                                                                                Constants.SYSTEM_TICK_STREAM_ID);
                                                AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                                                try {
                                                    receiveQueue.publish(tickTuple);
                                                    receiveQueue.flush(); // avoid buffering
                                                } catch (InterruptedException e) {
                                                    LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
                                                    Thread.currentThread().interrupt();
                                                    return;
                                                }
                                            }
                );
            }
        }
    }
  • Executor在setupTicks的时候,使用了Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS作为tickTimeSecs,即tickTuple的调度时间间隔
  • 调度tickTuple的前提之一是有开启Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS
  • 该定时任务每隔tickTimeSecs发射一个tickTuple,该tuple的srcComponent设置为Constants.SYSTEM_COMPONENT_ID(__system),taskId设置为Constants.SYSTEM_TASK_ID(-1),streamId设置为Constants.SYSTEM_TICK_STREAM_ID(__tick)

SpoutExecutor.tupleActionFn

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

代码语言:javascript
复制
    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }
  • SpoutExecutor在tupleActionFn方法接收到Constants.SYSTEM_TICK_STREAM_ID的tickTuple的时候,触发pending.rotate()方法

RotatingMap.rotate

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java

代码语言:javascript
复制
    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if (_callback != null) {
            for (Entry<K, V> entry : dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }
  • rotate方法会触发expire

SpoutExecutor.RotatingMap.ExpiredCallback

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

代码语言:javascript
复制
    public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
        this.threadId = Thread.currentThread().getId();
        executorTransfer.initLocalRecvQueues();
        while (!stormActive.get()) {
            Utils.sleep(100);
        }

        LOG.info("Opening spout {}:{}", componentId, taskIds);
        this.idToTask = idToTask;
        this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
        this.spouts = new ArrayList<>();
        for (Task task : idToTask) {
            if (task != null) {
                this.spouts.add((ISpout) task.getTaskObject());
            }
        }
        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
            @Override
            public void expire(Long key, TupleInfo tupleInfo) {
                Long timeDelta = null;
                if (tupleInfo.getTimestamp() != 0) {
                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
                }
                failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId() - idToTaskBase), timeDelta, tupleInfo, "TIMEOUT");
            }
        });
        //......
    }
  • SpoutExecutor在init的时候注册了pending的RotatingMap.ExpiredCallback,里头对过期的tuple调用failSpoutMsg

小结

  • spout的messageTimeout相关的参数为Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS(topology.enable.message.timeouts默认true)、Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS(topology.message.timeout.secs默认30)
  • StormCommon在addAcker的时候取Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS作为Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS值,作为tickTimeSecs,即ack的tickTuple的调度时间间隔
  • SpoutExecutor在接收到该tickTuple的时候,触发RotatingMap的rotate操作,进行expire回调,而SpoutExecutor在init的时候,注册了RotatingMap.ExpiredCallback,对过期的tuple进行failSpoutMsg操作,回调spout的fail方法,至此完成messageTimeout的功能(对于没有开启可靠(msgId)消息的spout,其pending队列为空,因而这里的rotate以及ExpiredCallback就相当于没有效果)

doc

  • 聊聊storm的ack机制
  • 聊聊storm的tickTuple
  • Guaranteeing Message Processing
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-10-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TOPOLOGY_MESSAGE_TIMEOUT_SECS
  • StormCommon.addAcker
  • Executor.setupTicks
  • SpoutExecutor.tupleActionFn
  • RotatingMap.rotate
  • SpoutExecutor.RotatingMap.ExpiredCallback
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档