storm消息机制

这章讨论Storm's reliability capabilities, 如何保证从spout emit出来的所有tuple都被正确的执行(fully processed)?

What does it mean for a message to be "fully processed"?

首先的问题是, 什么叫tuple或message被fully processed? 因为tuple被emit出去后, 可能会被多级bolt处理, 并且bolt也有可能由该tuple生成多组tuples, 所以情况还是比较复杂的      最终由一个tuple trigger(触发)的所有tuples会形成一个树或DAG(有向无环图)

只有当tuple tree上的所有节点都被成功处理的时候, storm才认为该tuple被fully processed      如果tuple tree上任一节点失败或者超时, 都被看作该tuple fail, 失败的tuple会被重发       Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed.      A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout.      This timeout can be configured on a topology-specific basis using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.

What happens if a message is fully processed or fails to be fully processed?

该机制是如何实现的?      首先, 所有tuple都有一个唯一标识msgId, 当tuple被emit的时候确定

_collector.emit(new Values("field1", "field2", 3) , msgId);

其次, 看看下面的ISpout接口, 除了获取tuple的nextTuple     还有ack和fail, 当Storm detect到tuple被fully processed, 会调用ack, 如果超时或detect fail, 则调用fail     此处需要注意的是, tuple只有在被产生的那个spout task上可以被ack或fail, 具体原因看后面的实现解释就理解了

a tuple will be acked or failed by the exact same Spout task that created it. So if a Spout is executing as many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

最后, 在spout怎么实现的, 其实比较简单.     对于Spout queue, get message只是open而不是pop, 并且把tuple状态改为pending, 防止该tuple被多次发送.     一直等到该tuple被ack, 才真正的pop该tuple, 当然该tuple如果fail, 就重新把状态改回初始状态     这也解释, 为什么tuple只能在被emit的spout task被ack或fail, 因为只有这个task的queue里面有该tuple

When KestrelSpout takes a message off the Kestrel queue, it "opens" the message.      This means the message is not actually taken off the queue yet, but instead placed in a "pending" state waiting for acknowledgement that the message is completed.      While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects all pending messages for that client are put back on the queue. 

What is Storm's reliability API?

前面一直没有说明的一个问题是, storm本身通过什么机制来判断tuple是否成功被fully processed?

要解决这个问题, 可以分为两个问题,     1. 如何知道tuple tree的结构?     2. 如何知道tuple tree上每个节点的运行情况, success或fail?

答案很简单, 你必须告诉它, 如何告诉它?     1. 对于tuple tree的结构, 需要知道每个tuple由哪些tuple产生, 即tree节点间的link        tree节点间的link称为anchoring. 当每次emit新tuple的时候, 必须显式的通过API建立anchoring

Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.     Each word tuple is anchored by specifying the input tuple as the first argument to emit.

看下面的代码例子, 

_collector.emit(tuple, new Values(word)); 

emit的第一个参数是tuple, 这就是用于建anchoring     当然你也可以直接调用unanchoring的emit版本, 如果不需要保证reliable的话, 这样效率会比较高

_collector.emit(new Values(word));

同时前面说了, 可能一个tuple依赖于多个输入,

An output tuple can be anchored to more than one input tuple.     This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts.

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

对于Multi-anchoring的情况会导致tuple tree变为tuple DGA, 当前storm的版本已经可以很好的支持DAG Multi-anchoring adds the output tuple into multiple tuple trees.    Note that it's also possible for multi-anchoring to break the tree structure and create tuple DAGs,

2. 对于tuple tree上每个节点的运行情况, 你需要在每个bolt的逻辑处理完后, 显式的调用OutputCollector的ack和fail来汇报  

This is done by using the ack and fail methods on the OutputCollector.       You can use the fail method on the OutputCollector to immediately fail the spout tuple at the root of the tuple tree.

看下面的例子, 在execute函数的最后会调用,      _collector.ack(tuple); 

