Hadoop之Storm

1、Storm概述

如果对Storm用一句话来进行概括,那就是众所周知的“Storm是实时版的Hadoop”。Hadoop为大数据的数据量维度提供了解决方案,不过其本质上是一个批处理平台,并没有带来速度和即时结果/分析需求的解决之道。尽管Hadoop已经成为数据存储和计算领域的转折点,但仍不能解决需要实时分析和结果的问题。

Storm正是定位于大数据速度方面需求的解决方案,该框架对实时数据能够以闪电般的速度执行分布式计算处理。作为一种广泛使用的解决方案,它可用于为高速流数据提供实时警报和分析。Storm是归属于Apache基金会的一个项目,其功能得到了广泛认可并为人知晓。遵循Apache项目的许可规范,Storm是免费和开源的,它是一个分布式计算引擎,具有高度可扩展性、可靠性和容错性且具备保障处理机制,能够处理无界流媒体数据、提供扩展性的计算及顶部上的小微批处理的工具。

Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。 Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。Storm可以方便地在一个计算机集群中编写和扩展复杂的实时计算,Storm用于实时处理,就好比 Hadoop 用于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息,更棒的是你可以使用任意编程语言来进行开发。

总体来说,Storm为非常宽泛的用例场合提供了解决方案,范围包括警报和实时业务分析、机器学习和ETL用例,以及诸如此类的一些方案。Storm广泛兼容各种各样的输入和输出端点集成。对于输入数据,它可以同RabbitMQ、JMS、Kafka、Amazon Kinesis等领先的队列管理机制良好搭配。对于输出端点,它同Oracle和MySQL这样的主流传统数据库连接良好。Storm的适应性不限于传统的RDBMS系统,它与Cassandra、HBase等大数据存储亦有着非常友好的接口。

简单进行总结,Storm的特点如下:

编程简单:开发人员只需要关注应用逻辑,而且跟Hadoop类似,Storm提供的编程原语也很简单;

高性能、低延迟:可以应用于广告搜索引擎这种要求对广告主的操作进行实时响应的场合;

分布式:可以轻松应对数据量大,单机搞不定的场合;

可扩展:随着业务发展,数据量和计算量越来越大,系统可水平扩展;

容错:单个节点挂掉不影响应用;

消息不丢失:保证消息处理等。

2、离线计算&实时(流式)计算

A、离线计算

离线计算就是在计算开始前已知所有输入数据,输入数据不会发生变化,且在解决一个问题后就要立即得出结果的前提下进行的计算。离线计算的特点有:

数据量巨大且保存时间长;

在大量数据上进行复杂的批量运算;

数据在计算之前已经完全到位,不会发生变化;

能够方便地查询批量计算的结果。

简单使用几个关键词来概括离线计算,那就是:

主要特点:批量获取数据、批量传输数据、周期性批量计算数据、数据展示等;

代表技术:Sqoop批量导入数据、MapReduce批量计算数据、Hive/HDFS批量存储数据等。

B、实时/流式计算

与离线计算相对应的就是实时计算(也叫流式计算),在实时计算中,输入数据是以序列化的方式一个个输入并进行处理,也就是说在开始的时候并不需要知道所有的输入数据。同样地,简单使用几个关键词来概括实时计算,那就是:

主要特点:数据实时产生、数据实时传输、数据实时计算、数据实时存储、数据实时展示等;

代表技术:Flume实时获取数据、Kafka/metaq实时存储数据、Storm/JStorm实时计算数据、Redis实时缓存数据、持久化数据存储(Oracle/MySQL)等。

一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快地得到计算结果。实时计算的工作流程,类似于自来水厂的处理过程,如下图所示:

C、Storm与Hadoop比较

3、Storm的体系结构

Storm有两种工作执行模式,如下:

本地模式:这是一个通常用于演示和测试的单节点及非分布式的运行模式。这里,整个拓扑在单个工作者中执行,因此是单个JVM;

