前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何用MLSQL快速调试Structured Streaming程序

如何用MLSQL快速调试Structured Streaming程序

作者头像
用户2936994
发布2018-09-29 15:08:56
3990
发布2018-09-29 15:08:56
举报
文章被收录于专栏:祝威廉

前言

早上对Structured Streaming 的window函数, Output Mode 以及Watermark有些疑惑的地方。Structured Streaming 的文档偏少,而且网上的文章同质化太严重,基础的不能再基础了,但是我也不想再开个测试的工程项目,所以直接就给予MLSQL来调试。

本地启动一个

根据streamingpro的文档,在本地启动一个local模式的实例,然后打开 127.0.0.1:9003页面,大概是这个样子的。

image.png

测试过程

首先设置一个应用名称。通过

代码语言:javascript
复制
set streamName="streamExample";

来完成.

接着造一些数据:

代码语言:javascript
复制
-- mock some data.
set data='''
{"key":"1","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"2","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:18.002","timestampType":0}
{"key":"3","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:20.003","timestampType":0}
{"key":"4","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:50.003","timestampType":0}
{"key":"5","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:02:01.003","timestampType":0}
{"key":"6","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:02:01.003","timestampType":0}
''';

这里精心调整下timestamp的实验,因为后面那我们测试都是根据这个时间来完成的。

把这些数据模拟成数据源表,我们取名叫newkafkatable1。

代码语言:javascript
复制
-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options 
stepSizeRange="0-3"
as newkafkatable1;

stepSizeRange 表示每个批次随机会给0-3条数据。你也可用fixSize参数,这样可以控制每个批次每次给多条。 接着对数据做个简单的处理。

代码语言:javascript
复制
select cast(key as string) as k,timestamp  from newkafkatable1 
as table21;

对table2 设置一下WaterMark

代码语言:javascript
复制
register WaterMarkInPlace.`table21` as tmp1
options eventTimeCol="timestamp"
and delayThreshold="60 seconds";

按窗口进行聚合,聚合的窗口大小是20秒。

代码语言:javascript
复制
select collect_list(k),
window(timestamp,"20 seconds").start as start,
window(timestamp,"20 seconds").end as end
from table21 
group by window(timestamp,"20 seconds")
as table22;

最后启动该流程序:

代码语言:javascript
复制
save append table22  
as console.`` 
options mode="Complete"
and duration="10"
and checkpointLocation="/tmp/cpl4";

这里采用Complete模式,然后输出打印在console.

我分别尝试了Complete,Append,Update模式,然后调整WarterMark,以及测试数据的timestamp,然后观察情况。

观察完毕,你可以关掉这个流式程序,按住command键点击任务列表,会新开一个窗口:

image.png

点击关闭任务按钮即可。

因为Console 输出不支持从checkpoint recover ,所以你可以手动删除/tmp/cpl4目录。

接着你修改mlsql脚本,然后点击提交即可。

总结

通过完全校本化,界面操作,以及mock数据的支持,可以很方便你进行structured streaming的探索

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.09.06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 本地启动一个
  • 测试过程
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档