我比较迷惑, 为啥ack是OutputCollector的function, 而不是tuple的function?     而且就算ack也是应该对bolt的input进行ack, 为啥是output, 可能因为所有input都是其他bolt的output产生...这个设计的比较不合理

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;
        
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

storm为了保证reliable, 必然是要牺牲效率的, 此处storm会在task memory里面去记录你汇报的tuple tree的结构和运行情况.      而只有当某tuple节点被ack或fail后才会被从内存中删除, 所以如果你总是不去ack或fail, 那么会导致task的out of memory

Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.

简单的版本, BasicBolt

上面的机制, 会给程序员造成负担, 尤其对于很多简单的case, 比如filter, 每次都要去显式的建立anchoring和ack…

所以storm提供简单的版本, 会自动的建立anchoring, 并在bolt执行完自动调用ack

A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called BasicBolt that encapsulates this pattern for you.

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

How do I make my applications work correctly given that tuples can be replayed?

问题是如何保证"fully fault-tolerant exactly-once messaging semantics”, 因为replay会导致一个message在bolt上多次出现, 这样对类似计数这样的应用会有很大影响.     从Storm0.7开始, 给出的transactional topologies功能就比较好的解决这个问题

As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here

How does Storm implement reliability in an efficient way?

现在讨论的是Storm如何实现reliablility机制, Storm实现一组特殊的'acker’ task来track每一个spout tuple, 同时acker task的个数你可以根据tuple的数量来配置

A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple.    When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message.    You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages. 

所有被产生的tuple都会有一个随机的64bit的id用于被track     tuple之间通过emit时的anchor形成tuple tree, 并且每个tuple都知道产生它的spout tuple的id (通过不断的copy传递)

当任何tuple被acked的时候, 都会send message到相应的acker, 具体例子如下图

When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.

Every tuple knows the ids of all the spout tuples for which it exists in their tuple trees. When you emit a new tuple in a bolt, the spout tuple ids from the tuple's anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker "I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me".

For example, if tuples "D" and "E" were created based on tuple "C", here's how the tuple tree changes when "C" is acked:

当然storm具体怎样通过acker task来track所有的tuples, 还需要解决下面几个问题:

1. 当有多个acker的时候, 当一个tuple被acked的时候, 如果知道给哪一个acker发送message?     因为每个tuple都知道产生它的spout tuple id, 所以使用mod hash(hash方法, m mod n)来分配spout tuple id, 以保证一个spout tuple id所产生的所有tuple tree都会被分配到一个acker上     当某一个tuple被acked的时候, 只要通过hash找到相应的acker即可

You can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information? Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with. 

2. 如果有多个spout task的时候, storm在最终ack spout tuple的时候, 如何知道对应于哪个spout task, 因为必须在产生tuple的那个spout task进行ack?     答案很简单, spout task在emit一个新的tuple的时候, 会发message告诉相应的acker它的task id, 所以acker是知道tupleid和taskid的map的

How the acker tasks track which spout tasks are responsible for each spout tuple they're tracking?

When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.

3. 如果Acker在内存里面显式的监控所有的tuple tree, 会有扩展问题, 当面对海量tuple或复杂workflow的时候, 很有可能会爆内存, 怎么解决这个问题?     Storm这里采用了一个特别的方法, 这个是storm的主要的突破之一, 该方法的好处就是对于每个spout tuple, 所需要的内存是固定的无论多复杂, 并且只有about 20 bytes     Acker只需要为每个spout tuple存储spout tuple id, task id, ack val     这个ack val, 64 bit number, 用于表示整个tuple tree的状况, 产生方法是tuple tree中所有created和acked的tuple的id进行异或(同为0, 异为1)     当ack val值为0的时候, 即表示tuple tree被完成

