我的要求是处理股票市场的每小时数据。即,每次流间隔从源获取一次数据,并通过DStream进行处理。
我已经实现了一个定制接收器,通过实现onStart()和onStop()方法及其工作来废弃/监视网站。
遇到的挑战:
我试过的选择:


class CustomReceiver(interval: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
def onStart() {
new Thread("Website Scrapper") {
override def run() { receive() }
}.start()
}
def onStop() {
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
println("Entering receive:" + new Date());
try {
while (!isStopped) {
val scriptsLTP = StockMarket.getLiveStockData()
for ((script, ltp) <- scriptsLTP) {
store(script + "," + ltp)
}
println("sent data")
System.out.println("going to sleep:" + new Date());
Thread.sleep(3600 * 1000);
System.out.println("awaken from sleep:" + new Date());
}
println("Stopped receiving")
restart("Trying to connect again")
} catch {
case t: Throwable =>
restart("Error receiving data", t)
}
println("Exiting receive:" + new Date());
}
}如何使火花流接收器与DStream处理同步?
发布于 2017-06-12 19:12:53
这个用例似乎不适合于星火流。间隔足够长,可以将其视为一个常规批处理作业。这样,我们就可以更好地利用集群资源。
我会将其重写为星火作业,方法是并行化目标代码,使用mapPartitions将执行器用作分布式web剪贴器,然后按预期进行处理。
然后,将星火作业安排为每小时使用cron或更高级的替代方案(如Chronos )在需要的时间运行。
https://stackoverflow.com/questions/44492551
复制相似问题