通过offsets.retention.minutes设置kafka offset的过期时间

前言

本文记录博主如何设置kafka的offset过期时间并测试其效果

1、offsets.retention.minutes

通过修改offsets.retention.minutes的值即可改变kafka offset的过期时间,单位为分钟,改完之后需要重启kafka。具体的配置文件为$KAFKA_HOME/config/server.properties,原生的kafka配置文件里可能没有这个配置项,自己添加上即可,比如设置过期时间为一小时,那么按如下配置即可

offsets.retention.minutes=60

2、官方文档

网上有的博客说官网文档对于这个配置的说明有点错误,将offsets.retention.minutes错写成了offsets.topic.retention.minutes,但是我查看了一下,官方文档上并没有写错,可能是之前的版本写错了,而且很多博客按之前的版本写的,大家注意一下。官网文档地址http://kafka.apache.org/documentation/

3、ambari的bug

因本人用ambari管理大数据集群的各个组件,所以在界面上直接修改kafka的配置,在界面上查看kafka的配置offsets.retention.minutes为86400000,因为kafka offset默认过期时间为一天,那么根据这个86400000来看offsets.retention.minutes的单位为毫秒才对,所以一开始误认为单位为毫秒,所以修改配置后的时间设置的很大,导致一开始测试不成功,经过一点点的验证,发现单位实际上为分钟,而ambari上显示的86400000应该是个bug,因为kafka默认的配置文件里是没有这个配置项的,所以我估计ambari一开始也没有配置只是搜索的时候将其显示为86400000,而并没有真正的生效,只有将这个配置项修改之后,才会生效,并且单位为分钟(看了以下ambari的大部分默认时间单位都是毫秒~)。 后来在官网上看到offsets.retention.minutes的default为1440也证实了这一点。

4、测试效果

虽然本人的需求是将默认的一天的时间改长一点,但是时间长了测试太慢,所以将时间改短一点测试效果即可,测试代码见Spark Streamming+Kafka提交offset实现有且仅有一次,经过多次测试,得出结论,在修改重启之后,不管是新增加的topic还是之前的topic,只要是新保存的offset都会生效,而之前保存的offset,比如之前是一天才会删除,那么修改重启后,之前保存的offset还是会一天后才能删掉。 注:spark保存offset代码

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

5、注意

offset的过期时间是不精确的,实际上大于等于你设置的时间,假如设置的时间为10分钟,那么困难在10-20之后才会删掉,原因我想应该是kafka会定期的检查offset被标记为应该清理的offset,可能offsets.retention.check.interval.ms这个配置项有关,因为其默认时间为十分钟,但是没有去验证这一点。

  • offsets.retention.check.interval.ms 600000 offset管理器检查陈旧offsets的频率

本文由 董可伦 发表于 伦少的博客 ,采用署名-非商业性使用-禁止演绎 3.0进行许可。

非商业转载请注明作者及出处。商业转载请联系作者本人。

本文标题:通过offsets.retention.minutes设置kafka offset的过期时间

本文链接:https://dongkelun.com/2018/06/21/modifyKafkaOffsetTime/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏区块链

Web安全常见漏洞修复建议

看各大发布漏洞的平台,发现众多挖洞大神精彩的漏洞发掘过程,但在修复建议或者修复方案处,给出千奇百怪神一般的回复,故而总结一下修复建议(才疏学浅不算太全敬请谅解,...

2616
来自专栏同步博客

memcached的安装以及php两个扩展软件安装(memcache、memcached)

百度云安装包:http://pan.baidu.com/s/1pKZeDwn  k3ap

832
来自专栏安恒网络空间安全讲武堂

翻译 | Linux利用动态链接共享对象库提权

Linux利用动态链接共享对象库提权 RPATH和弱文件权限会导致系统的损害。Linux应用程序可以利用动态链接的共享对象库(我们从现在开始称它们为共享库)来提...

2165
来自专栏androidBlog

Git 配置别名 —— 让命令变得更简单

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/de...

951
来自专栏ASP.NETCore

.NET Core扩展IServiceCollection自动注册服务

在ASP.NET Core中使用依赖注入中使用很简单,只需在Startup类的ConfigureServices()方法中,通过IServiceCollecti...

1322
来自专栏木制robot技术杂谈

懒人神器 autoenv

前言 每次去不同的项目下运行程序都要更改相对应的 Python 环境,那么有什么办法可以省去这繁琐的一步吗?答案肯定是有的,Kenneth Reitz 已经为我...

3906
来自专栏一个爱瞎折腾的程序猿

初次尝试Linux并记录一二

若出现 服务器拒绝了SETP连接,但它监听FTP链接。。。没有安装sshd 解决方案

971
来自专栏网站漏洞修补

ecshop全系列版本网站漏洞修复 清理网站木马后门

ecshop漏洞于2018年9月12日被某安全组织披露爆出,该漏洞受影响范围较广,ecshop2.73版本以及目前最新的3.0、3.6、4.0版本都受此次ecs...

3391
来自专栏用户画像

7.4.1 程序查询方式

信息交换的控制完全由主机执行程序实现,程序查询方式接口中设置一个数据缓冲寄存器(数据端口)和一个设备状态寄存器(状态端口)。当主机进行I/O操作时,首先发出询问...

831
来自专栏SAP最佳业务实践

SAP最佳业务实践:ETO–报价处理(232)-17比较项目版本和实际数据

image.png CN41比较项目版本和实际数据 项目版本是项目某个特定时刻的快照,可以作为一个文档。另外它还可以与实际数据进行比较。这一步,项目版本数据与项...

3628

扫码关注云+社区