这个思路非常巧妙, 两个相同的数去异或为0, 而created和acked时, 会进行两次异或, 所以所有created的tuple都被acked时, 异或值最终为0     我考虑到不同的tupleid之间的位有重叠时, 是否会有干扰, 简单的试一下, 没有干扰

具体acker工作原理参考, Twitter Storm源代码分析之acker工作流程

Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs. An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree. When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed.

最后, 考虑task fail的情况,      一般task fail, 导致超时, spout会replay     Acker task fail, 会导致它跟踪的所有tuple无法被ack, 所以会全部超时被spout重发     Spout task fail, 如果spout本身fail, 那么需要源头来负责replay, 比如RabbitMQ或Kafka

Now that you understand the reliability algorithm, let's go over all the failure cases and see how in each case Storm avoids data loss:

  • Task dies: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed. 
  • Acker task dies: In this case all the spout tuples the acker was tracking will time out and be replayed. 
  • Spout task dies: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects. 

As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant. 

Tuning reliability

当然reliability必然会给系统带来较大的overload, 比如number of messages就会翻倍, 由于和acker之间的通信     所以如果不需要reliability, 可以通过下面的方法将其关闭

Acker tasks are lightweight, so you don't need very many of them in a topology. You can track their performance through the Storm UI (component id "__acker"). If the throughput doesn't look right, you'll need to add more acker tasks. 

If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires less ids to be kept in each downstream tuple, reducing bandwidth usage.

There are three ways to remove reliability. 

1. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.

2. The second way is to omit a message id in the SpoutOutputCollector.emit method.

3. Finally, emit them as unanchored tuples

【淘宝讲解】

  • Storm记录级容错的基本原理

首先来看一下什么叫做记录级容错?storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级容错的意思是说,storm会告知用户每一个消息单元是否在指定时间内被完全处理了。那什么叫做完全处理呢,就是该message id绑定的源tuple及由该源tuple后续生成的tuple经过了topology中每一个应该到达的bolt的处理。举个例子。在图4-1中,在spout由message 1绑定的tuple1和tuple2经过了bolt1和bolt2的处理生成两个新的tuple,并最终都流向了bolt3。当这个过程完成处理完时,称message 1被完全处理了。

图4-1

在storm的topology中有一个系统级组件,叫做acker。这个acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径,如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消息处理失败了,相反则会告知spout该消息处理成功了。在刚才的描述中,我们提到了”记录tuple的处理路径”,如果曾经尝试过这么做的同学可以仔细地思考一下这件事的复杂程度。但是storm中却是使用了一种非常巧妙的方法做到了。在说明这个方法之前,我们来复习一个数学定理。

A xor A = 0.

A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次。

storm中使用的巧妙方法就是基于这个定理。具体过程是这样的:在spout中系统会为用户指定的message id生成一个对应的64位整数,作为一个root id。root id会传递给acker及后续的bolt作为该消息单元的唯一标识。同时无论是spout还是bolt每次新生成一个tuple的时候,都会赋予该tuple一个64位的整数的id。Spout发射完某个message id对应的源tuple之后,会告知acker自己发射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一个输入tuple处理完之后,也会告知acker自己处理的输入tuple的id及新生成的那些tuple的id。Acker只需要对这些id做一个简单的异或运算,就能判断出该root id对应的消息单元是否处理完成了。下面通过一个图示来说明这个过程。

图4-1 spout中绑定message 1生成了两个源tuple,id分别是0010和1011.

图4-2 bolt1处理tuple 0010时生成了一个新的tuple,id为0110.

图4-3 bolt2处理tuple 1011时生成了一个新的tuple,id为0111.

图4-4 bolt3中接收到tuple 0110和tuple 0111,没有生成新的tuple.

可能有些细心的同学会发现,容错过程存在一个可能出错的地方,那就是,如果生成的tuple id并不是完全各异的,acker可能会在消息单元完全处理完成之前就错误的计算为0。这个错误在理论上的确是存在的,但是在实际中其概率是极低极低的,完全可以忽略。

