前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink-sql 流计算可视化 UI 平台

flink-sql 流计算可视化 UI 平台

作者头像
怀朔
发布2022-05-29 11:28:54
2K0
发布2022-05-29 11:28:54
举报
文章被收录于专栏:运维入门时间运维入门时间

朋友多年自主研发的flink-sql 流计算可视化 UI 平台,细细品味一番确实很好用,做到真正的MSP(混合云场景)多数据多复用的情况实现,下面是这个产品的使用说明看看大家有没有使用场景。

一、简介

flink-streaming-platform-web系统是基于flink封装的一个可视化的web系统,用户只需在web界面进行sql配置就能完成流计算任务,

主要功能包含任务配置、启/停任务、告警、日志等功能。目的是减少开发,完全实现flink-sql 流计算任务

支持本地模式、yarn-per模式、STANDALONE模式

支持udf、自定义连接器等,完全兼容官方连接器

目前flink版本已经升级到1.12

源码地址 https://github.com/zhp8341/flink-streaming-platform-web

效果图

本文demo/文档更新不及时,请移步github https://github.com/zhp8341/flink-streaming-platform-web

二、环境以及安装

1、环境

操作系统:linux

hadoop版本 2+

flink 版本 1.11.1 官方地址: https://ci.apache.org/projects/flink/flink-docs-release-1.11/

jdk版本 jdk1.8

scala版本 2.11

kafka版本 1.0+

mysql版本 5.6+

2、应用安装

1、flink客户端安装

下载对应版本

https://archive.apache.org/dist/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz 然后解压

a: /flink-1.11.1/conf

1、YARN_PER模式

文件下面放入hadoop客户端配置文件

core-site.xml

yarn-site.xml

hdfs-site.xml

2、LOCAL模式

3、STANDALONE模式

以上三种模式都需要修改 flink-conf.yaml 开启 classloader.resolve-order 并且设置 classloader.resolve-order: parent-first

b: /flink-1.11.1/lib hadoop集成

代码语言:javascript
复制
下载 flink-shaded-hadoop-2-uber-${xxx}.jar 到lib 
地址  https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

完毕后执行 export HADOOP_CLASSPATH=`hadoop classpath`

2、flink-streaming-platform-web安装

技术选型 springboot2.2.8.RELEASE

a:下载最新版本 并且解压 https://github.com/zhp8341/flink-streaming-platform-web/releases/

代码语言:javascript
复制
 tar -xvf   flink-streaming-platform-web.tar.gz

b:执行mysql语句

mysql 版本5.6+以上 创建数据库 数据库名:flink_web 执行表语句 语句地址 https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql/flink_web.sql

c:修改数据库连接配置

代码语言:javascript
复制
/flink-streaming-platform-web/conf/application.properties改成上面建好的mysql地址

d:启动web

代码语言:javascript
复制
cd  /XXXX/flink-streaming-platform-web/bin启动 : sh deploy.sh  start停止 :  sh deploy.sh  stop日志目录地址:/XXXX/flink-streaming-platform-web/logs/

e:登录

代码语言:javascript
复制
http://${ip或者hostname}:9084/  如: http://hadoop003:9084/登录号:admin  密码 123456

f:集群

如果需要集群部署模式 简单参考图

三、功能介绍

1、新增任务配置说明

a: 任务名称(*必选)

任务名称不能超过50个字符 并且 任务名称仅能含数字,字母和下划线1

b: 运行模式

YARN_PER( yarn独立模式 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn)

STANDALONE(独立集群 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/cluster_setup.html)

LOCAL(本地集群 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/local.html )

LOCAL 需要在本地单机启动flink 服务 ./bin/start-cluster.sh

c: flink运行配置

1、YARN_PER模式

参数(和官方保持一致)但是只支持 -p -yjm -yn -ytm -ys -yqu(必选) -ys slot个数。 -yn task manager 数量。 -yjm job manager 的堆内存大小。 -ytm task manager 的堆内存大小。 -yqu yarn队列明 -p 并行度 详见官方文档如:-yqu flink -yjm 1024m -ytm 2048m -p 1 -ys 1

2、LOCAL模式

无需配置1

3、STANDALONE模式

