Storm的作者是Nathan Marz,Nathan Marz在BackType公司工作的时候有了Storm的点子并独自一人实现了Storm。在2011年Twitter准备收购BackType之际,Nathan Marz为了提高Twitter对BackType的估值,在一篇博客里向外界介绍了Storm。Twitter对这项技术非常感兴趣,因此在Twitter收购BackType的时候Storm发挥了重大作用。后来Nathan Marz开源Storm时,也借着Twitter的品牌影响力而让Storm名声大震!
Storm的特点之一是可靠的消息处理机制,这个机制中最重要的一环是设计一个算法来跟踪Storm中处理的数据,确保Storm知道消息是否被完整的处理。他创造出的这个算法,极大的简化了系统的设计。Nathan Marz说这算法是他职业生涯中开发的最出色的算法之一,也说明了受过良好的计算机科学的教育是非常重要的。有趣的是发明这个算法的那天,正好是他和不久前遇到的一个姑娘约会的日子。当天因为发明了这个算法而非常兴奋,导致他心思一直在这个算法上,毫无疑问就搞砸了和这个姑娘的约会!
Storm官方网站有段简介
Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。
在Storm之前,进行实时处理是非常痛苦的事情: 需要维护一堆消息队列和消费者,他们构成了非常复杂的图结构。消费者进程从队列里取消息,处理完成后,去更新数据库,或者给其他队列发新消息。
这样进行实时处理是非常痛苦的。我们主要的时间都花在关注往哪里发消息,从哪里接收消息,消息如何序列化,真正的业务逻辑只占了源代码的一小部分。一个应用程序的逻辑运行在很多worker上,但这些worker需要各自单独部署,还需要部署消息队列。最大问题是系统很脆弱,而且不是容错的:需要自己保证消息队列和worker进程工作正常。
Storm完整地解决了这些问题。它是为分布式场景而生的,抽象了消息传递,会自动地在集群机器上并发地处理流式计算,让你专注于实时处理的业务逻辑。
Storm有如下特点:
不过Storm不是一个完整的解决方案。使用Storm时你需要关注以下几点:
跟Hadoop不一样,Storm是没有包括任何存储概念的计算系统。这就让Storm可以用在多种不同的场景下:非传统场景下数据动态到达或者数据存储在数据库这样的存储系统里(或数据是被实时操控其他设备的控制器(如交易系统)所消费)
Storm有很多应用:实时分析,在线机器学习(online machine learning),连续计算(continuous computation),分布式远程过程调用(RPC)、ETL等。Storm处理速度很快:每个节点每秒钟可以处理超过百万的数据组。它是可扩展(scalable),容错(fault-tolerant),保证你的数据会被处理,并且很容易搭建和操作。
例如Nathan Marz提供的例子,产生Twitter的趋势信息。Twitter从海量推文中抽取趋势信息,并在本地区域和国家层级进行维护。这意味者一旦一个案例开始出现,Twitter的话题趋势算法就能实时的鉴别出这个话题。这个实时的算法就是通过在Storm上连续分析Twitter数据来实现的。
下表列出了一组开源的大数据解决方案,包括传统的批处理和流式处理的应用程序。
解决方案 | 开发者 | 类型 | 描述 |
---|---|---|---|
Storm | 流式处理 | Twitter的流式处理大数据分析方案 | |
S4 | Yahoo! | 流式处理 | Yahoo!的分布式流式计算平台 |
Hadoop | Apache | 批处理 | MapReduce范式的第一个开源实现 |
Spark | UC Berkeley AMPLab | 批处理 | 支持内存数据集和弹性恢复的分析平台 |
Yahoo! S4和Storm之间的关键差别是Storm在故障的情况下可以保证消息的处理,而S4可能会丢消息。
Hadoop无疑是大数据分析的王者,本质上是一个批量处理系统,它专注于大数据的批量处理。数据存储在Hadoop 文件系统里(HDFS)并在处理的时候分发到集群中的各个节点。当处理完成,产出的数据放回到HDFS上。在Storm上构建的拓扑处理的是持续不断的流式数据。不同于Hadoop的任务,这些处理过程不会终止,会持续处理到达的数据。
Hadoop处理的是静态的数据,而Storm处理的是动态的、连续的数据。Twitter的用户每天都会发上千万的推,所以这种处理技术是非常有用的。Storm不仅仅是一个传统的大数据分析系统:它是一个复杂事件(complex event-processing)处理系统的例子。复杂事件处理系统通常是面向检测和计算的,这两部分都可以通过用户定义的算法在Storm中实现。例如,复杂事件处理可以用来从大量的事件中区分出有意义的事件,然后对这些事件实时处理。
Storm实现了一个数据流(data flow)的模型,在这个模型中数据持续不断地流经一个由很多转换实体构成的网络。一个数据流的抽象叫做流(stream),流是无限的元组(Tuple)序列。元组就像一个可以表示标准数据类型(例如int,float和byte数组)和用户自定义类型(需要额外序列化代码的)的数据结构。每个流由一个唯一的ID来标示的,这个ID可以用来构建拓扑中各个组件的数据源。
如下图所示,其中的水龙头代表了数据流的来源,一旦水龙头打开,数据就会源源不断地流经Bolt而被处理。图中有三个流,用不同的颜色来表示,每个数据流中流动的是元组(Tuple),它承载了具体的数据。元组通过流经不同的转换实体而被处理。
Storm对数据输入的来源和输出数据的去向没有做任何限制。像Hadoop,是需要把数据放到自己的文件系统HDFS里的。在Storm里,可以使用任意来源的数据输入和任意的数据输出,只要你实现对应的代码来获取/写入这些数据就可以。典型场景下,输入/输出数据来是基于类似Kafka或者ActiveMQ这样的消息队列,但是数据库,文件系统或者web服务也都是可以的。
Storm中涉及的主要概念有:
可以看到Storm中各个概念的名字起的非常好,也很形象。
一个Storm拓扑打包了一个实时处理程序的逻辑。一个Storm拓扑跟一个MapReduce的任务(job)是类似的。主要区别是MapReduce任务最终会结束,而拓扑会一直运行(当然直到你杀死它)。一个拓扑是一个通过流分组(stream grouping)把Spout和Bolt连接到一起的拓扑结构。图的每条边代表一个Bolt订阅了其他Spout或者Bolt的输出流。一个拓扑就是一个复杂的多阶段的流计算。
元组是Storm提供的一个轻量级的数据格式,可以用来包装你需要实际处理的数据。元组是一次消息传递的基本单元。一个元组是一个命名的值列表,其中的每个值都可以是任意类型的。元组是动态地进行类型转化的--字段的类型不需要事先声明。在Storm中编程时,就是在操作和转换由元组组成的流。通常,元组包含整数,字节,字符串,浮点数,布尔值和字节数组等类型。要想在元组中使用自定义类型,就需要实现自己的序列化方式。
流是Storm中的核心抽象。一个流由无限的元组序列组成,这些元组会被分布式并行地创建和处理。通过流中元组包含的字段名称来定义这个流。 每个流声明时都被赋予了一个ID。只有一个流的Spout和Bolt非常常见,所以OutputFieldsDeclarer提供了不需要指定ID来声明一个流的函数(Spout和Bolt都需要声明输出的流)。这种情况下,流的ID是默认的“default”。
Spout(喷嘴,这个名字很形象)是Storm中流的来源。通常Spout从外部数据源,如消息队列中读取元组数据并吐到拓扑里。Spout可以是可靠的(reliable)或者不可靠(unreliable)的。可靠的Spout能够在一个元组被Storm处理失败时重新进行处理,而非可靠的Spout只是吐数据到拓扑里,不关心处理成功还是失败了。
Spout可以一次给多个流吐数据。此时需要通过OutputFieldsDeclarer的declareStream函数来声明多个流并在调用SpoutOutputCollector提供的emit方法时指定元组吐给哪个流。
Spout中最主要的函数是nextTuple,Storm框架会不断调用它去做元组的轮询。如果没有新的元组过来,就直接返回,否则把新元组吐到拓扑里。nextTuple必须是非阻塞的,因为Storm在同一个线程里执行Spout的函数。
Spout中另外两个主要的函数是ack和fail。当Storm检测到一个从Spout吐出的元组在拓扑中成功处理完时调用ack,没有成功处理完时调用fail。只有可靠型的Spout会调用ack和fail函数。
在拓扑中所有的计算逻辑都是在Bolt中实现的。一个Bolt可以处理任意数量的输入流,产生任意数量新的输出流。Bolt可以做函数处理,过滤,流的合并,聚合,存储到数据库等操作。Bolt就是流水线上的一个处理单元,把数据的计算处理过程合理的拆分到多个Bolt、合理设置Bolt的task数量,能够提高Bolt的处理能力,提升流水线的并发度。
Bolt可以给多个流吐出元组数据。此时需要使用OutputFieldsDeclarer的declareStream方法来声明多个流并在使用[OutputColletor](https://storm.apache.org/javadoc/apidocs/backtype/storm/task/OutputCollector.html)的emit方法时指定给哪个流吐数据。
当你声明了一个Bolt的输入流,也就订阅了另外一个组件的某个特定的输出流。如果希望订阅另一个组件的所有流,需要单独挨个订阅。InputDeclarer有语法糖来订阅ID为默认值的流。例如declarer.shuffleGrouping("redBolt")订阅了redBolt组件上的默认流,跟declarer.shuffleGrouping("redBolt", DEFAULT_STREAM_ID)是相同的。
在Bolt中最主要的函数是execute函数,它使用一个新的元组当作输入。Bolt使用OutputCollector对象来吐出新的元组。Bolts必须为处理的每个元组调用OutputCollector的ack方法以便于Storm知道元组什么时候被各个Bolt处理完了(最终就可以确认Spout吐出的某个元组处理完了)。通常处理一个输入的元组时,会基于这个元组吐出零个或者多个元组,然后确认(ack)输入的元组处理完了,Storm提供了IBasicBolt接口来自动完成确认。
必须注意OutputCollector不是线程安全的,所以所有的吐数据(emit)、确认(ack)、通知失败(fail)必须发生在同一个线程里。更多信息可以参照问题定位。
每个Spout和Bolt会以多个任务(Task)的形式在集群上运行。每个任务对应一个执行线程,流分组定义了如何从一组任务(同一个Bolt)发送元组到另外一组任务(另外一个Bolt)上。可以在调用TopologyBuilder的setSpout和setBolt函数时设置每个Spout和Bolt的并发数。
组件(component)是对Bolt和Spout的统称
定义拓扑的时候,一部分工作是指定每个Bolt应该消费哪些流。流分组定义了一个流在一个消费它的Bolt内的多个任务(task)之间如何分组。流分组跟计算机网络中的路由功能是类似的,决定了每个元组在拓扑中的处理路线。
在Storm中有七个内置的流分组策略,你也可以通过实现CustomStreamGrouping接口来自定义一个流分组策略:
emitDirect
系列函数来吐元组给直连流。一个Bolt可以通过提供的TopologyContext
来获得消费者的任务ID,也可以通过OutputCollector对象的emit
函数(会返回元组被发送到的任务的ID)来跟踪消费者的任务ID。在ack的实现中,Spout有两个直连输入流,ack和ackFail,使用了这种直连分组的方式。TopologyBuilder
的setBolt
函数时会返回这个对象,它用来声明一个Bolt的输入流并指定流的分组方式。Storm保证了拓扑中Spout产生的每个元组都会被处理。Storm是通过跟踪每个Spout所产生的所有元组构成的树形结构并得知这棵树何时被完整地处理来达到可靠性。每个拓扑对这些树形结构都有一个关联的“消息超时”。如果在这个超时时间里Storm检测到Spout产生的一个元组没有被成功处理完,那Sput的这个元组就处理失败了,后续会重新处理一遍。
为了发挥Storm的可靠性,需要你在创建一个元组树中的一条边时告诉Storm,也需要在处理完每个元组之后告诉Storm。这些都是通过Bolt吐元组数据用的OutputCollector对象来完成的。标记是在emit函数里完成,完成一个元组后需要使用ack函数来告诉Storm。
拓扑以一个或多个Worker进程的方式运行。每个Worker进程是一个物理的Java虚拟机,执行拓扑的一部分任务。例如,如果拓扑的并发设置成了300,分配了50个Worker,那么每个Worker执行6个任务(作为Worker内部的线程)。Storm会尽量把所有的任务均分到所有的Worker上。
ZeroMQ 提供了可扩展环境下的传输层高效消息通信,一开始Storm的内部通信使用的是ZeroMQ,后来作者想把Storm移交给Apache开源基金会来管理,而ZeroMQ的许可证书跟Apache基金会的政策有冲突。在Storm中,Netty比ZeroMQ更加高效,而且提供了worker间通信时的验证机制,所以在Storm0.9中,就改用了Netty。
Clojure Storm系统的实现语言。Clojure是由Rich Hicky作为一种通用语言发明的,它衍生自Lisp语言,简化了多线程编程。
Apache ZooKeeper Zookeeper是一个实现高可靠的分布式协作的开源项目。Storm使用Zookeeper来协调集群中的多个节点。
supervisor会定时从zookeeper获取拓补信息topologies、任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。
在supervisor同步时,会根据新的任务分配情况来启动新的worker或者关闭旧的worker并进行负载均衡。
worker通过定期的更新connections信息,来获知其应该通讯的其它worker。
worker启动时,会根据其分配到的任务启动一个或多个executor线程。这些线程仅会处理唯一的topology。 如果有新的tolopogy被提交到集群,nimbus会重新分配任务,这个后面会说到。
executor线程负责处理多个spouts或者多个bolts的逻辑,这些spouts或者bolts,也称为tasks。
具体有多少个worker,多少个executor,每个executor负责多少个task,是由配置和指定的parallelism-hint共同决定的,但这个值并不一定等于实际运行中的数目。
如果计算出的总的executors超过了nimbus的限制,此topology将不会得到执行。
并行度的作用:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 计算所有tolopogy的topology-id到executors的映射;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;(defn- compute-topology->executors [nimbus storm-ids]
"compute a topology-id -> executors map"
(into {} (for [tid storm-ids]
{tid (set (compute-executors nimbus tid))})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 计算topology-id到executors的映射;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;(defn- compute-executors [nimbus storm-id]
(let [conf (:conf nimbus)
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
component->executors (:component->executors storm-base)
storm-conf (read-storm-conf conf storm-id)
topology (read-storm-topology conf storm-id)
task->component (storm-task-info topology storm-conf)]
(->> (storm-task-info topology storm-conf)
reverse-map (map-val sort)
(join-maps component->executors)
(map-val (partial apply partition-fixed))
(mapcat second)
(map to-executor-id)
)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 计算topology的task-info;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;(defn storm-task-info "Returns map from task -> component id"
[^StormTopology user-topology storm-conf]
(->> (system-topology! storm-conf user-topology)
all-components ;; 获取每个组件的并行数
(map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
(sort-by first)
(mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
(map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
(into {})
))
上述代码会在nimbus进行任务分配时调用:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; nimbus进行任务分配;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;mk-assignments->compute-new-topology->executor->node+port->compute-topology->executors-> ...
基本关系如下所示:
所谓本地发布,是指在worker进程内及executor线程间进行消息发布。 所谓远程发布,是指在worker进程间、不同的机器间进行消息发布。
集群的状态机:
集群的状态是通过一个storm-cluster-state的对象来描述的。 其提供了许多功能接口,比如:
如下图所示:
所以,nimbus会根据心跳、topologies信息及已分配的任务信息为依据,来重新分配任务,如下图所示:
一个topology的提交过程:
主要过程如下图所示:
在发布后的一年半时间里,Nathan及其团队继续开发Storm,以便它能在Twitter内部推广。
大企业对技术的要求不同于创业公司。在创业公司,一个小型团队负责开发、运维和发布所有这些工作。而在大公司,这些工作由多个团队完成。因此,Nathan意识到,他们需要创建一个大型的、共享的集群,可以运行许多独立的应用程序。该集群既要确保应用程序可以得到足够的资源,又要保证一个应用程序出现问题不会影响集群中的其它应用程序。这就是“多租户”。
在建成共享集群后,他们又发现了一个问题。用户总是为他们的拓扑配置远远超出实际需要的资源。这降低了集群效率,用户也失去了优化拓扑的动力。Nathan通过开发“隔离调度器(isolation scheduler)”解决了这些问题。
随着Twitter内部Storm用户的增多,他们又发现,用户需要用指标监控他们的拓扑。为此,他们开发了Storm的监控指标API,使用户可以收集任意完全自定义的指标,然后把它们发送给任意监控系统。
Storm的另一大技术跃进是Trident。它是Storm上的一个“微批处理(micro-batching)”API,提供了“仅执行一次”的处理语义。这使Storm可以应用到许多新的场景里。
此外,在使用体验和性能方面还有许多重要的改进。在第一年里,他们平均一个月发布一个版本。每个版本的发布都会提升Storm的知名度。而且,这也从侧面反映出,项目团队能够及时响应用户的问题。
Nathan认为,构建社区并使开发人员为项目做贡献是构建开源项目最难的部分。
在Storm发布后的一年半时间里,Nathan推动了Storm的所有开发,所有的变更都要经过他的认可。这样做的好处是,他可以控制项目的每个细节,确保它的质量、使用体验及发展方向。但是,“智者驱动(visionary-driven)” 的开发有一个很大的缺点,就是难以建立一个活跃的开发者社区。首先,Nathan控制着一切,其他人鲜有机会做出重大贡献。其次,他本人成了项目的瓶颈。随着“拉请求(pull request)”越来越多,他疲于处理,这延长了反馈/合并周期,打击了贡献者的积极性。
还有一个缺点是,用户会把他看成是项目的单点故障点。这会限制Storm的发展。
最后,这种方式最糟糕的一方面是Nathan本人承担了太多的工作。其他人无法深入了解整个代码库,从而不可避免地会产生预想不到的变更结果。
2013年3月,Nathan离开了Twitter。几个月后,他认识到“共识驱动(consensus-driven)”的开发会更利于Storm的发展。
他认为,在项目的解决方案尚未明确之前,智者驱动的开发是最好的。因为一些关键的设计问题只有对整个项目有深入了解的人才能解决好。但到他离开Twitter的时候,Storm的解决方案已经比较明确了。此后的许多创新工作,如从ZeroMQ切换到Netty、实现安全/身份验证、改进性能/扩展性、提高拓扑可视化等,都是意料之中的。
在Nathan离开Twitter之前四个月,Yahoo!的Andy Feng就极力建议他将Storm提交给Apache。其时,他也恰巧在考虑这个问题。他与Hadoop创建者Doug Cutting进行了交谈,从他那里了解了Apache的运作,以及提交到Apache的优缺点。Doug的建议使他真正了解了共识驱动的工作机制。
在Storm最初切换到共识驱动模型时,大部分提交者对代码库的整体把握都非常有限。这是前期智者驱动的结果。但模型切换后,随着时间推移,部分提交者会学习代码库的更多部分,从而在整体上有一个更深层的理解。
Nathan曾担心,转到共识驱动模型会降低软件质量。实际上,也确实有些变更引入了Bug。但这不是大问题,下个版本可以修复这些问题。其实,智者驱动的开发也是如此。
在离开Twitter后,Nathan的精力都用在了新的创业公司上。他需要为Storm选一个长远的家。而之所以选择Apache,是因为它能为Storm提供一个强大的品牌、坚实的法律基础以及共识驱动的模型。
Storm使用ZeroMQ库进行内部进程通信,但ZeroMQ的许可协议与Apache基金会的政策不一致。因此,Yahoo!几名开发人员(后来成为了Storm的提交者)基于Netty创建了替代方案。
在形成Storm最初的提交者列表时,Nathan选择了一些已经为项目做过较大贡献的公司里的开发人员,其中包括其时尚在Health Market Science的Taylor Goetz。他现在就职于Hortonworks,专门从事Storm方面的工作,并担任Storm项目管理委员会主席。
2014年9月,在Andy Feng的帮助下,Nathan向Apache提交了Storm孵化申请。
Nathan写道,Storm进入孵化状态后,他不再是项目瓶颈,开发速度变得越来越快。提交/反馈周期的缩短,这对提交者来说也是一种激励。同时,他会邀请做出重要贡献的人加入提交者行列。
2014年9月17日,Storm正式毕业,升级为顶级项目,而此时离Storm开源尚不足三年。