【acker数量】

针对源码进行分析。

下面再看一篇文章的讲解消息机制

一、简介

storm可以确保spout发送出来的每个消息都会被完整的处理。本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理。

二、理解消息被完整处理

一个消息(tuple)从spout发送出来,可能会导致成百上千的消息基于此消息被创建。

我们来思考一下流式的“单词统计”的例子:

storm任务从数据源(Kestrel queue)每次读取一个完整的英文句子;将这个句子分解为独立的单词,最后,实时的输出每个单词以及它出现过的次数。

本例中,每个从spout发送出来的消息(每个英文句子)都会触发很多的消息被创建,那些从句子中分隔出来的单词就是被创建出来的新消息。

这些消息构成一个树状结构,我们称之为“tuple tree”,看起来如图1所示:

图1 示例tuple tree

在什么条件下,Storm才会认为一个从spout发送出来的消息被完整处理呢?答案就是下面的条件同时被满足:

  • tuple tree不再生长
  • 树中的任何消息被标识为“已处理”

如果在指定的时间内,一个消息衍生出来的tuple tree未被完全处理成功,则认为此消息未被完整处理。这个超时值可以通过任务级参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 进行配置,默认超时值为30秒。

三、消息的生命周期

如果消息被完整处理或者未被完整处理,Storm会如何进行接下来的操作呢?为了弄清这个问题,我们来研究一下从spout发出来的消息的生命周期。这里列出了spout应该实现的接口:

首先, Storm使用spout实例的nextTuple()方法从spout请求一个消息(tuple)。 收到请求以后,spout使用open方法中提供的SpoutOutputCollector向它的输出流发送一个或多个消息。每发送一个消息,Spout会给这个消息提供一个message ID,它将会被用来标识这个消息。

假设我们从kestrel队列中读取消息,Spout会将kestrel 队列为这个消息设置的ID作为此消息的message ID。 向SpoutOutputCollector中发送消息格式如下:

接来下,这些消息会被发送到后续业务处理的bolts, 并且Storm会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的tuple tree被完整处理后,Storm会调用Spout中的ack方法,并将此消息的messageID作为参数传入。同理,如果某消息处理超时,则此消息对应的Spout的fail方法会被调用,调用时此消息的messageID会被作为参数传入。

注意:一个消息只会由发送它的那个spout任务来调用ack或fail。如果系统中某个spout由多个任务运行,消息也只会由创建它的spout任务来应答(ack或fail),决不会由其他的spout任务来应答。

我们继续使用从kestrel队列中读取消息的例子来阐述高可靠性下spout需要做些什么(假设这个spout的名字是KestrelSpout)。

我们先简述一下kestrel消息队列:

当KestrelSpout从kestrel队列中读取一个消息,表示它“打开”了队列中某个消息。这意味着,此消息并未从队列中真正的删除,而是将此消息设置为“pending”状态,它等待来自客户端的应答,被应答以后,此消息才会被真正的从队列中删除。处于“pending”状态的消息不会被其他的客户端看到。另外,如果一个客户端意外的断开连接,则由此客户端“打开”的所有消息都会被重新加入到队列中。当消息被“打开”的时候,kestrel队列同时会为这个消息提供一个唯一的标识。

KestrelSpout就是使用这个唯一的标识作为这个tuple的messageID的。稍后当ack或fail被调用的时候,KestrelSpout会把ack或者fail连同messageID一起发送给kestrel队列,kestrel会将消息从队列中真正删除或者将它重新放回队列中。

四、可靠相关的API

为了使用Storm提供的可靠处理特性,我们需要做两件事情:

  • 无论何时在tuple tree中创建了一个新的节点,我们需要明确的通知Storm;
  • 当处理完一个单独的消息时,我们需要告诉Storm 这棵tuple tree的变化状态。

