前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink DataStream中CoGroup实现原理与三种 join 实现

Flink DataStream中CoGroup实现原理与三种 join 实现

作者头像
Flink实战剖析
发布2022-04-18 12:05:33
1.7K0
发布2022-04-18 12:05:33
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

CoGroup

CoGroup 表示联合分组,将两个不同的DataStream联合起来,在相同的窗口内按照相同的key分组处理,先通过一个demo了解其使用方式:

代码语言:javascript
复制
case class Order(id:String, gdsId:String, amount:Double)

case class Gds(id:String, name:String)

case class RsInfo(orderId:String, gdsId:String, amount:Double, gdsName:String)

object CoGroupDemo{

def main(args:Array[String]):Unit={

    val env =StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val kafkaConfig =newProperties();

    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

    val orderConsumer =newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema, kafkaConfig)

    val gdsConsumer =newFlinkKafkaConsumer011[String]("topic2",newSimpleStringSchema, kafkaConfig)

    val orderDs = env.addSource(orderConsumer)

.map(x =>{

        val a = x.split(",")

Order(a(0), a(1), a(2).toDouble)

})

    val gdsDs = env.addSource(gdsConsumer)

.map(x =>{

        val a = x.split(",")

Gds(a(0), a(1))

})



    orderDs.coGroup(gdsDs)

.where(_.gdsId)// orderDs 中选择key

.equalTo(_.id)//gdsDs中选择key

.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))

.apply(newCoGroupFunction[Order,Gds,RsInfo]{

overridedef coGroup(first: lang.Iterable[Order], second: lang.Iterable[Gds],out:Collector[RsInfo]):Unit={

//得到两个流中相同key的集合

}

})

    env.execute()

}}
 

从源码角度分析CoGrop的实现

  1. 两个DataStream进行CoGroup得到的是一个CoGroupedStreams类型,后面的where、equalTo、window、apply之间的一些转换,最终得到一个WithWindow类型,包含两个dataStream、key选择、where条件、window等属性
  2. 重点:WithWindow 的apply方法
  • 对两个DataStream打标签进行区分,得到TaggedUnion,TaggedUnion包含one、two两个属性,分别对应两个流
  • 将两个打标签后的流TaggedUnion 进行union操作合并为一个DataStream类型流unionStream
  • unionStream根据不同的流选择对应where/equalTo条件进行keyBy 得到KeyedStream流
  • 通过指定的window方式得到一个WindowedStream,然后apply一个被CoGroupWindowFunction包装之后的function,后续就是window的操作

到这里已经将一个CoGroup操作转换为window操作,接着看后续是如何将相同的key的两个流的数据如何组合在一起的

1. 在用户定义CoGroupFunction 被CoGroupWindowFunction包装之后,会接着被InternalIterableWindowFunction包装,一个窗口相同key的所有数据都会在一个Iterable中, 会将其传给CoGroupWindowFunction

2. 在CoGroupWindowFunction中,会将不同流的数据区分开来得到两个list,传给用户自定义的CoGroupFunction中

JOIN

在理解了coGroup的实现后,join实现原理也就比较简单,DataStream join 同样表示连接两个流,也是基于窗口实现,其内部调用了CoGroup的调用链,使用姿势p与调用流程跟CoGroup及其相似,主要有以下两点不同:

  1. 不在使用CoGroupFunction,而是JoinFunction,在JoinFunction里面得到的是来自不同两个流的相同key的每一对数据
  2. 函数调用链

中间增加了FlatJoinCoGroupFunction函数调用,使用嵌套遍历方式得到两个流的笛卡尔积传给用户自定义函数

Left/Right join实现分析

Flink 中DataStream 只提供了inner join 的实现,并未提供left join 与 right join 的实现,那么同样可以通过CoGroup来实现这两种join,以left join 为例,处理逻辑在CoGroupFunction中,实现如下:

代码语言:javascript
复制
overridedef coGroup(first: lang.Iterable[Order], second: lang.Iterable[Gds],out:Collector[RsInfo]):Unit={



          first.foreach(x =>{

if(!second.isEmpty){

               second.foreach(y=>{

out.collect(newRsInfo(x.id,x.gdsId,x.amount,y.name))

})

}

if(second.isEmpty){

out.collect(newRsInfo(x.id,x.gdsId,x.amount,null))

}

})

}

end

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CoGroup
  • JOIN
  • Left/Right join实现分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档