Flink 1.11.2 Study Notes (5): Issues When Using Lambda Expressions

菩提树下的杨过
Published 2021-03-12 13:09:34

Flink's API offers a fluent, chained programming style that reads very smoothly. Here is an example:

SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
        // Set parallelism to 1 so the output is easy to follow
        .setParallelism(1)
        // Add the Kafka source
        .addSource(
                new FlinkKafkaConsumer011<>(
                        SOURCE_TOPIC,
                        new SimpleStringSchema(),
                        props))
        // Convert each record into a POJO
        .map((MapFunction<String, WordCountPojo>) value -> {
            WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
            return pojo;
        })
        // Set up watermarks and the event-time extraction logic
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(WordCountPojo element) {
                        return element.eventTimestamp;
                    }
                })
        // Count the occurrences of each word
        .flatMap(new FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>() {
            @Override
            public void flatMap(WordCountPojo value, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                String word = value.word;
                // Start time of the window this event falls into (for display only)
                String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
                if (word != null && word.trim().length() > 0) {
                    // Emit the record (similar in spirit to map-reduce)
                    out.collect(new Tuple3<>(word.trim(), 1, windowTime));
                }
            }
        })
        .keyBy(v -> v.f0)
        // 1-minute tumbling window
        .timeWindow(Time.minutes(1))
        // Allow data to arrive up to 10 seconds late
        .allowedLateness(Time.seconds(10))
        // Sum up the count for each word
        .sum(1);

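If you work in IntelliJ IDEA with JDK 1.8, the IDE may suggest converting the anonymous FlatMapFunction on line 24 of the snippet above into a lambda expression, which looks a little cleaner: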

SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
    // Set parallelism to 1 so the output is easy to follow
    .setParallelism(1)
    // Add the Kafka source
    .addSource(
            new FlinkKafkaConsumer011<>(
                    SOURCE_TOPIC,
                    new SimpleStringSchema(),
                    props))
    // Convert each record into a POJO
    .map((MapFunction<String, WordCountPojo>) value -> {
        WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
        return pojo;
    })
    // Set up watermarks and the event-time extraction logic
    .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                @Override
                public long extractTimestamp(WordCountPojo element) {
                    return element.eventTimestamp;
                }
            })
    // Count the occurrences of each word
    .flatMap((FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>) (value, out) -> {
        // Rewritten as a lambda
        String word = value.word;
        // Start time of the window this event falls into (for display only)
        String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
        if (word != null && word.trim().length() > 0) {
            // Emit the record (similar in spirit to map-reduce)
            out.collect(new Tuple3<>(word.trim(), 1, windowTime));
        }
    })
    .keyBy(v -> v.f0)
    // 1-minute tumbling window
    .timeWindow(Time.minutes(1))
    // Allow data to arrive up to 10 seconds late
    .allowedLateness(Time.seconds(10))
    // Sum up the count for each word
    .sum(1);

The logic is exactly the same, but running the job now fails with this error:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.

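In short: the lambda form does not carry enough type information for Flink to infer the generic types involved (here, the type parameter of Collector). The message suggests either switching back to an anonymous class or supplying the type explicitly through type information.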
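
To make the difference concrete, here is a small JDK-only sketch. It is not Flink's actual type-extraction code. It uses java.util.function.BiConsumer as a stand-in for FlatMapFunction and List as a stand-in for Collector (both are assumptions made purely for illustration), and the class and method names are hypothetical. It checks whether reflection can still see the generic type arguments of the implemented interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.function.BiConsumer;

public class LambdaErasureDemo {

    // Returns true if reflection can still see generic type arguments
    // on any interface that the object's class directly implements.
    static boolean keepsGenericInterface(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: the compiler records
    // BiConsumer<String, List<Integer>> in the generated class file.
    static BiConsumer<String, List<Integer>> anonymousFn() {
        return new BiConsumer<String, List<Integer>>() {
            @Override
            public void accept(String word, List<Integer> out) {
                out.add(word.length());
            }
        };
    }

    // Lambda: the class generated at runtime implements only the raw
    // BiConsumer interface, so the type arguments are not recoverable this way.
    static BiConsumer<String, List<Integer>> lambdaFn() {
        return (word, out) -> out.add(word.length());
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps generics: "
                + keepsGenericInterface(anonymousFn()));
        System.out.println("lambda keeps generics: "
                + keepsGenericInterface(lambdaFn()));
    }
}
```

On an OpenJDK runtime the anonymous class should report that its generic interface is visible and the lambda should not. That is the same kind of gap the error message describes: the cast in front of the lambda helps the compiler, but the information is gone by the time Flink inspects the function at runtime.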

Solution:

SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
    .setParallelism(1)
    .addSource(
            new FlinkKafkaConsumer011<>(
                    SOURCE_TOPIC,
                    new SimpleStringSchema(),
                    props))
    .map((MapFunction<String, WordCountPojo>) value -> {
        WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
        return pojo;
    })
    .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                @Override
                public long extractTimestamp(WordCountPojo element) {
                    return element.eventTimestamp;
                }
            })
    .flatMap((FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>) (value, out) -> {
        String word = value.word;
        String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
        if (word != null && word.trim().length() > 0) {
            out.collect(new Tuple3<>(word.trim(), 1, windowTime));
        }
    })
    // Explicitly specify the return type
    .returns(((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class)))
    .keyBy(0)
    .timeWindow(Time.minutes(1))
    .allowedLateness(Time.seconds(10))
    .sum(1);

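Line 27 of this snippet specifies the return type explicitly through returns(...). The keyBy call is also adjusted slightly, from keyBy(v -> v.f0) to keyBy(0), presumably because the raw TypeInformation cast leaves the stream raw-typed, so a lambda key selector would no longer see the f0 field. With both changes the job runs normally.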

Originally published on the author's personal site/blog on 2021-03-10.