通过上面的两步,storm就可以检测到一个tuple tree何时被完全处理了,并且会调用相关的ack或fail方法。Storm提供了简单明了的方法来完成上述两步。

为tuple tree中指定的节点增加一个新的节点,我们称之为锚定(anchoring)。锚定是在我们发送消息的同时进行的。为了更容易说明问题,我们使用下面代码作为例子。本示例的bolt将包含整句话的消息分解为一系列的子消息,每个子消息包含一个单词。

每个消息都通过这种方式被锚定:把输入消息作为emit方法的第一个参数。因为word消息被锚定在了输入消息上,这个输入消息是spout发送过来的tuple tree的根节点,如果任意一个word消息处理失败,派生这个tuple tree那个spout 消息将会被重新发送。

与此相反,我们来看看使用下面的方式emit消息时,Storm会如何处理:

如果以这种方式发送消息,将会导致这个消息不会被锚定。如果此tuple tree中的消息处理失败,派生此tuple tree的根消息不会被重新发送。根据任务的容错级别,有时候很适合发送一个非锚定的消息。

一个输出消息可以被锚定在一个或者多个输入消息上,这在做join或聚合的时候是很有用的。一个被多重锚定的消息处理失败,会导致与之关联的多个spout消息被重新发送。多重锚定通过在emit方法中指定多个输入消息来实现:

多重锚定会将被锚定的消息加到多棵tuple tree上。

注意:多重绑定可能会破坏传统的树形结构,从而构成一个DAGs(有向无环图),如图2所示:

图2 多重锚定构成的钻石型结构

Storm的实现可以像处理树那样来处理DAGs。

锚定表明了如何将一个消息加入到指定的tuple tree中,高可靠处理API的接下来部分将向您描述当处理完tuple tree中一个单独的消息时我们该做些什么。这些是通过OutputCollector 的ack和fail方法来实现的。回头看一下例子SplitSentence,可以发现当所有的word消息被发送完成后,输入的表示句子的消息会被应答(acked)。

每个被处理的消息必须表明成功或失败(acked 或者failed)。Storm是使用内存来跟踪每个消息的处理情况的,如果被处理的消息没有应答的话,迟早内存会被耗尽!

很多bolt遵循特定的处理流程: 读取一个消息、发送它派生出来的子消息、在execute结尾处应答此消息。一般的过滤器(filter)或者是简单的处理功能都是这类的应用。Storm有一个BasicBolt接口封装了上述的流程。示例SplitSentence可以使用BasicBolt来重写:

使用这种方式,代码比之前稍微简单了一些,但是实现的功能是一样的。发送到BasicOutputCollector的消息会被自动的锚定到输入消息,并且,当execute执行完毕的时候,会自动的应答输入消息。

很多情况下,一个消息需要延迟应答,例如聚合或者是join。只有根据一组输入消息得到一个结果之后,才会应答之前所有的输入消息。并且聚合和join大部分时候对输出消息都是多重锚定。然而,这些特性不是IBasicBolt所能处理的。

五、高效的实现tuple tree

Storm 系统中有一组叫做“acker”的特殊的任务,它们负责跟踪DAG(有向无环图)中的每个消息。每当发现一个DAG被完全处理,它就向创建这个根消息的spout任务发送一个信号。拓扑中acker任务的并行度可以通过配置参数Config.TOPOLOGY_ACKERS来设置。默认的acker任务并行度为1,当系统中有大量的消息时,应该适当提高acker任务的并发度。

为了理解Storm可靠性处理机制,我们从研究一个消息的生命周期和tuple tree的管理入手。当一个消息被创建的时候(无论是在spout还是bolt中),系统都为该消息分配一个64bit的随机值作为id。这些随机的id是acker用来跟踪由spout消息派生出来的tuple tree的。