代码语言:javascript
复制
-d,--detached                        If present, runs the job in detached                                          mode-p,--parallelism <parallelism>       The parallelism with which to run the                                          program. Optional flag to override the                                          default value specified in the                                          configuration.-s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job                                          from (for example                                          hdfs:///flink/savepoint-1537).其他运行参数可通过 flink -h查看

d: Checkpoint信息

代码语言:javascript
复制
不填默认不开启checkpoint机制 参数只支持-checkpointInterval-checkpointingMode-checkpointTimeout-checkpointDir-tolerableCheckpointFailureNumber-asynchronousSnapshots如:-asynchronousSnapshots true  -checkpointDir   hdfs://hcluster/flink/checkpoints/(注意目前权限)

参数值说明

e: udf地址

udf地址 只支持http并且填写一个 如:http://xxx.xxx.com/flink-streaming-udf.jar 地址填写后 可以在sql语句里面直接写 CREATE FUNCTION jsonHasKey as ascom.yt.udf.JsonHasKeyUDF;

udf 开发demo 详见 https://github.com/zhp8341/flink-streaming-udf

2、系统设置

 系统设置有三个必选项

1、flink-streaming-platform-web应用安装的目录(必选) 这个是应用的安装目录 如 /root/flink-streaming-platform-web/ 2、flink安装目录(必选) --flink客户端的目录 如:/usr/local/flink-1.11.1/

3、yarn的rm Http地址 --hadoop yarn的rm Http地址 http://hadoop003:8088/

4、flink_rest_http_address LOCAL模式使用 flink http的地址

5、flink_rest_ha_http_address STANDALONE模式 支持HA的 可以填写多个地址 ;用分隔

3、报警设置

报警设置用于: 当运行的任务挂掉的时候会告警 资料:钉钉报警设置官方文档:https://help.aliyun.com/knowledge_detail/106247.html

安全设置 关键词必须填写: 告警

效果图

三、配置demo

demo1 单流kafka写入mysqld 参考

demo2 双流kafka写入mysql 参考

demo3 kafka和mysql维表实时关联写入mysql 参考

demo4 滚动窗口

demo5 滑动窗口

代码语言:javascript
复制
创建函数 UTC2Local AS 'com.streaming.flink.udf.UTC2Local';
创建表 source_table (
f0 整数,
f1 整数,
f2 字符串,
过程时间作为过程时间()
) 和 (
'连接器' = '数据生成',
'每秒行数'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
);
创建表 print_table (
f0 整数,
f1 整数,
f2 字符串,
t2 时间戳(6),
t1 时间戳(6)
) 和 (
'连接器' = '打印'
);
插入 print_table 选择 f0,f1,f2, proctime as t2, UTC2Local(proctime) as t1 from source
代码语言:javascript
复制
创建函数 jsonHasKey 作为 com.xx.udf.JsonHasKeyUDF;

-- 如果使用udf 函数必须配置udf地址


     创建表 flink_test_6 (
  标识 BIGINT,
  day_time VARCHAR,
  数量 BIGINT,
  过程时间作为过程时间()
)
和 (
'connector.properties.zookeeper.connect'='hadoop001:2181',
  'connector.version'='通用',
  'connector.topic'='flink_test_6',
  'connector.startup-mode'='earliest-offset',
  'format.derive-schema'='true',
  'connector.type'='kafka',
  '更新模式'='追加',
  'connector.properties.bootstrap.servers'='hadoop003:9092',
  'connector.properties.group.id'='flink_gp_test1',
  '格式.type'='json'
);


创建表 flink_test_6_dim (
  标识 BIGINT,
  优惠券_amnount BIGINT
)
和 (
   'connector.type' = 'jdbc',
   'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8',
   'connector.table' = 'test_dim',
   'connector.username' = 'flink_web_test',
   'connector.password' = 'flink_web_test_123',
   'connector.lookup.max-retries' = '3'
);


创建表 sync_test_3 (
                   day_time 字符串,
                   total_gmv 大整数
) 和 (
   'connector.type' = 'jdbc',
   'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8',
   'connector.table' = 'sync_test_3',
   'connector.username' = 'flink_web_test',
   'connector.password' = 'flink_web_test_123'

);


插入到 sync_test_3
选择
  白天,
  SUM(amnount - coupon_amnount) 作为 total_gmv
从
  (
    选择
      a.day_time 作为 day_time,
      a.amnount 作为 amnount,
      b.coupon_amnount 作为 coupon_amnount
    从
      flink_test_6 作为
      左连接 flink_test_6_dim FOR SYSTEM_TIME AS OF a.proctime as b
     ON b.id = a.id
  )
按天时间分组;

四、支持flink sql官方语法

完全按照flink1.11.1的连接器相关的配置

详见

http://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html

五、其他

1、由于hadoop集群环境不一样可能导致部署出现困难,整个搭建比较耗时.

2、由于es 、hbase等版本不一样可能需要下载源码重新选择对应版本 源码地址 https://github.com/zhp8341/flink-streaming-platform-web

六、问题

1、

Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.

代码语言:javascript
复制
设置 HADOOP_CONF_DIR=/etc/hadoop/conf 因为没有设置 HADOOP_CONF_DIR。



无法从 JAR 文件构建程序。

使用帮助选项(-h 或 --help)获取有关命令的帮助。


解决办法
   导出 HADOOP_HOME=/etc/hadoop
   导出 HADOOP_CONF_DIR=/etc/hadoop/conf
   导出 HADOOP_CLASSPATH=`hadoop 类路径`

   源 /etc/profile

  最好的配置变量
代码语言:javascript
复制

2020-10-09 14:48:22,060 ERROR com.flink.streaming.core.JobApplication - 任务执行失败:
java.lang.IllegalStateException:无法实例化 java 编译器
        在 org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
        在 org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
        在 org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
        在 org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
        在 org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
        在 org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
        在 org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
        在 org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
        在 org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
        在 org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
        在 org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
        在 org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
        在 org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
        在 org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
        在 org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
        在 org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
        在 org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
        在 org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
        在 org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
        在 org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
        在 org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        在 org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
        在 org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
        在 org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        在 org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        在 org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        在 org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        在 scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        在 scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:891)
        在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1334)
        在 scala.collection.IterableLike$class.foreach(Ite​​rableLike.scala:72)
        在 scala.collection.AbstractIterable.foreach(Ite​​rable.scala:54)
        在 scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        在 scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        在 org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        在 org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
        在 org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
        在 org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        在 org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
        在 org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
        在 org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
        在 org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
        在 com.flink.streaming.core.JobApplication.callDml(JobApplication.java:138)
        在 com.flink.streaming.core.JobApplication.main(JobApplication.java:85)
        在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        在 java.lang.reflect.Method.invoke(Method.java:498)
        在 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        在 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        在 org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        在 org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        在 org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        在 org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        在 org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        在 java.security.AccessController.doPrivileged(本机方法)
        在 javax.security.auth.Subject.doAs(Subject.java:422)
        在 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        在 org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        在 org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