分布式模式:这是一个完全或部分分布式的多节点运行模式,也是实时应用程序开发和部署的推荐模式。

Storm集群采用主从架构方式,主节点是Nimbus,从节点是Supervisor,调度相关的信息存储在Zookeeper集群中,Storm体系结构如下图所示:

Nimbus:Storm集群的Master节点,负责集群监控、资源分配和任务提交,将任务指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task;

Supervisor:Storm集群的从节点,负责接收Nimbus分配的任务,启动和停止属于自己管理的Worker进程。通过Storm配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(这样就设定了一个Supervisor上能够启动多少个Worker进程);

Worker:运行具体处理逻辑组件的进程,Worker运行的任务类型只有两种,一种是Spout任务,另一种是Bolt任务;

Zookeeper:用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行。

4、Storm的运行机制

在Storm中,一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相类似。但有一点不同的是,在Hadoop中,MapReduce任务最终会执行完成后结束,而在Storm中,Topology任务一旦提交后永远不会结束,除非人为显式地去停止任务。计算任务Topology是由不同的Spouts和Bolts,通过数据流(Stream)连接起来的图,一个Storm集群在运行一个Topology时,主要是通过Worker进程、Executor线程及Task任务三个实体来执行工作的。

下图简要描述了这三个实体之间的关系:

Worker:运行具体处理逻辑组件的进程,Storm集群里的1台物理机会启动1个或多个Worker进程(即JVM进程),所有的Topology将在这些Worker进程里被运行;

Executor:Storm 0.8版本之后,Executor为Worker进程中具体的物理线程,同一个Spout/Bolt的Task可能会共享一个物理线程,一个Executor中只能运行隶属于同一个Spout/Bolt的Task,换句话说,在1个单独的Worker进程里会运行1个或多个Executor线程,每个Executor只会运行1个Topology的1个component(Spout或Bolt)的Task实例;

Task:Worker中的每一个Spout/Bolt线程称为一个task,在Storm 0.8版本之后,task不再与物理线程对应,不同Spout/Bolt的task可能会共享同一个物理线程,该线程称为Executor;

1个Worker进程执行的是1个Topology的子集(注:不会出现1个Worker进程为多个Topology服务的情况)。1个Worker进程会启动1个或多个Executor线程来执行1个Topology的component(Spout或Bolt)。因此,1个运行中的Topology就是由集群中多台物理机上的多个Worker进程组成的。

Executor是1个被Worker进程启动的单独线程。每个Executor只会运行1个Topology的1个component(Spout或Bolt)的Task(注:Task可以是1个或多个,Storm默认是1个component只生成1个Task,Executor线程里会在每次循环里顺序调用所有Task实例)。

Task是最终运行Spout或Bolt中代码的单元(注:1个Task即为Spout或Bolt的1个实例,Executor线程在执行期间会调用该Task的nextTuple或execute方法)。Topology启动后,1个component(Spout或Bolt)的Task数目是固定不变的,但该component使用的Executor线程数可以动态调整(例如:1个Executor线程可以执行该component的1个或多个task实例)。这意味着,对于1个component存在这样的条件:#threads

下图以自来水厂为例,简单描述自来水处理的过程,通过这张图可以很清晰地了解Storm中Topology的执行逻辑。

整个处理流程的组织协调不需要用户去关心,用户只需要去定义每一个步骤中的具体业务处理逻辑;

具体执行任务的角色是Worker,Worker执行任务时,具体的行为则由我们定义的业务逻辑决定。

Storm的物理集群结构如下图所示:

下图是Storm中的数据交互图,可以看出两个模块Nimbus和Supervisor之间没有直接交互,状态信息都是保存在Zookeeper中,Worker之间通过Netty传送数据。这里尤其需要注意的一点是,Storm中所有的元数据信息都保存在Zookeeper中!

5、Storm的可靠性

Storm的独特卖点之一是对消息处理的保障,这让它成为十分有利的解决方案。