每个消息都知道它所在的tuple tree对应的根消息的id。每当bolt新生成一个消息,对应tuple tree中的根消息的messageId就拷贝到这个消息中。当这个消息被应答的时候,它就把关于tuple tree变化的信息发送给跟踪这棵树的acker。例如,他会告诉acker:本消息已经处理完毕,但是我派生出了一些新的消息,帮忙跟踪一下吧。

举个例子,假设消息D和E是由消息C派生出来的,这里演示了消息C被应答时,tuple tree是如何变化的。

因为在C被从树中移除的同时D和E会被加入到tuple tree中,因此tuple tree不会被过早的认为已完全处理。

关于Storm如何跟踪tuple tree,我们再深入的探讨一下。前面说过系统中可以有任意个数的acker,那么,每当一个消息被创建或应答的时候,它怎么知道应该通知哪个acker呢?

系统使用一种哈希算法来根据spout消息的messageId确定由哪个acker跟踪此消息派生出来的tuple tree。因为每个消息都知道与之对应的根消息的messageId,因此它知道应该与哪个acker通信。

当spout发送一个消息的时候,它就通知对应的acker一个新的根消息产生了,这时acker就会创建一个新的tuple tree。当acker发现这棵树被完全处理之后,他就会通知对应的spout任务。

tuple是如何被跟踪的呢?系统中有成千上万的消息,如果为每个spout发送的消息都构建一棵树的话,很快内存就会耗尽。所以,必须采用不同的策略来跟踪每个消息。由于使用了新的跟踪算法,Storm只需要固定的内存(大约20字节)就可以跟踪一棵树。这个算法是storm正确运行的核心,也是storm最大的突破。

acker任务保存了spout消息id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知道消息处理完成时该通知哪个spout任务。第二个值是一个64bit的数字,我们称之为“ack val”, 它是树中所有消息的随机id的异或结果。ack val表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息id发送过来做异或。

每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被完全处理了。因为消息的随机ID是一个64bit的值,因此ack val在树处理完之前被置为0的概率非常小。假设你每秒钟发送一万个消息,从概率上说,至少需要50,000,000年才会有机会发生一次错误。即使如此,也只有在这个消息确实处理失败的情况下才会有数据的丢失!

六、选择合适的可靠性级别

Acker任务是轻量级的,所以在拓扑中并不需要太多的acker存在。可以通过Storm UI来观察acker任务的吞吐量,如果看上去吞吐量不够的话,说明需要添加额外的acker。

如果你并不要求每个消息必须被处理(你允许在处理过程中丢失一些信息),那么可以关闭消息的可靠处理机制,从而可以获取较好的性能。关闭消息的可靠处理机制意味着系统中的消息数会减半(每个消息不需要应答了)。另外,关闭消息的可靠处理可以减少消息的大小(不需要每个tuple记录它的根id了),从而节省带宽。

有三种方法可以关系消息的可靠处理机制:

  • 将参数Config.TOPOLOGY_ACKERS设置为0,通过此方法,当Spout发送一个消息的时候,它的ack方法将立刻被调用;
  • 第二个方法是Spout发送一个消息时,不指定此消息的messageID。当需要关闭特定消息可靠性的时候,可以使用此方法;
  • 最后,如果你不在意某个消息派生出来的子孙消息的可靠性,则此消息派生出来的子消息在发送时不要做锚定,即在emit方法中不指定输入消息。因为这些子孙消息没有被锚定在任何tuple tree中,因此他们的失败不会引起任何spout重新发送消息。

七、集群的各级容错

到现在为止,大家已经理解了Storm的可靠性机制,并且知道了如何选择不同的可靠性级别来满足需求。接下来我们研究一下Storm如何保证在各种情况下确保数据不丢失。

1、任务级失败

因为bolt任务crash引起的消息未被应答。此时,acker中所有与此bolt任务关联的消息都会因为超时而失败,对应spout的fail方法将被调用。

  • acker任务失败。如果acker任务本身失败了,它在失败之前持有的所有消息都将会因为超时而失败。Spout的fail方法将被调用。
  • Spout任务失败。这种情况下,Spout任务对接的外部设备(如MQ)负责消息的完整性。例如当客户端异常的情况下,kestrel队列会将处于pending状态的所有的消息重新放回到队列中。

