分布式流处理是对无边界数据集进行连续不断的处理、聚合和分析。它跟MapReduce
一样是一种通用计算,但我们期望延迟在毫秒或者秒级别。这类系统一般采用有向无环图(DAG
)。
DAG
是任务链的图形化表示,我们用它来描述流处理作业的拓扑。如下图,数据从sources
流经处理任务链到sinks
。单机可以运行DAG
,但本篇文章主要聚焦在多台机器上运行DAG
的情况。
当选择不同的流处理系统时,有以下几点需要注意的:
map
或者filter
这类易扩展、处理单条信息的函数;处理多条信息的函数aggregation
;跨数据流、不易扩展的操作join
。at most once
,at least once
和exactly once
。At most once
的消息传输机制是每条消息传输零次或者一次,即消息可能会丢失;At least once
意味着每条消息会进行多次传输尝试,至少一次成功,即消息传输可能重复但不会丢失;Exactly once
的消息传输机制是每条消息有且只有一次,即消息传输既不会丢失也不会重复。Latency
),吞吐量(Throughput
)和扩展性(Scalability
)是流处理应用中极其重要的指标。运行时和编程模型是一个系统最重要的特质,因为它们定义了表达方式、可能的操作和将来的局限性。因此,运行时和编程模型决定了系统的能力和适用场景。
实现流处理系统有两种完全不同的方式:
(1) 一种是称作原生流处理
,意味着所有输入的记录一旦到达即会一个接着一个进行处理。
(2) 第二种称为微批处理
。把输入的数据按照某种预先定义的时间间隔(典型的是几秒钟)分成短小的批量数据,流经流处理系统。
两种方法都有其先天的优势和不足。首先以原生流处理
开始,原生流处理的优势在于它的表达方式。数据一旦到达立即处理,这些系统的延迟性远比其它微批处理要好。除了延迟性外,原生流处理的状态操作也容易实现,后续将详细讲解。一般原生流处理系统为了达到低延迟和容错性会花费比较大的成本,因为它需要考虑每条记录。原生流处理的负载均衡也是个问题。比如,我们处理的数据按key分区,如果分区的某个key是资源密集型,那这个分区很容易成为作业的瓶颈。
接下来看下微批处理
。将流式计算分解成一系列短小的批处理作业,也不可避免的减弱系统的表达力。像状态管理或者join
等操作的实现会变的困难,因为微批处理系统必须操作整个批量数据。并且,batch interval
会连接两个不易连接的事情:基础属性和业务逻辑。相反地,微批处理系统的容错性和负载均衡实现起来非常简单,因为微批处理系统仅发送每批数据到一个worker
节点上,如果一些数据出错那就使用其它副本。微批处理系统很容易建立在原生流处理系统之上。
编程模型一般分为组合式
和声明式
。组合式编程提供基本的构建模块,它们必须紧密结合来创建拓扑。新的组件经常以接口的方式完成。相对应地,声明式API操作是定义的高阶函数。它允许我们用抽象类型和方法来写函数代码,并且系统创建拓扑和优化拓扑。声明式API经常也提供更多高级的操作(比如,窗口函数或者状态管理)。后面很快会给出样例代码。
有一系列各种实现的流处理框架,不能一一列举,这里仅选出主流的流处理解决方案,并且支持Scala API
。因此,我们将详细介绍Apache Storm
,Trident
,Spark Streaming
,Samza
和Apache Flink
。前面选择讲述的虽然都是流处理系统,但它们实现的方法包含了各种不同的挑战。这里暂时不讲商业的系统,比如Google MillWheel
或者Amazon Kinesis
,也不会涉及很少使用的Intel GearPump
或者Apache Apex
。
Apache Storm
最开始是由Nathan Marz
和他的团队于2010年在数据分析公司BackType
开发的,后来BackType
公司被Twitter
收购,接着Twitter
开源Storm
并在2014年成为Apache
顶级项目。毋庸置疑,Storm
成为大规模流数据处理的先锋,并逐渐成为工业标准。Storm
是原生的流处理系统,提供low-level
的API。Storm
使用Thrift
来定义topology
和支持多语言协议,使得我们可以使用大部分编程语言开发,Scala
自然包括在内。
Trident
是对Storm
的一个更高层次的抽象,Trident
最大的特点以batch
的形式进行流处理。Trident
简化topology
构建过程,增加了窗口操作、聚合操作或者状态管理等高级操作,这些在Storm
中并不支持。相对应于Storm
的At most once
流传输机制,Trident
提供了Exactly once
传输机制。Trident
支持Java
,Clojure
和Scala
。
当前Spark
是非常受欢迎的批处理框架,包含Spark SQL
,MLlib
和Spark Streaming
。Spark
的运行时是建立在批处理之上,因此后续加入的Spark Streaming
也依赖于批处理,实现了微批处理。接收器把输入数据流分成短小批处理,并以类似Spark
作业的方式处理微批处理。Spark Streaming
提供高级声明式API(支持Scala
,Java
和Python
)。
Samza
最开始是专为LinkedIn
公司开发的流处理解决方案,并和LinkedIn
的Kafka
一起贡献给社区,现已成为基础设施的关键部分。Samza
的构建严重依赖于基于log
的Kafka
,两者紧密耦合。Samza
提供组合式API,当然也支持Scala
。
最后来介绍Apache Flink
。Flink
是个相当早的项目,开始于2008年,但只在最近才得到注意。Flink
是原生的流处理系统,提供high level
的API。Flink
也提供API
来像Spark
一样进行批处理,但两者处理的基础是完全不同的。Flink
把批处理当作流处理中的一种特殊情况。在Flink
中,所有的数据都看作流,是一种很好的抽象,因为这更接近于现实世界。
快速的介绍流处理系统之后,让我们以下面的表格来更好清晰的展示它们之间的不同:
Wordcount
之于流处理框架学习,就好比hello world
之于编程语言学习。它能很好的展示各流处理框架的不同之处,让我们从Storm
开始看看如何实现Wordcount
:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new Split(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
...
Map<String, Integer> counts = new HashMap<String, Integer>();
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.containsKey(word) ? counts.get(word) + 1 : 1;
counts.put(word, count);
collector.emit(new Values(word, count));
}
首先,定义topology
。第二行代码定义一个spout
,作为数据源。然后是一个处理组件bolt
,分割文本为单词。接着,定义另一个bolt
来计算单词数(第四行代码)。也可以看到魔数5,8和12,这些是并行度,定义集群每个组件执行的独立线程数。第八行到十五行是实际的WordCount bolt
实现。因为Storm
不支持内建的状态管理,所有这里定义了一个局部状态。
按之前描述,Trident
是对Storm
的一个更高层次的抽象,Trident
最大的特点以batch
的形式进行流处理。除了其它优势,Trident
提供了状态管理,这对wordcount
实现非常有用:
public static StormTopology buildTopology(LocalDRPC drpc) {
FixedBatchSpout spout = ...
TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout)
.each(new Fields("sentence"),new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(),
new Count(), new Fields("count"));
...
}
如你所见,上面代码使用higher level
操作,比如each
(第七行代码)和groupby
(第八行代码)。并且使用Trident
管理状态来存储单词数(第九行代码)。
下面是时候祭出提供声明式API的Apache Spark
。记住,相对于前面的例子,这些代码相当简单,几乎没有冗余代码。下面是简单的流式计算单词数:
val conf = new SparkConf().setAppName("wordcount")
val ssc = new StreamingContext(conf, Seconds(1))
val text = ...
val counts = text.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.print()
ssc.start()
ssc.awaitTermination()
每个Spark Streaming
的作业都要有StreamingContext
,它是流式函数的入口。StreamingContext
加载第一行代码定义的配置conf
,但更重要地,第二行代码定义batch interval
(这里设置为1秒)。第六行到八行代码是整个单词数计算。这些是标准的函数式代码,Spark
定义topology
并且分布式执行。第十二行代码是每个Spark Streaming
作业最后的部分:启动计算。记住,Spark Streaming
作业一旦启动即不可修改。
接下来看下Apache Samza
,另外一个组合式API例子:
class WordCountTask extends StreamTask {
override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector,
coordinator: TaskCoordinator) {
val text = envelope.getMessage.asInstanceOf[String]
val counts = text.split(" ").foldLeft(Map.empty[String, Int]) {
(count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
}
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), counts))
}
Samza
的属性配置文件定义topology
,为了简明这里并没把配置文件放上来。定义任务的输入和输出,并通过Kafka topic
通信。在单词数计算整个topology
是WordCountTask
。在Samza
中,实现特殊接口定义组件StreamTask
,在第三行代码重写方法process
。它的参数列表包含所有连接其它系统的需要。第八行到十行简单的Scala
代码是计算本身。
Flink
的API跟Spark Streaming
是惊人的相似,但注意到代码里并未设置batch interval
:
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(...)
val counts = text.flatMap ( _.split(" ") )
.map ( (_, 1) )
.groupBy(0)
.sum(1)
counts.print()
env.execute("wordcount")
上面的代码是相当的直白,仅仅只是几个函数式调用,Flink支持分布式计算。
上面给出了基本的理论和主流流处理框架介绍,下篇文章将会更深入的探讨其它关注点。希望你能对前面的文章感兴趣,如果有任何问题,请联系我讨论这些主题。
原文:http://www.infoq.com/cn/articles/comparison-of-main-stream-processing-framework