00:00
好,同学们,这一节开始呢,我们给大家呢,写一个事例啊,那么这个事例呢,它是做一个定时任务,这个定时任务的需求包含这么四步。首先呢,就是我们的定时任务要每30秒呢执行一次,然后呢,他要查询最近30秒的第一条数据,按照时间排序的第一条数据,然后将这条数据转为Jason,再通过HTTP呢,呃,发送发送给我们的simple http post server。啊,那么simple simple http post server呢?是我自己写的啊,用雇员写的一个简单的HTTP服务。啊,那么现在呢,先给大家演示一下它的功能。呃,是这样,我把这个代码呢,放到了,放到了这个getu上,大家呢,可以去这里获取。啊,这里呢,先看一下我的这个项目,那这个程序非常简单,它就是一个简单的HTP服务,它呢只能接收POS的请求,然后呢,它会把你这个每次过来的请求呢,它会把这个请求体打印在呃,我们的终端上。
01:00
啊,那么这底下呢,是一套编译的方法啊,还有它的这个参数的使用。啊,在右边呢,我放了一个发行版,点一下之后呢,你可以在这里,哎,直接下载我编译好的在这个,呃,64位叉八六六十四位啊Linux上的一个可执行程序,那么我这里呢,点一下下载。呃,另外呢,同学如果是访问不了github呢,可以关注我们上硅谷的微信公众号,在这个公众号上呢,回复大数据三个字,也可以获取我们本次的课程资料,到时候呢,我会把这个程序啊,直接放到资料包里面。好,那么这里呢,我们这个程序已经下载好了,现在呢,我把它放到虚拟机上。呃,老样子,我们把这个可以执行的程序呢,全部放在OT Mo目录下啊。然后现在呢,我们去打开这个地方,把我们这个程序呢放上来。好,呃,可以看到呢,上来之后呢,它是一个普通文件,现在我们给它赋予执行权限。
02:05
755SIMPLE。好的,那么现在呢,你就可以直接拿这个文件呢去,呃,开启这个HTP服务了啊老样子,他这个程序呢,是用购语言写的,所以说呢,不需要任何的依赖,直接执行即可。啊,我这里什么都没有报啊,那么实际上呢,它是默认会监听这个本地的8080端口,我们现在呢,可以尝试向他呢去发送一点数据。呃,这里呢,我们只用curl命令,然后杠。Post http,然后local host。8080,然后呢,再给他个,再给他发点数据。啊,可以随便发一些,比如说这个什么吧,呃,随便打一个字符串回车,然后可以看到呢,我们这边的这个服务这边就会把我们这个请求的数据呢,原封不动的给打出来。
03:00
呃,所以说待会儿呢,我们就会往这个地方发送Jason,像这个本机的8080端口呢,去发送Jason数据。啊,这这样呢,可以验证我们这个请求呢,有没有成功的发送出来,呃,另外呢,就是我在这个read me里面写了,就是我这个程序呢,还支持你去设一下这个host的参数,Host和端口参数。那现在呢,我们可以给它换绑一下端口,比如说是这样加个杠P,然后呢,我比如说不要8080了,我想绑在9000端口上。好,那么现在这个程序呢,运行起来了,我们把这个刚才的程序呢,再跑一遍,你发现8080呢,现在已经挂掉了,我们使用9000,把端口号换成9000,好可以看到呢,这里数据又收到了啊,所以说这就是我们这个程序的玩法,待会儿呢,我们把那个定时我跑起来,要是他这里面能定时的打印去去打印数据啊,每隔一会儿呢,去打印一条数据,那就说明我们的定时任务是成功的。好,这里呢,我还是先把它挂掉,换成8080。
04:00
好,现在呢,我们就开始去操作定时任务。呃,我们一直说呢,这个定时任务是一个定期执行的flax脚本啊,那么这里面呢,还是要去写flax查询。呃,所以这里呢,我们可以使用这个data explorer,然后去编写我们的查询。呃,我们要查的这个数据呢,就是test in,还有googles,呃,那么我们知道这个指标呢,我们已经做了很多次查询了,它呢里面只有一条序列。好可以看到啊,这里面就是它的数据。啊,我这里面用查询构造器呢,先这么操作之后呢,我们直接生成我们的class DB脚本。然后呢,我们在它的基础之上去做一些操作,做一些修改,这样呢,方便我们更快的去把这个东西写出来,呃,这里呢,我们先去文档里面把图截一下。把我们的之前这个需求解一下,一步步写。呃,看一下之前的这个需求,本任务的需求。好,截图。再回到我们的web UI。
05:01
呃,可以先看一下啊,我们说每30秒调度一次呢,这这里我们先不说啊,查询先看查询逻辑查询最近30秒的数据。呃,那么现在呢,我们应该写润润是限制我们的时间范围,呃,那么这里呢,先把它修改一下。呃,Stop的这个默认值呢,就是当前的时间,那么我们start时间呢,应该是当前时间的,哎,过去30秒,呃,那么查询说30秒的,最近30秒的第一条数据,呃,那么这里呢,需要用到一个。呃,聚合的函数。先给下管一个管道符,然后这里呢,聚合函数叫做first。这个函数呢,就是可以取到我们这一序列的啊,查询出来这个序列的第一条数据。接下来我们再做一个操作,这个操作呢,要把数据呢转成Jason,呃,此处呢,没有一个可以直接把数据转成Jason的管道函数,所以说呢,我们调一下map函数。
06:02
然后在map函数里面呢,我们知道它允许我们传递一个啊这个匿名函数,所以说我们把这个逻辑呢,转换的逻辑写到匿名函数里面啊,这个参数的意思就是这一行数据啊,这里面一条数据,然后我们的函数定义。好,在这里面呢,就去写我们匿名函数的逻辑,把这里面一行数据转成Jason,然后发送给我们的simple http post server。呃,那这里面呢,其实要用到一个叫做Jason code。呃,这么一条数据,这么一个方法。啊,那么这个函数里面呢,它需要传递一个参数叫做V啊V呢就是可以是我们这一行数据,然后我们把这一行数据传进去之后呢,它会自动的帮我们把整个这一行的数据里面的啊自段那包括标签什么的全部转成了啊一个Jason串,现在呢,我们把这一行直接扔进去,然后这里面标红,你可以看到这里说是哎,我缺少返回值,因为map函数呢,它要求你这个里面的匿名函数啊,必须要有一个返回值。
07:06
呃,现在呢,我们可以去把这个数据呢,直接用HTTP发出去。呃,可以看到啊,我这里一敲Jason之后呢,上面自动补全了这个import jeson的操作。呃,那么我们发送http post请求的方式呢,就是啊,有另外一个包HTTP啊,它的包这个里面。它提供了一个叫做post的函数。呃,可以看到我们import了HTTP,然后port的函数呢,需要。几个参数,一个呢是URL,这个是必须的。呃,那么我们的这个URL呢,我们之前说了是HTTP。一。然后是8080端口。呃,之后呢,我们这个data啊,还需要一个参数叫data,那么heads呢,这里啊,它默认是空的啊,这个不用管。那么data呢,你这里面可以直接把我们的数据放进来了。
08:02
好,那么现在呢,也可以看到整个函数的标红啊,是因为我们现在还没有还没有返回值啊,这个块里面还是没有返回值。啊,另外呢,就是如果说你不大清楚某个函数的用法的话呢,你可以直接在这边搜,比如HTTP,我们可以看到这里面有很多这个HTP的方法啊,其中有一个post。啊,这里面它还有示例啊,就是你可以对照右边这个文档呢,去写这个程序啊,最后呢,可以给大家说一下,就是POS的这个函数呢,它会返回我们请求的状态码。啊,我这里呢,就直接叫code把这个值给它付过来啊,最后呢,因为map要求我们每一个啊,每一个这个这里面的匿名函数必须要有返回值,所以我这里呢,刚好就可以把状态码直接返回出去,Return,然后因为呢,这里record的这个这个返回值呢,类型有要求是必须返回record的类型。啊,这里呢,我直接把这个原始数据带上,然后用位字再把这个扣子带上。
09:03
好,那么第一个呢,就是我们的这个呃,Record里面的这个呃,字段名,然后另外呢,就是我们的真正的取值是这个意思。呃,接下来呢,我们可以先去尝试运行一下这个脚本。啊,也可以看到这个地方报错,是因为找不到这个host,我这里写错了,没有写T。Summit好,可以看到呢,呃,现在只有一个点一个点,我们再看CSV。我再看看这个原始数据。啊,那么可以看到我们现在呢,这个数据里面显示的是。呃,查询的是因为查询的是过去30秒的数据,然后呢,又在30秒里面取了第一行,我问你到这一行的时候是不是只剩一条数据了,然后这条数据呢,我们走了一遍,这个转成Jason再发送给我们的啊服务的一个过程,呃,那么现在呢,我们可以看到加上这个状态码之后呢,状态码是200,说明我们发送的这个请求是成功的,然后这一行数据呢,就多了这么一个字段啊,多了一个扣的这个字段,现在我们再回到我们的终端里面看一下。
10:06
可以看到这个地方呢,我已经成功的接收到了一条呃,Jason格式的数据,呃,那么说明我们的这个flax脚本呢,编写的是没有问题的啊,能够满足我们的需求,现在呢,我们给它配置定时任务,我在这里点一下cos。呃,有一个task,那么这里呢,就是配,就是定时任务的配置好,那么可以看到呢,这里面有一个,呃,你的任务名称,我们就说这个,呃A30秒first啊,每30秒第一条数据。呃,这里呢,有一个A瑞,你可以看到这里面写的是三小时30秒啊,其实呢,它是啊,它是个装饰啊,它不是说这个默认值是3H30秒啊,直接打一个30秒,然后offset呢,这个地方我们先不讲啊,先空着。啊,那么这里面呢,写着20M呢,也不是默认值啊,它其实是空值啊,你不要被它迷惑了。
11:01
呃,这里呢,需要指定一个output bucket,就是把我们这个数据的输出呢啊,在指定一个写到哪个存储桶里面,也就是说它是一个回写in Fla DB的操作啊,把数据再写回去的操作啊,那么这个地方你可能比较好奇,说我只是发送一个HTP请求,为什么要回血这个in Fla DB呢?啊这个地方先不要管,后面呢,我们讲这个。哎,配置任务原理的时候呢,会给大家讲这里为什么要回血。呃,暂时呢,先把这个数据放在零一里面,然后点save as task就把它保存成一个task。好,那么可以看到呢,我们自动跳转到了这个tasks这个页面上,呃,这里面有一个说这active就是正在活跃啊,正在使用的,正在运作的一个定时任务,呃,这一行呢,说的是last complete at几点,那么他这是他这个最终完成的最近一次完成的时间。啊,这里面呢,是他的一个任务信息,也就是每30秒钟调度一次。呃,你可以点一下这个标题,这里面呢,有它的详情啊,你包括这个在这里面呢,你还可以手动触发一次啊,我们的任务也可以直接去编辑,我们的任务,你可以直接编辑它。
12:10
呃,那么底下呢,这个是我们的一个调度的历史记录,可以看到在这个02:51呢,02:51整的,它已经调度过一次,我们刷新几下。啊,等这个时间呢,稍微过去一些。啊,不妨呢,我先现在先去看一下这个,诶可以看到这里面又多了一条,哎,我们的这个Jason数据,诶,刚才又跳了一条,看见没有,现在是三条了啊,这第一条是我们当时手动去触发了一次查询,那后面这两条呢,就是我们这个定时任务带来的这个数据。呃,在这里我把这个页面刷新一下,你可以看到这里面呢,有两次执行记录啊,有两次执行记录,那么这就说明我们的定时任务已经执行,已经配置好了。啊,那么这个地方呢,再带大家看一下之前的我们回写的EXAMPLE01。啊,看一下这个data explorer,然后回到我们的。
13:03
呃,新建一个查询吧。呃,在这个ZONE01里面呢,我们可以看到现在有一个测量名称叫做go gotis,好,都选中完了之后呢,我再点一下submit。啊,那么可以看到呢,这其实呢,是每30秒一份数据啊,它还是一条序列啊,但是呢,这个之前我们的啊gogo是十秒一条,现在呢是30秒一条,我们可以把这个。哎,放大看一下,比如说我现在有相邻相邻的两个点。这点。啊,放大。可以看到是02:58:40,接下来呢,是他后面这个点。好可以看到呢,是呃,02:59:10,他俩之间呢,差差了30秒。啊,那么这就是我们之前这个定时任务的一个工作的情况。呃,到这一步呢,这个事例就完成了。
我来说两句