2、任务槽(slot) 故障

  • worker失败。每个worker中包含数个bolt(或spout)任务。supervisor负责监控这些任务,当worker失败后,supervisor会尝试在本机重启它。
  • supervisor失败。supervisor是无状态的,因此supervisor的失败不会影响当前正在运行的任务,只要及时的将它重新启动即可。supervisor不是自举的,需要外部监控来及时重启。
  • nimbus失败。nimbus是无状态的,因此nimbus的失败不会影响当前正在运行的任务(nimbus失败时,无法提交新的任务),只要及时的将它重新启动即可。nimbus不是自举的,需要外部监控来及时重启。

3.、集群节点(机器)故障

  • storm集群中的节点故障。此时nimbus会将此机器上所有正在运行的任务转移到其他可用的机器上运行。
  • zookeeper集群中的节点故障。zookeeper保证少于半数的机器宕机仍可正常运行,及时修复故障机器即可。

八、小结

本章介绍了storm集群如何实现数据的可靠处理。借助于创新性的tuple tree跟踪技术,storm高效的通过数据的应答机制来保证数据不丢失。

storm集群中除nimbus外,没有单点存在,任何节点都可以出故障而保证数据不会丢失。nimbus被设计为无状态的,只要可以及时重启,就不会影响正在运行的任务。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏喔家ArchiSelf

消息队列在RTOS的应用

传说互联网应用有两大利器,一个是缓存,另一个就是消息队列。 一直相对消息队列做一下梳理,希望早日另有成文。 一叶知秋,实际上消息队列在嵌入式系统中同样有着广泛的...

703
来自专栏王小雷

Hive基础(1)

Hive基础(1) Hive的HQL(2) 1. Hive并不是分布式的,它独立于机器之外,类似于Hadoop的客户端。 2. 元数据和数据的区别,前者如表名、...

1987
来自专栏木可大大

漫谈文件系统

翻译成中文大致意思:文件系统主要是管理数据存储以及数据如何检索的,而数据存储在磁盘或内存中。上期我们聊过了漫谈虚拟内存,本期我们就重点介绍磁盘中的机械磁盘的组成...

49812

浅析Kafka的消费者和消费进度的案例研究

本文主要讨论Kafka组件中的消费者和其消费进度。我们将通过一个使用Scala语言实现的原型系统来学习。本文假设你知道Kafka的基本术语。

4610
来自专栏美图数据技术团队

Spark Streaming VS Flink

本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理...

711
来自专栏北京马哥教育

Redis 性能调优相关笔记

info 可以使用info [类别]输出指定类别内容 info命令输出的数据可分为10个类别,分别是: server clients # Clients co...

33710
来自专栏java达人

Kafka漫游记

我是一条消息,从我被生产者发布到topic的时候,我就清楚自己的使命:被消费者获取消费。但我一直很纳闷,把我直接推送给消费者不就行了,为什么一定要先推送到类似队...

2147
来自专栏公有云大数据平台弹性MapReduce

EMR之HBASE集群参数调优与压测

HBase 是Hadoop生态里重要一员。对HBase的调优,对节约成本,提升用户体验有重要意义。

1814
来自专栏架构师小秘圈

HBase极简教程

HBase 系统架构 HBase是Apache Hadoop的数据库,能够对大型数据提供随机、实时的读写访问。HBase的目标是存储并处理大型的数据。HBase...

4306
来自专栏郝阳的专栏

主从同步中的关键技术解析

主从同步的整体思路不外乎“数据镜像(image) + 流水(binlog)”,但是仔细考虑,会有一些值得思考的细节问题,看看你是否考虑过?

62011

扫码关注云+社区