
Flink Technical Notes

算法之名
Published 2021-01-06 10:55:30

First, generate the Flink sample project from the Maven quickstart archetype:

mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.7.2                         \
      -DarchetypeCatalog=local

Batch processing: reading from a file

Create a file named hello.txt with the following content:

hello world welcome
hello welcome

Count the word frequencies:

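import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchJavaApp {
    public static void main(String[] args) throws Exception {
        // Directory that contains hello.txt
        String input = "/Users/admin/Downloads/flink/data";
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> text = env.readTextFile(input);
        // In the DataSet API, print() triggers execution, so no env.execute() call is needed
        text.print();
        // Split each line into words, emit (word, 1), then group by the word and sum the counts
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token,1));
                }
            }
        }).groupBy(0).sum(1).print();
    }
}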

Output (log lines omitted):

hello welcome
hello world welcome
(world,1)
(hello,2)
(welcome,2)

Plain Java implementation

The file-reading class:

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.List;
import java.util.Locale;
import java.util.Scanner;

public class FileOperation {
    /**
     * Reads the file named filename and adds every word it contains to words.
     * @param filename path of the file to read
     * @param words list that receives the lower-cased words
     * @return true if the file was read, false otherwise
     */
    public static boolean readFile(String filename, List<String> words) {
        if (filename == null || words == null) {
            System.out.println("filename or words is null");
            return false;
        }
        File file = new File(filename);
        if (!file.exists()) {
            return false;
        }
        // try-with-resources closes the scanner and the underlying stream
        try (Scanner scanner = new Scanner(new BufferedInputStream(new FileInputStream(file)), "UTF-8")) {
            scanner.useLocale(Locale.ENGLISH);
            // Simple tokenization: a word is a maximal run of letters
            if (scanner.hasNextLine()) {
                // The "\\A" delimiter makes next() return the whole file content
                String contents = scanner.useDelimiter("\\A").next();
                int start = firstCharacterIndex(contents,0);
                for (int i = start + 1;i <= contents.length();) {
                    if (i == contents.length() || !Character.isLetter(contents.charAt(i))) {
                        String word = contents.substring(start,i).toLowerCase();
                        words.add(word);
                        start = firstCharacterIndex(contents,i);
                        i = start + 1;
                    }else {
                        i++;
                    }
                }
            }
        } catch (FileNotFoundException e) {
            System.out.println("Cannot open " + filename);
            return false;
        }
        return true;
    }

    // Returns the index of the first letter at or after start, or s.length() if there is none
    private static int firstCharacterIndex(String s,int start) {
        for (int i = start;i < s.length();i++) {
            if (Character.isLetter(s.charAt(i))) {
                return i;
            }
        }
        return s.length();
    }
}

The class that counts the words:

import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchJavaOnly {
    public static void main(String[] args) {
        String input = "/Users/admin/Downloads/flink/data/hello.txt";
        List<String> list = new ArrayList<>();
        FileOperation.readFile(input,list);
        System.out.println(list);
        // merge() stores 1 for a new word and adds 1 to the count of a word already seen
        Map<String,Integer> map = new HashMap<>();
        list.forEach(token -> map.merge(token,1,Integer::sum));
        // Flink's Tuple2 is used here only to print entries in the same (word,count) format
        map.entrySet().stream().map(entry -> new Tuple2<>(entry.getKey(),entry.getValue()))
                .forEach(System.out::println);
    }
}

Output:

[hello, world, welcome, hello, welcome]
(world,1)
(hello,2)
(welcome,2)
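
The counting loop can also be written with the Java Streams API. The following is a minimal sketch and not part of the original article: the class name `StreamWordCount` and the helper `countWords` are hypothetical, and the text is passed in as a string instead of being read from hello.txt. A `TreeMap` is used so that the output order is deterministic.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class StreamWordCount {
    // Splits the text on runs of non-letters, lower-cases each token, and counts occurrences.
    static Map<String, Long> countWords(String text) {
        return Arrays.stream(text.split("[^\\p{L}]+"))
                .filter(token -> !token.isEmpty())
                .map(String::toLowerCase)
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Sample text assumed for illustration
        String text = "hello world welcome\nhello welcome";
        countWords(text).forEach((word, count) -> System.out.println("(" + word + "," + count + ")"));
    }
}
```

With the sample text, this prints (hello,2), (welcome,2), and (world,1), one per line in alphabetical order.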

Stream processing: reading from a network socket

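import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingJavaApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines of text from a local socket
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token,1));
                }
            }
        // Key by the word (tuple field 0) and sum the counts (field 1) in 5-second windows
        }).keyBy(0).timeWindow(Time.seconds(5))
                .sum(1).print();
        // Streaming jobs are lazy: nothing runs until execute() is called
        env.execute("StreamingJavaApp");
    }
}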

Before running the program, open the port with netcat:

nc -lk 9999

Run the program, then type the following into the nc session: a a c d b c e e f a

Output (log lines omitted):

1> (e,2)
9> (d,1)
11> (a,3)
3> (b,1)
4> (f,1)
8> (c,2)
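
In this output, the number before `>` is the index of the parallel subtask that printed the record. The call timeWindow(Time.seconds(5)) assigns each record to a non-overlapping (tumbling) five-second window, and sum(1) emits one total per key when the window closes. The plain-Java sketch below only simulates that grouping and does not use Flink. The class `TumblingWindowSketch`, its helpers, and the timestamps are made up for illustration, and the sketch takes explicit timestamps, whereas the job above relies on the time at which Flink processes each record.

```java
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowSketch {
    static final long WINDOW_MILLIS = 5_000L;

    // Start of the tumbling window that contains the timestamp (non-negative timestamps, no offset).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Counts words per window start; words[i] is assumed to arrive at timestampsMillis[i].
    static Map<Long, Map<String, Integer>> countPerWindow(String[] words, long[] timestampsMillis) {
        Map<Long, Map<String, Integer>> windows = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            windows.computeIfAbsent(windowStart(timestampsMillis[i]), start -> new TreeMap<>())
                    .merge(words[i], 1, Integer::sum);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in milliseconds
        String[] words = {"a", "a", "c", "a", "c"};
        long[] timestamps = {1_000L, 2_000L, 4_999L, 5_000L, 9_000L};
        System.out.println(countPerWindow(words, timestamps));
    }
}
```

With these made-up timestamps, the first three words fall into the window starting at 0 ms and the last two into the window starting at 5000 ms, so the sketch prints {0={a=2, c=1}, 5000={a=1, c=1}}.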

Now replace the tuple with an entity class (a POJO; the annotations come from Lombok):

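import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamObjJavaApp {
    // A public static class with a no-argument constructor and getters/setters, so Flink can treat it as a POJO
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token,1));
                }
            }
        // POJO fields are addressed by name instead of by tuple position
        }).keyBy("word").timeWindow(Time.seconds(5))
                .sum("count").print();
        env.execute("StreamingJavaApp");
    }
}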

Output:

4> StreamObjJavaApp.WordCount(word=f, count=1)
11> StreamObjJavaApp.WordCount(word=a, count=3)
8> StreamObjJavaApp.WordCount(word=c, count=2)
1> StreamObjJavaApp.WordCount(word=e, count=2)
9> StreamObjJavaApp.WordCount(word=d, count=1)
3> StreamObjJavaApp.WordCount(word=b, count=1)
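
Field names such as "word" and "count" can be used in keyBy and sum because Flink recognizes WordCount as a POJO: the class is public, it has a public no-argument constructor, and its fields are reachable through getters and setters. As a rough sketch of what the Lombok annotations expand to, here is a hand-written counterpart. The wrapper class `PojoSketch` is hypothetical, equals and hashCode (which @Data also generates) are left out for brevity, and the toString format only imitates Lombok's.

```java
final class PojoSketch {
    // Hand-written counterpart of the Lombok-annotated WordCount class.
    public static class WordCount {
        private String word;
        private int count;

        // No-argument constructor, as generated by @NoArgsConstructor
        public WordCount() {
        }

        // All-arguments constructor, as generated by @AllArgsConstructor
        public WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        // Getters and setters, as generated by @Data
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }

        @Override
        public String toString() {
            return "WordCount(word=" + word + ", count=" + count + ")";
        }
    }

    public static void main(String[] args) {
        WordCount wc = new WordCount("a", 1);
        wc.setCount(wc.getCount() + 2);
        System.out.println(wc); // prints WordCount(word=a, count=3)
    }
}
```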

Alternatively, the key can be supplied as a method reference:

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token,1));
                }
            }
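        })
                // Key by a method reference (a KeySelector) instead of a field name
                .keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                // Run the print sink with parallelism 1
                .sum("count").print().setParallelism(1);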
        env.execute("StreamingJavaApp");
    }
}

This overload of keyBy takes a KeySelector, which is a functional interface:

@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
   KEY getKey(IN value) throws Exception;
}
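
Because KeySelector declares a single abstract method, an implementation can be supplied as an anonymous class, a lambda, or a method reference such as WordCount::getWord. The sketch below shows the three forms in plain Java with a local stand-in interface. `KeySelectorSketch`, its nested `KeySelector` copy (without Flink's Function and Serializable supertypes), the simplified `WordCount`, and the `keyOf` helper are all assumptions made for this example and are not Flink code.

```java
final class KeySelectorSketch {
    // Local stand-in that mirrors the shape of Flink's KeySelector.
    @FunctionalInterface
    public interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    // Simplified record type for the example.
    public static final class WordCount {
        private final String word;
        private final int count;

        public WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() { return word; }
        public int getCount() { return count; }
    }

    // Hypothetical helper: extracts the key of one record with the given selector.
    public static <IN, KEY> KEY keyOf(IN value, KeySelector<IN, KEY> selector) {
        try {
            return selector.getKey(value);
        } catch (Exception e) {
            throw new IllegalStateException("key extraction failed", e);
        }
    }

    public static void main(String[] args) {
        WordCount record = new WordCount("hello", 1);

        KeySelector<WordCount, String> anonymous = new KeySelector<WordCount, String>() {
            @Override
            public String getKey(WordCount value) {
                return value.getWord();
            }
        };
        KeySelector<WordCount, String> lambda = value -> value.getWord();
        KeySelector<WordCount, String> methodRef = WordCount::getWord;

        // All three selectors extract the same key
        System.out.println(keyOf(record, anonymous));
        System.out.println(keyOf(record, lambda));
        System.out.println(keyOf(record, methodRef));
    }
}
```

Each of the three calls prints hello.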

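flatMap can also be written as a lambda expression, but this is more cumbersome than the anonymous class: the type parameter of the Collector is erased at compile time, so the output type has to be declared explicitly with returns(...).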

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
        text.flatMap((FlatMapFunction<String,WordCount>)(value,collector) -> {
            String[] tokens = value.toLowerCase().split(" ");
            for (String token : tokens) {
                collector.collect(new WordCount(token,1));
            }
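        })
                // The lambda loses the Collector's type parameter to erasure, so declare the output type explicitly
                .returns(WordCount.class)
                .keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1);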
        env.execute("StreamingJavaApp");
    }
}
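
The need for returns(WordCount.class) comes from how Java compiles lambdas: the generic type information that an anonymous class records in its class metadata is not kept for a lambda. The plain-Java sketch below illustrates that general point with reflection. It is a simplified illustration and not how Flink extracts types internally; `ErasureSketch` and its nested `Mapper` interface are hypothetical stand-ins, not Flink types.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

final class ErasureSketch {
    // Stand-in for a generic function interface such as FlatMapFunction.
    public interface Mapper<IN, OUT> {
        OUT map(IN value);
    }

    // Returns true if the object's class still records type arguments for an interface it implements.
    public static boolean keepsTypeArguments(Mapper<?, ?> mapper) {
        for (Type type : mapper.getClass().getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Mapper<String, Integer> anonymous = new Mapper<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        };
        Mapper<String, Integer> lambda = value -> value.length();

        // The anonymous class reports Mapper<String, Integer>; the lambda reports only the raw Mapper
        System.out.println(keepsTypeArguments(anonymous));
        System.out.println(keepsTypeArguments(lambda));
    }
}
```

On a standard JVM the first line printed is true and the second is false, which is why the anonymous-class version of the job needs no type hint while the lambda version does.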

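flatMap can also take a RichFlatMapFunction, an abstract class that additionally offers lifecycle methods (open and close) and access to the runtime context: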

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9999);
        text.flatMap(new RichFlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token,1));
                }
            }
        }).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}