00:00
大家好,欢迎收看pat winnk实战系列教程,本章节为进阶系列六复杂事件处理腾讯云六计算LIST10大数据产品生态体系的实时化分析利器,兼容opachlink作业。Linknk cep复杂事件处理既支持data string API,也支持table色lel。下面将教大家学习如何利用oceanist jar作业,通过在data string程序,通过读取kafka中股票的数据去查找到股票的地点,完成了复杂事件的处理,并将结果输出到kaka的另一个top中去。首先进入Co go实例,新建两个topic user jar作业的数据源端和目的端。
01:07
其次,在本地的IDE中创建maven工程,编写Java jar作业。Page poem文件添加必要的依赖项。添加flink和心包、kafka connector和flink cep依赖包,并添加maven assembly插件,设置好主类。创建com demos ball,编写Java文件。分别编写stock类、序列化、反序列化类和cep test乳类。
02:01
Stock v中须实现equals和hash code方法用于模式匹配中实元素比较。Stock序列化反序列化类。第一步,设置data stream运行环境。
03:03
第二步,配置数据源,这里使用kafka作为source来读取股票实时价格信息。第三步,数据处理。这里我们定义了一个复杂事件,当股票价格初始状态为元,随后出现了元后又紧接着又大于十元,则认为找到了股票的低点。第四步,输出处理完的数据。这里我们使用了flink kafka producer将数据输出到kafka。配置producer时去把kafka配置信息也添加到producer中去。第五步,执行程序。
04:07
编写完Java程序后,在命令行终端执行打包命令。Maven clean package。随后将打包好的jar ball上传到oceanist平台,进入ocean控制台,在依赖管理菜单中将ball上传。然后创建至作业。点击开发调试,进入作业开发页面,在主程序包选择上传的依赖包并填写主类。
05:04
因为这里使用到ocean major kafka连接器,所以需要在作业参数面板中选择对应的kafka connector。点击发布草稿,运行jar作业。我们可以看到作业已经处于运行中状态,接下来进入c Co Co控制台,发送模拟数据到对应的topic。在接收数据的top中可以看到数据已经正常写入。
06:08
到此为止,我们进阶系列的演示就结束了,欢迎大家一元购体验流计算lchless。
我来说两句