首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何设置火花流接收频率?

如何设置火花流接收频率?
EN

Stack Overflow用户
提问于 2017-06-12 06:19:02
回答 1查看 499关注 0票数 2

我的要求是处理股票市场的每小时数据。即,每次流间隔从源获取一次数据,并通过DStream进行处理。

我已经实现了一个定制接收器,通过实现onStart()和onStop()方法及其工作来废弃/监视网站。

遇到的挑战:

  • 接收线程正在连续地获取数据,即每间隔多次。
  • 无法协调接收器和DStream的执行时间间隔。

我试过的选择:

  1. 接收线程休眠几秒钟(等于流间隔)。在这种情况下,数据不是处理时的最新数据。

代码语言:javascript
运行
复制
class CustomReceiver(interval: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    new Thread("Website Scrapper") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {

  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    println("Entering receive:" + new Date());
    try {
      while (!isStopped) {
        val scriptsLTP = StockMarket.getLiveStockData()
        for ((script, ltp) <- scriptsLTP) {
          store(script + "," + ltp)
        }
        println("sent data")
        System.out.println("going to sleep:" + new Date());
        Thread.sleep(3600 * 1000);
        System.out.println("awaken from sleep:" + new Date());
      }
      println("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case t: Throwable =>
        restart("Error receiving data", t)
    }
    println("Exiting receive:" + new Date());
  }
}

如何使火花流接收器与DStream处理同步?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-06-12 19:12:53

这个用例似乎不适合于星火流。间隔足够长,可以将其视为一个常规批处理作业。这样,我们就可以更好地利用集群资源。

我会将其重写为星火作业,方法是并行化目标代码,使用mapPartitions将执行器用作分布式web剪贴器,然后按预期进行处理。

然后,将星火作业安排为每小时使用cron或更高级的替代方案(如Chronos )在需要的时间运行。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44492551

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档