
Flink Problem Log


Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/7652337.html

1 WindowFunction type mismatch: code fails to compile

Flink version: 1.3.0

A demo written after the example at https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction-with-incremental-aggregation fails to compile once a WindowFunction is added to reduce; the compiler reports a parameter type mismatch.

The code is as follows:


MyReduceFunction a = new MyReduceFunction();

DataStream<Tuple3<String, String, Integer>> counts4 = source
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .reduce(a,
                new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void apply(
                            String key,
                            TimeWindow window,
                            Iterable<Tuple2<String, Integer>> values,
                            Collector<Tuple3<String, String, Integer>> out)
                            throws Exception {
                        for (Tuple2<String, Integer> in : values) {
                            out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
                        }
                    }
                });



public static class MyReduceFunction implements
        ReduceFunction<Tuple2<String, Integer>> {

    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
            Tuple2<String, Integer> value2) throws Exception {
        // Incrementally sum the counts for the same key.
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
}

With the WindowFunction added to reduce, the code above keeps failing to compile: the compiler complains that reduce's arguments have mismatched types. The real culprit is keyBy(0). When DataStream's public KeyedStream<T, Tuple> keyBy(int... fields) or public KeyedStream<T, Tuple> keyBy(String... fields) overload is used, both delegate to

private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
    return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(
            keys, getType(), getExecutionConfig())));
}

Here KeySelectorUtil.getSelectorForKeys returns a KeySelector of type ComparableKeySelector, which is declared as

public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple>

By the KeySelector contract, every key that ComparableKeySelector emits has type Tuple, so setting the WindowFunction's third type parameter to String, as above, is wrong.

Solutions

1 Define a custom KeySelector

private static class TupleKeySelector implements
        KeySelector<Tuple2<String, Integer>, String> {

    @Override
    public String getKey(Tuple2<String, Integer> value) throws Exception {
        return value.f0;
    }
}

DataStream<Tuple3<String, String, Integer>> counts4 = source
        .keyBy(new TupleKeySelector())

A TupleKeySelector is defined first, with String as the key type; keyBy is then given new TupleKeySelector(), so the KeyedStream is partitioned on a String key and the WindowFunction's String key parameter matches.

2 Change the WindowFunction's third (key) type parameter from String to Tuple, as sketched below.
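A minimal sketch of this second fix (not from the original post; the variable name counts5 and the comments are mine): keep keyBy(0), declare the key type as Tuple, and unwrap the String field with Tuple.getField. It needs import org.apache.flink.api.java.tuple.Tuple in addition to the imports the original demo uses.

DataStream<Tuple3<String, String, Integer>> counts5 = source
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .reduce(a,
                new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, Tuple, TimeWindow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void apply(
                            Tuple key, // keyBy(int...) always hands the key over as a Tuple
                            TimeWindow window,
                            Iterable<Tuple2<String, Integer>> values,
                            Collector<Tuple3<String, String, Integer>> out)
                            throws Exception {
                        String keyField = key.getField(0); // unwrap the String key field
                        for (Tuple2<String, Integer> in : values) {
                            out.collect(new Tuple3<>(keyField, in.f0, in.f1));
                        }
                    }
                });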

Related question: https://stackoverflow.com/questions/36917586/cant-apply-custom-functions-to-a-windowedstream-on-flink

2 flink-connector-elasticsearch5 integration issues

Flink version: 1.3.2

Problem 1:

java.lang.UnsupportedClassVersionError: org/elasticsearch/action/bulk/BulkProcessor$Listener : Unsupported major.minor version 52.0
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)

Solution: Elasticsearch 5 and later require at least JDK 1.8. If the job's default runtime JDK is older than 1.8, the Java version has to be set explicitly.

Problem 1.1:

The error above persists even after setting env.java.home: /opt/jdk1.8.0_121.

For now it looks like env.java.home has no effect on the containers' JAVA_HOME when the job is submitted in yarn-cluster mode.

Solution: add the -yD yarn.taskmanager.env.JAVA_HOME=/opt/jdk1.8.0_121 parameter to the job submission script.
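For example, a yarn-cluster submission carrying this flag might look like the line below (a sketch; my-flink-job.jar is a hypothetical jar name, and if the JobManager container needs the same treatment, yarn.application-master.env.JAVA_HOME is presumably the analogous key):

flink run -m yarn-cluster -yD yarn.taskmanager.env.JAVA_HOME=/opt/jdk1.8.0_121 ./my-flink-job.jar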

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#full-reference. Oddly, this parameter is covered in the 1.1 documentation but has disappeared from the 1.3 docs.

Problem 2: the web UI returns a 500 error, and the YARN log shows the error below.

ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler - Caught exception
java.lang.AbstractMethodError
	at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
	at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
	at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
	at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:745)

The job itself runs fine, but clicking ApplicationMaster to open the web UI returns a 500 error; the YARN error log is shown above.

The cause: flink-connector-elasticsearch5 depends on a netty-common jar whose io.netty.util.ReferenceCountUtil class conflicts with the class of the same name in Flink's bundled netty-all.

Solution: netty-common cannot simply be excluded, since Elasticsearch would then be missing other classes at runtime. The workaround for now is to comment out <scope>provided</scope> on flink-streaming-java_2.10, packing Flink's own ReferenceCountUtil into the fat jar so it overrides the conflicting class:


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
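A quick way to confirm the override took effect (my own sketch, not from the original post; the class name NettyClassOriginCheck is hypothetical) is to print which jar io.netty.util.ReferenceCountUtil is actually loaded from, either inside the job or in a small driver class:

// Hypothetical helper: prints the jar that io.netty.util.ReferenceCountUtil was loaded
// from. After the fix it should point at the fat jar (carrying Flink's netty-all copy),
// not at the netty-common jar pulled in by the ES connector.
public class NettyClassOriginCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> clazz = Class.forName("io.netty.util.ReferenceCountUtil");
        System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());
    }
}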

3 Flink 1.4.1 support for the YARN node-label feature

1 Manually merge the PR https://github.com/apache/flink/pull/5593

2 Bump hadoop.version in the pom to a YARN version that supports node labels: <hadoop.version>2.8.2</hadoop.version> (see the sketch below).
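In the pom this amounts to a one-line property override, e.g. (a sketch, assuming the build parameterizes its Hadoop dependencies with a hadoop.version property the way Flink's own pom does):

<properties>
    <hadoop.version>2.8.2</hadoop.version>
</properties>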

4 Caused by: java.lang.ClassNotFoundException: javax.ws.rs.ext.MessageBodyReader

Flink version: 1.5.0

java.lang.NoClassDefFoundError: javax/ws/rs/ext/MessageBodyReader
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.hadoop.yarn.util.timeline.TimelineUtils.<clinit>(TimelineUtils.java:50)
	at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:179)
	at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:966)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:269)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:444)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:92)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:221)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$14(CliFrontend.java:1096)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.ext.MessageBodyReader
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 52 more

The error above appeared after packaging and launching. The fix is to remove the exclusion of jersey-core from the hadoop-yarn-common dependency in the flink-shaded-hadoop2 pom:

<!--<exclusion>-->
   <!--<groupId>com.sun.jersey</groupId>-->
   <!--<artifactId>jersey-core</artifactId>-->
<!--</exclusion>-->
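After removing the exclusion, the shaded artifact and the job fat jar both need to be rebuilt so jersey-core actually lands on the classpath; running something like mvn clean install -DskipTests from the modified module's directory and then repackaging the job should do it (the exact module path depends on the Flink source layout, so treat this as a sketch).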

5 could not find implicit value for evidence parameter

Reference explanation: https://www.iteblog.com/archives/2047.html

Solution: import org.apache.flink.streaming.api.scala._ — this import brings the Scala API's implicit TypeInformation derivation into scope, which is exactly the missing evidence parameter the compiler is asking for.


Originally published: 2017-10-11