A、Spout的可靠性

Spout会记录它所发射出去的tuple,当下游任意一个Bolt处理失败时,Spout能够重新发射该tuple。在Spout的nextTuple()发送一个tuple时,为实现可靠消息处理需要给每个Spout发出的tuple带上唯一ID,并将该ID作为参数传递给SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"),tupleID);

实际上,每个Bolt每收到一个tuple,都要向上游应答或报错,在tuple树上的所有Bolt都确认应答,Spout才会隐式调用ack()方法表明这条消息(一条完整的流)已经处理完毕,将会对编号为ID的消息应答确认;处理报错、超时则会调用fail()方法。

B、Bolt的可靠性

Bolt的可靠消息处理机制包含两个步骤:

a、当发射衍生的tuple,需要锚定读入的tuple;

b、当处理消息时,需要应答或报错。

6、Storm节点间通信

一开始Storm以ZeroMQ作为管道来进行节点间的通信,在0.9版本的Storm中,它被Netty尝试性替换,在0.9.2版本中,Netty完全将ZeroMQ取而代之。下面将分别简单介绍ZeroMQ和Netty,以了解一些必要的信息,包括二者之间的不同,以及它们在同类技术中被Storm实施者选中的缘由。

A、ZeroMQ

ZeroMQ不属于像AMQP、RabbitMQ那样完全边缘的消息系统,它是一套可以扩展的文件库,可用于构建真正绩效型的消息系统,适应于精益、严苛的情况需求。ZeroMQ不是一个全面型的框架,它只是一个可扩展的工具包——一个实现功能不错的异步消息库。由于没有在各个层面上向框架过度带来额外负担的拖累,它保持了高性能。该库具有实现快速、高效的消息传递解决方案所有必要的组件,并可通过各种适配器与大量编程组件集成。ZeroMQ是一个轻量级、异步和超快的消息工具包,可供开发人员制作高性能解决方案。

ZeroMQ以C++实现,因此不会像基于JVM的应用程序那样遇到性能和GC问题。这里有一些关键因素使这个库成为在同一个节点或不同节点上的不同工作者之间通信的选择:

它是一个轻量级的异步套接字库,可以制作一个高性能并发框架;

它比TCP快,是集群设置中节点间通信的理想选择;

它不是一个网络协议,但适应诸如IPC、TCP、组播等各种各样的协议;

异步风格有助于为多核消息传输应用程序构建可扩展的I/O模型;

它有扇出、发布-订阅、请求-回复、管道等各种内置的消息模式。

B、Netty

前面描述ZeroMQ的部分,清晰地表明ZeroMQ曾是Storm的完美选择,但是随着时间的推移,Storm改进者已经意识到使用ZeroMQ作为传输层表现出的问题,作为一个原生库,它遇到了平台特定的问题。在早期,Storm与ZeroMQ的安装不是很容易,需要每次下载安装并且构建。另一重要问题是Storm与ZeroMQ紧密耦合,并与相对较旧的2.1.7版本配合工作。

Netty是基于NIO框架的客户端-服务器消息传递解决方案,允许快速开发,并具有相对简单、易于使用的设计。它具有高性能和可伸缩性,还有着可扩展性和灵活性等特性。Netty相比于同类软件,出色的地方表现在以下几方面:

具有非阻塞异步操作的低延迟计算;

更高的吞吐量;

资源消耗更低;

内存复制最小化;

安全的SSL和TLS支持。

Netty为Storm提供了可插入性,开发人员可以仅通过storm.yaml配置文件的设置,在ZeroMQ和Netty两个传输层协议之间进行选择。

7、Storm的性能优化

为了能够优化Storm的性能,理解什么是性能瓶颈非常重要,只有知道陷阱在哪里,才能成功避开它们。与其它所有大数据框架一样,Storm另一个颇为值得注意的方面是,它没有性能的经验法则:每个场景都是唯一的,因此每个场景的性能优化计划也都是唯一的。所以,这部分更多的是指导性建议以及经典的“要和不要”提醒,在用例的实际工作中,需要经过几轮对系统的观察和调整后,性能提高的效果才能显现出来。

