00:00
好,接下来呢,我们来看一下,按照指定时间进行一个消费,那这项功能呢,在生产环境当中用的还是非常多的啊,那假设现在呢,是下午两点半。那如果说我想消费一下昨天开始下午两点半到现在这块的数据,我重新进行一个消费,因为我觉得这个一天前的数据有问题。啊,或者说呢,我想按照某一个时间点,然后往后去进行消费,比如说我现在希望从12点以后的数据,我重新来消费一下。重新进行一个处理,那这样行不行呢?哎,当然呢也是可以的啊,那下面呢,我们就实现一下对应的这个功能。那来到这里面,我们基于刚才的指定位置进行消费,在他代码上进行一个修改。加上一个time。拿到它之后,我们来看一下。上一个案例啊,我们是指定了位置进行一个消费,那他有没有按照时间进行一个消费的呢?比如说我们看。Much啊,比如说卡巴点,哎,你会发现这里面没有跟时间相关的任何东西,它还是只有这么一个sick啊,是他。
01:08
那这个怎么办呢?大家想我这里面呢,是这个现在指定了offet,那如果我能把时间转换成对ET,那这事是不是就OK了。也就说,把我们的时间转换成off。那这个怎么转呢?哎,大家思考一下啊,那看一下这个我们希望啊。希望把时间。转换为对应的。那看一下有没有类似的这种API,你打一个O,你看说offset for time,比如说你这个offset是来源于时间对应的这个API,那这里面呢,需要传入一个集合,集合呢是呃,Topic partan以及辣。好,那需要一个集合,我就给他一个集合。来呗,你一个哈希map,那它的key就是对应的topic part,那它对应的这个Y流值了。
02:09
那这样就行了呗。点吧。现在呢,就给过来对应的这个集合的值。那把这个集合往这里面一放,它不就有了吗?点吧。那现在呢,就能够获取到对应的信息,只不过呢,目前情况下,这个集合里面的数据是不是空的,哎,你说没有往里面填数据,那我们就填一下对应的数据。封装对应的集合。那怎么封装这个集合呢?这里面是有对应的分区信息以及law啊,也就offset,那这个分区怎么获取啊,是不是可以通过循环便利这个assignment,哎,便利它来获取啊。那么来。下面的点for。这样呢,我就拿到了对应的分区信息,然后向这个集合里面添加数据。
03:00
Put,那put的话呢,第一个K呢,就是这个topic partan,那对应的这个Y流值是什么呢?这个Y值啊,是对应的传入的时间啊时间,因为这个时间未来会帮我们转换成对应的。那这个时间怎么给呢?首先你得获取一下当前系统的时间,System current time好。那如果我想获取当前时刻一天前的数据,那我就减去,减去谁,如果是一天,那我乘以24小时,我再乘以3600秒,我要变成毫秒的话,我再乘以个1000。这不就OK了吗?是这样吗?把这个替换一下,哎,这样呢就OK了啊,也就说我就算出来当前时刻一天前的一个奥赛的位置啊,拿到它之后呢,就得到它,得到它这里面有值了,那这里面有值了,我现在要替换这里面的offset,那它怎么能拿到对应的offet呢?你传进去对应的分区取出来的不就是奥赛的值吗?
04:08
那你说这样,拿它点get,你传进去对应的分区。取出来的。两不就是对应的ET。那你把这个offset往这里面一放。点offet这不就来了吗?啊捋一下这个逻辑啊,首先呢,我们是按照指定位置这种方式啊来处理,这里面位置呢是需要一个offset,那这个offset呢,我希望呢,是通过时间的转换获取到它。那通过时间转换,那就往前推啊往前推,那这里面呢,它有一个对应的API是这个。叫通过时间获取到对应,那它里面需要一个什么呢?需要一个集合。那么这个集合啊,它的方式呢,是采用这种方式啊,也说分区加上这个菜啊,它那有了这个集合之后,你得需要往这个集合里面添加数啊,那这里面用到了多个分区,那多个分区只有这种方式,通过塞门呢,它循环便利就能取出一个一个的分区,那好取出来一个一个分区,我向这个分区里面添加上对应的时间,那这个时间未来呢,通过这个API就能帮我们获取到对应的。
05:19
好,那这个fet有了之后,那你就是传进去对应的这个分区,传进分区取出来off。那这边一放这不就OK了吗?啊就是这样啊行,那这个逻辑呢,相对来说要绕一下,大家呢好好捋一捋,那下面呢,我们来执行测试一下,看它好不好用啊。右键执行。你看现在呢,就按照时间的这种方式获取到了一天前的数据。
我来说两句