首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

flink时间系统系列之ProcessFunction 使用分析

ProcessFunction 是flink 提供面向用户low-level 层级的api,通过ProcessFunction可以访问state、注册处理时间/事件时间定时器来帮助我们完成一些比较复杂的操作,但是其有一个限制那就是只用使用在keyedStream中,是由于根据getRuntimeContext 得到的StreamingRuntimeContext 只提供了KeyedStateStore的访问权限,所以只能访问keyd state, 另外根据前面的分析可知,注册的定时器必须是与key相关,也就解释了在ProcessFunction中只能在keyedStream做定时器注册。目前在flink中,提供了ProcessFunction与KeyedProcessFunction 这两个面向用户的api,但是ProcessFunction却无法帮助我们注册定时器,透过源码(ProcessOperator)可以发现,注册时会主动抛出UnsupportedOperationException异常。今天重点在于分析KeyedProcessFunction 是如何完成定时功能。

02
您找到你想要的搜索结果了吗?
是的
没有找到

java thrift返回List异常

运行时遇到如下异常,原因是由于hmget返回的List含有null成员,导致thrift编码时异常: 20160415 14:55:39 ERROR org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:484) [Thread-0] Unexpected throwable while invoking! java.lang.NullPointerException         at org.apache.thrift.protocol.TBinaryProtocol.writeString(TBinaryProtocol.java:185)         at com.test.redis_cluster_proxy.RedisClusterProxyService$hmget_result$hmget_resultStandardScheme.write(RedisClusterProxyService.java:19434)         at com.test.redis_cluster_proxy.RedisClusterProxyService$hmget_result$hmget_resultStandardScheme.write(RedisClusterProxyService.java:1)         at com.test.redis_cluster_proxy.RedisClusterProxyService$hmget_result.write(RedisClusterProxyService.java:19337)         at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:53)         at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)         at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:478)         at org.apache.thrift.server.TNonblockingServer.requestInvoke(TNonblockingServer.java:115)         at org.apache.thrift.server.AbstractNonblockingServer$AbstractSelectThread.handleRead(AbstractNonblockingServer.java:209)         at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.select(TNonblockingServer.java:198)         at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.run(TNonblockingServer.java:154) 当redis中没有相应的field时,hmget返回的List会包含null成员。解决此问题有两个办法: 1)保证查询的field一定存在 2)对hmget返回值做处理,null成员替换成空字符串""

03

Flink-Cep实现规则动态更新

规则引擎通常对我们的理解就是用来做模式匹配的,在数据流里面检测满足规则要求的数据。有人会问为什么需要规则动态变更呢?直接修改了规则把服务重启一下不就可以了吗,这个当然是不行的,规则引擎里面通常会维护很多不同的规则,例如在监控告警的场景下,如果每个人修改一下自己的监控阈值,就重启一下服务,必然会影响其他人的使用,因此需要线上满足规则动态变更加载。本篇基于Flink-Cep 来实现规则动态变更加载,同时参考了Flink中文社区刘博老师的分享(https://developer.aliyun.com/article/738454),在这个分享里面是针对在处理流中每一个Key使用不同的规则,本篇的讲解将不区分key的规则。

03
领券