从根本上来说,Storm是一个高性能的分布式处理系统,工作分配发挥作用的那一刻,它带来了自己的潘多拉盒子,其可能是性能故障,例如同一节点上不同进程之间的交互,或又例如需要信道、数据源和接收器的不同节点上不同进程间的交互。在拓扑中,可以将每个节点看作图中下一个节点的源,在此分布式设置中,以下都是可能会出错的方面:

接收器Bolt可能变慢或堵塞。例如,Bolt运行容量等于1就是达到100%的效率,因此它不会再从ZeroMQ或Netty通道拾取任何消息;

数据源Bolt并不知道这个事实情况,它只是超级有效地将越来越多的元组抽入队列;

结果是队列容量被充满,造成数据溢出或爆满。消息处理失败,因此根据Storm的规则重放,从而形成一个无限扰动的恶性循环;

此时应该注意观察延迟:执行延迟和进程延迟,尝试保持Bolt容量低于1。

可以通过增加并行性来实现性能提升,但同时要认识到工作者进程和核心数量的协调,以避免盲目增加并行性的行为造成所有进程和线程都在竞争中死亡的后果。

Storm集群应根据用例的需求进行调整、缩放和大小调整,以下是需要注意的事项:

输入源的数量和种类;

数据到达速率,TPS或每秒的事务量;

每个事件的大小;

组件(Spout/Bolt)中哪个是最有效的,哪个是最低效的。

性能优化的基本规则如下:

了解网络传输的情况,只有在实际需要时才进行传输。网络延迟影响巨大并且通常不受程序员控制。因此,在采用一个拓扑,且其中所有Spout和Bolt只分布在单个节点上时,与它们被分散在集群的不同节点上的情况相比,前者将有更好的性能,因为它节省了网络跃点的耗费,而后者在出错的情况下具有更高的生存机会,因为信息是在整个集群中而不仅仅在单个节点上传播;

确认者的数量应该保持等于主管的数量;

对于CPU密集型拓扑,执行器的数量通常应该保持为每核1个。

假设有10个主管,每个主管有16个核心,那么有多少并行单位呢?答案是10*16=160。每个节点1个确认者=10个确认者进程的规则;剩余并行单元=160-10=150。将较多并行单元分配给较慢的任务,较少并行单元分配给较快的任务。发送->计算->持久化[拓扑中的三个组件]。其中,持久化是最慢的任务(I/O约束),可以有如下的并行度分配:发送[并行度10]->计算[并行度50]->持久化[并行度90]:

此参数的默认值是unset,表示没有限制,但是基于用例时需要限制,以便当未确认消息的数量等于所设定的参数值的条件达到时,可以停止读取更多数据以让处理赶上并确认所有消息,然后继续进行。此值的设置应该考虑:

不要小到让拓扑在等待空闲;

不要太大,以至于拓扑在处理和确认消息之间被消息淹没。

适宜的建议是,从1000开始,然后对其调整,以找出最适合用例的取值。在Trident的情况下,从15~20的较小值开始调整。

Storm集群里默默无闻的协调者是Zookeeper,这是所有控制和协调的核心,这里的所有操作通常是I/O绑定的磁盘操作。因此,以下的建议有助于保持Zookeeper和Storm集群的高效工作:

使用Zookeeper仲裁(至少有一处三节点设置);

使用专用硬件;

使用专用固定硬盘SSD提高性能。

参考文献:

——《实时大数据分析 基于Storm、Spark技术的实时应用》

——《CSDN博客》

——《百度百科》

——《潭州大数据课程课件》

  • 发表于:
  • 原文链接:https://kuaibao.qq.com/s/20180722G1BL5N00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

同媒体快讯

扫码关注云+社区

领取腾讯云代金券