引起:java.lang.ClassCastException:org.codehaus.janino.CompilerFactory 不能转换为 org.codehaus.commons.compiler.ICompilerFactory
        在 org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
        在 org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
        在 org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:432)
        ... 60 更多

主要日志目录

1、web系统日志

/{安装目录}/flink-streaming-platform-web/logs/

2 、flink客户端命令

FLINKHOME/log/flink−{FLINK_HOME}/log/flink-FLINKHOME/log/flink−{USER}-client-.log

七、RoadMap

1、支持除官方以外的连接器

2、 支持Flink Session模式

八、生活

赠送泸沽湖早上的风景--必去一种圣地

工作再累也要注意身体,时代鼓舞勇者 作为平凡的人生 也要劳逸结合。我们都是文明人,都知道加班无可避免。但愿大家(老板)对加班很大宽容程度[流泪][流泪][流泪][流泪]年轻的时候想着去旅游去玩耍 现实造就我们种种原因 兄弟们大胆冲啊 此时此刻 非去不可 此时不去 更待何时 加油吧 奥利给

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 运维入门时间 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、简介
  • 二、环境以及安装
    • 1、环境
      • 2、应用安装
        • 1、flink客户端安装
        • 2、flink-streaming-platform-web安装
    • 三、功能介绍
      • 1、新增任务配置说明
        • 2、系统设置
          • 3、报警设置
          • 三、配置demo
          • 四、支持flink sql官方语法
          • 五、其他
          • 六、问题
          • 七、RoadMap
          • 八、生活
          相关产品与服务
          大数据
          全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档