前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Streaming优雅的关闭策略优化

Spark Streaming优雅的关闭策略优化

作者头像
我是攻城师
发布2018-05-14 17:32:38
1.5K0
发布2018-05-14 17:32:38
举报
文章被收录于专栏:我是攻城师我是攻城师

前面文章介绍了不少有关Spark Streaming的offset的管理以及如何优雅的关闭Spark Streaming的流程序。

到目前为止还有几个问题:

(1)有关spark streaming集成kafka时,如果kafka新增分区, 那么spark streaming程序能不能动态识别到而不用重启?

(2)如果需要重启,那么在自己管理offset时,如何才能识别到新增的分区?

(3)spark streaming优雅关闭的策略还有那些?

首先第一个问题,如果kafka要新增分区,对于正在运行的实时流程序能否动态识别到?

经过测试,是不能识别的,我推测使用createDirectStream创建流对象一旦创建就是不可变的,也就是说创建实例那一刻的分区数量,会一直使用直到流程序结束,就算中间kafka的分区数量扩展了,流程序也是不能识别到的。所以在扩展kafka分区前,一定要先把流程序给停掉,然后扩展完成后需要再次重启流程序。

然后看第二个问题,如果是我们自己管理offset时,一定要考虑到kafka扩展分区的情况,每次启动程序前都得检测下目前保存的偏移量里面的kafka的分区个数是否小于kafka实际元数据里面实际的分区个数,正常没扩展分区的情况下两个值应该是相等的,如果值不一致,就说明是kafka分区得到扩展了,所以我们的程序需要能够兼容这种情况。

核心代码如下:

上面的代码在每次启动程序时,都会检查当前我们自己管理的offset的分区数量与zk元数据里面实际的分区数量,如果不一致就会把新增的分区id给加到TopicAndPartition里面并放入到Map对象里面,这样在启动前就会传入到createDirectStream对象中,就能兼容新增的分区了。

最后一个问题,前面的文章谈到过我们可以有两种方式来更加优雅的停止流程序,分别是通过http暴露服务,和通过HDFS做消息中转来定时扫描mark文件是否存在来触发关闭服务。

下面我们先来看下通过http暴露服务的核心代码:

然后在来看下另一种方式扫描HDFS文件的方式:

上面是两种方式的核心代码,最后提下触发停止流程序:

第一种需要在启动服务的机器上,执行下面封装的脚本:

第二种方式,找到一个拥有HDFS客户端机器,向HDFS上写入指定的文件:

所有代码,已经同步更新到我的github上,有兴趣的朋友可以参考这个链接:

https://github.com/qindongliang/streaming-offset-to-zk

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-12-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我是攻城师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档