之前小强和大家共同和写了一个Spark Streaming版本的workcount,那小强发这篇文章和大家聊聊,Streaming背后的故事。
Discretized Stream或者DStream是StreamingContext提供的最基本的抽象,它代表了一系列连续的数据片,包括从数据源哪里接收到的数据和通过各种转换操作得到的输出数据。在Spark内部,DStream就是一系列连续的RDD(弹性分布式数据集)。每一个DStream中的RDD包含了明确的时间间隔内的数据,如下图所示。
每一个我们定义在DStream上面的操作都是作用在底层的RDD上面。比如我们前面例子中的词频统计操作,flatMap操作作用在了lines DStream中的每一个RDD上面并生成了一个words DStream。这个过程正如下图所示。
这些底层RDD上面的操作,都会被Spark引擎所处理。而DStream操作则隐藏了大多数的细节,并提供给我们一个非常好用的高层次的API。
nput DStream是一个从流数据源接受流数据的DStream。在快速开始的例子中,lines就是一个Input DStream,它从netcat server接受流数据。任何一个Input DStream(除了file stream,后面我们会讲到) 都会关联一个Receiver对象,这个对象负责从流数据源接收流数据然后放到内存中等待处理。
Spark Streaming提供了两种类型的流数据源:
下面的章节中,我们会依次对这些数据源进行说明。
注意,如果你想要在你的流处理程序中启用多个不同的数据源,那么你只需要创建多个Input DStream。这样就会有多个Receiver来同时接收不同的流数据。需要注意的是,Spark的work/executor是一个长时间运行的应用。因此,一定要记住一个Spark Streaming应用程序需要分配足够的核心来处理接收的数据,以及运行接收器。
要记住的点:
我们已经在前面的快速开始例子中展示了ssc.socketTextStream(...),它创建了一个从TCP端口接收文本数据的DStream。除此之外,Spark Streaming还为我们提供了一个创建从文件接收数据的DStream。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
/**
Spark Streaming会监控目录dataDirectory,然后处理任何在这个目录中创建的文件(嵌套目录不支持),注意:
1. 数据文件必须都有同样的格式
2. 数据文件必须通过moving或者renaming方式来创建在监控目录中。
3. 一旦数据文件移动到监控目录中就不能再改变了,持续追加的新数据并不能被识别。
对于普通的文本文件,有一个更简单的方法streamingContext.textFileStream(dataDirectory)。并且File Stream没有Receiver,也就不用占用处理核心了。
*/
这一类的数据源需要依赖non-spark的包才能运行,其中一些更需要大量复杂的依赖包(比如Kafka和Flame)。因此为了最小化依赖冲出问题,创建这些数据源的方法都被移到了一些独立的包里,你在需要的时候可以引入到你的应用中。
这些高级数据源是不能直接在spark-shell中使用的,因此带有这些高级数据源的应用不能在spark-shell中进行试验。如果你真的需要再spark-shell中使用这些高级数据源,你需要下载这些依赖包然后把他们加入到类路径中。
Spark Streaming中基于可靠新来说有两种数据源。一种数据源(Kafka和Flame)允许对传输的数据进行确认。系统从这些具有可靠性的数据源接受的数据都是确保正确的,它可以保证在任何错误的情况下数据都不丢失。
这个updateStateByKey操作允许你管理状态,并可以不断地使用新的状态信息来更新这个状态。要使用这个操作,你需要两步操作:
Spark会把状态更新函数应用于每一个RDD中每一个Key对应数据集Seq,不论这个新的数据分片中是否有数据。如果状态更新函数返回None,那么这个key-value对就会被废弃了。
下面我们用一个例子来对这个状态更新函数进行说明。在这个例子中,我们会对文本输入流的数据进行词频统计。定义这个状态类型为Int,状态更新函数如下:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
这个函数作用于一个文本输入流上,会按word进行词频统计生成键值对(word,n)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
在执行过程中,newValues是当前RDD中的(word,1)键值对中的value值组成的Seq,runningCount是之前统计的key对应的value状态值。我们将其累加,得到新的状态值。
Treansform操作可以对允许任何RDD-to-RDD的装换函数,作用在DStream上。通过这个操作,我们可以利用一些DStream不支持但是RDD支持的API,可以让我们的程序更加灵活。举个例子,把DStream中的每一个数据集和另外的一个数据集做Join操作,这个DStream的join部没有对这个进行支持,所以我们需要使用transform操作,先把DStream转化为RDD然后在进行join。
下面的例子中,我们将进行一个数据清洗操作。首先我们要把输入数据和一份已经处理好的数据(来自于HDFS中)做join,然后再根据相应的规则进行过滤。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
注意,这个操作会在每一个时间间隔内被执行一次,所以它允许你做实时的RDD操作。都可以在每一个批次间,改变比如改变分区数,广播变量等等。
Spark Streaming允许一个windowed computations,他可以让你在一个sliding window上应用一些transformactions操作。下面这张图,对这个进行了描述。
上图所示,我们定义了一个窗口,这个窗口会在源DStream上进行滑动,落在这个窗口内的源DStream数据会被合并为一个windowed DStream,而你可以对这个windowed DStream定义若干transformactions。图示的这个窗口大小为3个time units,并且每次滑动2个time units。所以我们在定义窗口的时候需要两个参数:
必须注意的是:这两个参数都必须是源DStream的batch interval的整数倍。
接下来,我们用一个例子来演示一下window operation。我们每隔10秒钟,统计一个过去30秒内从Input DStream中接受的单词词频。为了实现这个需求,我们需要在过去30秒的数据形成的(word,1)上执行reduceByKey操作,对应的window operation为reduceByKeyAndWindow。
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)
Output operations允许把DStream中的数据推送到外部系统中,比如数据库或者文件系统。因为output operations是真正的让外部系统来消费DStream数据,所以他会触发DStream上定义的transformactions。我们来看一下DStream支持的output operations。
DStream.foreachRDD操作是非常强大的,他可以以最简单粗暴的方式把数据推送到外部系统上。为了能够正确的并且高效的使用这个output operation。我们需要避免以下常出现的错误使用方式。
通常情况下导出DStream数据到外部系统需要创建一个连接,使用这个连接来推送数据到外部系统。所以某些开发人员就会尝试在driver中创建一个连接,然后在worker中使用它。就像下面这样:
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
这种方式是不对的,你在driver上定义了connection,然后把他们序列化后给到worder去使用。因为这些connection对象几乎不可能跨机器使用的。它会引起一个serializable exception。正确的做法是在worker上面创建connection。
可是下面的方法会为每一个DStream中的元素创建一个connection,效率是十分低下的。
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
通常情况下,创建connection对象是需要时间和资源开销的。因此为每一个数据元素创建和销毁connection必然带来了不必要的开销,降低了整个系统的吞吐量。因此,最好的解决方案是使用RDD.foreachPartition来为每一个数据分片创建一个connection对象,然后使用这个对象发送分片数据到外部系统,完成之后销毁这个对象。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
我们可以进一步优化代码,让多个RDD数据分片复用这些connection。我们用一个静态的connection pool来管理这些connection,让RDD数据分片复用这些connection来推送数据到外部系统。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
当然这个connection pool是一个懒加载的,connection会根据需求来创建,并会在一定时间空闲后被销毁,这样才是一个高效的系统。
最后还有几点要注意的:
【转发】和【关注】是对小强最大的支持!!!
本文分享自 MoziInnovations 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!