前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >周期性清除Spark Streaming流状态的方法

周期性清除Spark Streaming流状态的方法

作者头像
王知无-import_bigdata
修改2019-08-17 23:14:41
1.1K0
修改2019-08-17 23:14:41
举报

5万人关注的大数据成神之路,不来了解一下吗?

5万人关注的大数据成神之路,真的不来了解一下吗?

5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

在Spark Streaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子:

代码语言:javascript
复制
 val productPvStream = stream.mapPartitions(records => {
    var result = new ListBuffer[(String, Int)]
      for (record <- records) {
        result += Tuple2(record.key(), 1)
      }
    result.iterator
  }).reduceByKey(_ + _).mapWithState(
    StateSpec.function((productId: String, pv: Option[Int], state: State[Int]) => {
      val sum = pv.getOrElse(0) + state.getOption().getOrElse(0)
      state.update(sum)
      (productId, sum)
  })).stateSnapshots()

现在的问题是,PV并不是一直累加的,而是每天归零,重新统计数据。要达到在凌晨0点清除状态的目的,有以下两种方法。

编写脚本重启Streaming程序

用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本:

代码语言:javascript
复制
stream_app_name='com.xyz.streaming.MallForwardStreaming'
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`

if [ ${cnt} -eq 1 ]; then
  pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`
  kill -9 ${pid}
  sleep 20
  cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
  if [ ${cnt} -eq 0 ]; then
    nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1
  fi
fi

这种方式最简单,也不需要对程序本身做任何改动。但随着同时运行的Streaming任务越来越多,就会显得越来越累赘了。

给StreamingContext设置超时

在程序启动之前,先计算出当前时间点距离第二天凌晨0点的毫秒数:

代码语言:javascript
复制
def msTillTomorrow = {
  val now = new Date()
  val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
  tomorrow.getTime - now.getTime
}

然后将Streaming程序的主要逻辑写在while(true)循环中,并且不像平常一样调用StreamingContext.awaitTermination()方法,而改用awaitTerminationOrTimeout()方法,即:

代码语言:javascript
复制
while (true) {
    val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
    ssc.checkpoint(CHECKPOINT_DIR)

    // ...处理逻辑...

    ssc.start()
    ssc.awaitTerminationOrTimeout(msTillTomorrow)
    ssc.stop(false, true)
    Thread.sleep(BATCH_INTERVAL * 1000)
  }

在经过msTillTomorrow毫秒之后,StreamingContext就会超时,再调用其stop()方法(注意两个参数,stopSparkContext表示是否停止关联的SparkContext,stopGracefully表示是否优雅停止),就可以停止并重启StreamingContext。

以上两种方法都是仍然采用Spark Streaming的机制进行状态计算的。如果其他条件允许的话,我们还可以抛弃mapWithState(),直接借助外部存储自己维护状态。比如将Redis的Key设计为product_pv:[product_id]:[date],然后在Spark Streaming的每个批次中使用incrby指令,就能方便地统计PV了,不必考虑定时的问题。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 编写脚本重启Streaming程序
  • 给StreamingContext设置超时
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档