专栏首页大数据成神之路Flink从入门到放弃-Flink分布式缓存

Flink从入门到放弃-Flink分布式缓存

戳更多文章:

1-Flink入门

2-本地环境搭建&构建第一个Flink应用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式缓存

7-重启策略

8-Flink中的窗口

9-Flink中的Time

1概述

  • Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。
  • 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。
  • 当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。

2示例

在ExecutionEnvironment中注册一个文件:

//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");

在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据:

DataSet<String> result = data.map(new RichMapFunction<String, String>() {

private ArrayList<String> dataList = new ArrayList<String>();

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

//2:使用文件

File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");

List<String> lines = FileUtils.readLines(myFile);

for (String line : lines) {

this.dataList.add(line);

System.err.println("分布式缓存为:" + line);

}

}

@Override

public String map(String value) throws Exception {

//在这里就可以使用dataList

System.err.println("使用datalist:" + dataList + "------------" +value);

//业务逻辑

return dataList +":" + value;

}

});

result.printToErr();

}

完整代码如下,仔细看注释:

public class DisCacheTest {

public static void main(String[] args) throws Exception{

//获取运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试

//text 中有4个单词:hello flink hello FLINK env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");

DataSource<String> data = env.fromElements("a", "b", "c", "d");

DataSet<String> result = data.map(new RichMapFunction<String, String>() {

private ArrayList<String> dataList = new ArrayList<String>();

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

//2:使用文件

File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");

List<String> lines = FileUtils.readLines(myFile);

for (String line : lines) {

this.dataList.add(line);

System.err.println("分布式缓存为:" + line);

}

}

@Override

public String map(String value) throws Exception {

//在这里就可以使用dataList

System.err.println("使用datalist:" + dataList + "------------" +value);

//业务逻辑

return dataList +":" + value;

}

});

result.printToErr();

}

}//

输出结果如下:

[hello, flink, hello, FLINK]:a
[hello, flink, hello, FLINK]:b
[hello, flink, hello, FLINK]:c
[hello, flink, hello, FLINK]:d

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-02-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 7-Flink的分布式缓存

    Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。

    王知无
  • Flink异步之矛-锋利的Async I/O

    在Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。

    王知无
  • 实时数仓链路分享:kafka =>SparkStreaming=>kudu集成kerberos

    本文档主要介绍在cdh集成kerberos情况下,sparkstreaming怎么消费kafka数据,并存储在kudu里面

    王知无
  • 7-Flink的分布式缓存

    Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。

    王知无
  • LeetCode 804 Unique Morse Code Words

    首先为每个单词的每个字符进行转码, 将转码后的数据放到 Set 集合中, 最后返回 Set 的长度。

    一份执着✘
  • 老板看了我的代码,直呼“666”,说涨工资!

    如何更规范化编写Java 代码的重要性想必毋需多言,其中最重要的几点当属提高代码性能、使代码远离Bug、令代码更优雅。

    程序员小强
  • String s=new String("abc")创建了几个对象?

    String str=new String("abc");   紧接着这段代码之后的往往是这个问题,那就是这行代码究竟创建了几个String对象呢?

    week
  • 这样规范写代码,同事直呼“666”

    zhisheng
  • 你真的懂Java中的String、StringBuilder和StringBuffer吗?

    相信String这个类是Java中使用得最频繁的类之一,并且又是各大公司面试喜欢问到的地方,今天就来和大家一起学习一下String、StringBui...

    java思维导图
  • 聊聊nacos NamingProxy的getServiceList

    nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy...

    codecraft

扫码关注云+社区

领取腾讯云代金券