专栏首页Spark学习技巧flink 有状态udf 引起血案一

flink 有状态udf 引起血案一

场景

最近在做一个画像的任务,sql实现的,其中有一个udf,会做很多事情,包括将从redis读出历史值加权,并将中间结果和加权后的结果更新到redis。

大家都知道,flink 是可以支持事件处理的,也就是可以没有时间的概念,那么在聚合,join等操作的时候,flink内部会维护一个状态,假如此时你也用redis维护了历史状态,也即是类似 result = currentState(flink)+lastState(redis),且此时要针对计算的结果用where进行筛选.

SQL如下

CREATE VIEW view_count AS
select
  `time`,
  gid,
  cid,
  count(feed_id) * 1 as strength
FROM
  view_cid
GROUP BY
  gid,
  cid,`time`;

CREATE VIEW view_strength AS select
  `time`,
  gid,
  cid ,
  Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result`
FROM
  view_count
;

insert into
  hx_app_server_sink_common
SELECT
  gid,
  cid,
  `result`
FROM
  view_strength
where `result` <> '0.0' 
GROUP BY
  gid,
  cid,
  `result`;

业务分析

第一个sql视图完成的是首先分组,然后统计某一个字段并乘以权重;

第二个sql视图,udf :Get_Strength_Weaken完成当前值和历史值叠加工作,历史值存储在redis,同时将结果返回并更新redis,返回值作为result字段。

第三个sql在输出的时候,result字段作为了where的条件和group by里的字段。

这时候生成的flink概图如下:

观察中间的结构图可以发现,Get_Strength_Weaken被调用两次:

1. where条件,这个的生成是由于第三条sql

where `result` <> '0.0'

产生的执行计划,是不是看起来很懵逼。。。

2. select里面还有一次调用Get_Strength_Weaken,这个很明显。

当然,可以打印一下flink udf里eval函数的调用细节日志,很容易发现重复调用的问题,浪院长这个也是通过分析日志,对比输出结果来得出的论。

综合上面分析和udf调用日志,结论就是udf被调用了两次。

对于这个flink的udf被多次调用引起的结果偏大,整整调试了一下午。

由于上面分析可以得出结论,flink将where条件下推了,where 条件判断会先执行,而select里后执行,那么可以调整SQL,如下:

CREATE VIEW view_count AS
select
 `time`,
 gid,
 cid,
 count(feed_id) * 1 as strength
FROM
 view_cid
GROUP BY
 gid,
 cid,`time`;

CREATE VIEW view_strength AS select
 `time`,
 gid,
 cid ,
getResult(gid,cid) as `result`
FROM
 view_count
where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result` <> '0.0'
;

insert into
 hx_app_server_sink_common
SELECT
 gid,
 cid,
 `result`
FROM
 view_strength
GROUP BY
 gid,
 cid,
 `result`;

那么实际上,select里的udf主要目的是取出来计算结果,那么这个时候可以写个简单的udf--getResult,只让他从redis获取 where条件里更新到redis里的结果,由于该udf是无状态的即使多次调用,也无所谓。

所以,总结一下,对于flink 来说,由于基于事件的处理,聚合、join等操作会有状态缓存,那么此时再用到含有外部存储状态的udf,一定要慎重,结合执行计划,来合理放置udf的位置,避免出错。

当然,调试阶段最好是有详细的日志,便于分析和定位问题。

flink 状态删除

其实,flink聚合等内部状态有配置可以使其自动删除的,具体配置使用如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);

[完]

本文分享自微信公众号 - Spark学习技巧(bigdatatip)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-10-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 深入理解Apache Flink核心技术

    Spark学习技巧
  • 最近做大数据面试官的感想

    Spark学习技巧
  • Java面试知识点解析——JVM篇

    Spark学习技巧
  • golang slice N选3 组合

    package main import( "fmt" ) func main(){ var a = []int32{1,2,3,...

    李海彬
  • 异名解题:7. 整数反转

    给出一个 32 位的有符号整数,你需要将这个整数中每位上的数字进行反转。注意:假设我们的环境只能存储得下 32 位的有符号整数,则其数值范围为 [−2³¹, ...

    异名
  • 2016 小回顾

    时间很快, 已经走到了 2016 的末尾, 惯例的做个小回顾。(注:这篇起笔的时间是圣诞节TAT)

    Jintao Zhang
  • ES6特性之:Rest参数

    其实在JavaScript中,通过使用arguments对象也能实现这种可变参数的能力,但是,arguments对象本身有点奇怪,它看起来像一个数组,但其实它不...

    一斤代码
  • 请简单说明一下什么是迭代器?

    Iterator提供了统一遍历操作集合元素的统一接口, Collection接口实现Iterable接口,

    剑走天涯
  • 据说想要学好C++,这几本书一定要看

    我之前问过ACM大神,如何学好C++?他说最好的办法就是读书,读大量的书,就可以解决。要把C++作为日常语言,而不是一种程序语言,这样就好办了。

    谭庆波
  • [Leetcode][python]Simplify Path

    化简Unix系统下一个文件的绝对路径。 输入: path = “/a/./b/../../c/”

    后端技术漫谈

扫码关注云+社区

领取腾讯云代金券