首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink 批处理算子详解

Flink 批处理算子详解

作者头像
Tim在路上
发布2020-08-04 19:59:20
发布2020-08-04 19:59:20
86900
代码可运行
举报
运行总次数:0
代码可运行

批处理程序的结果

  1. 获取运行时
代码语言:javascript
代码运行次数:0
运行
复制
 val env = ExecutionEnvironment.getExecutionEnvironment
  1. 添加Source
代码语言:javascript
代码运行次数:0
运行
复制
val text = env.fromElements("who's there","I think I hear")
  1. 定义算子转换函数
代码语言:javascript
代码运行次数:0
运行
复制
    text.flatMap{_.toLowerCase.split("\\w+") filter(_.nonEmpty)}
        .map((_,1))
        .groupBy(0)
        .sum(1)
  1. 定义Sink
代码语言:javascript
代码运行次数:0
运行
复制
counts.print();
  1. 启动程序
代码语言:javascript
代码运行次数:0
运行
复制
env.execute("Kafka Dataset WordCount")

source 定义

// 递归定义整个目录下的所有文件

代码语言:javascript
代码运行次数:0
运行
复制
    val parameter = new Configuration
    parameter.setBoolean("recursive.file.enumeration",true)
    env.readTextFile("file://path/with/files").withParameters(parameter)

算子

Aggregate

代码语言:javascript
代码运行次数:0
运行
复制
    val input: DataSet[(Int,String,Double)] = env.fromElements(
      (1,"hello",4),
      (1,"hello",5),
      (2,"hello",5),
      (3,"word",6),
      (3,"word",6)
    )
    val value = input.groupBy(1).aggregate(Aggregations.SUM,0).aggregate(Aggregations.MIN,2)

连接

连接分为内连接和外连接,外连接分为左外连接,右外连接和内连接

代码语言:javascript
代码运行次数:0
运行
复制
    val input1 = env.fromElements((1,"hello"),(2,"hello"))
    val input2 = env.fromElements(("hello",1),("word",2))
    
    val result = input1.join(input2).where(0).equalTo(1)

广播变量

  1. 动态数据共享。 算子间共享输入和配置参数是静态的,广播变量共享的数据是动态的

广播变量编程步骤:

(1)创建广播变量。

代码语言:javascript
代码运行次数:0
运行
复制
val toBroadcast = env.fromElements(1,2,3);

(2) 注册广播变量

利用 RichFunction 自定义算子函数,注册广播变量

代码语言:javascript
代码运行次数:0
运行
复制
val toBroadcast = env.fromElements(1,2,3);
  val toBroadcast:DataSet[Int] = env.fromElements(1,2,3);

    toBroadcast.map(new RichMapFunction[String,String]() {
      var broadcastSet  = null

      override def open(parameters: Configuration): Unit = {
        // 读取广播变量
        broadcastSet = getRuntimeContext.getBroadcastVariable[String]("broadcastSetName").get(0)
      }
      override def map(value: String): String = {
        
      }
      // 注册广播变量
    }).withBroadcastSet(toBroadcast,"broadcastSetName");
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 批处理程序的结果
  • source 定义
  • 算子
  • 连接
  • 广播变量
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档