I want to join a large stream with a small stream. My plan is to broadcast the smaller stream and then connect it to the larger one.
However, I'm not sure how to store the broadcast patterns, or how to look up the matching pattern in the processElement method and then combine the two elements.
Edit: I've managed to prototype a broadcast join with the code snippet below. I adapted the regular join from the official training repository: https://github.com/apache/flink-training/blob/release-1.13/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
It seems to work, but I'm not sure whether my logic is correct.
```scala
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// TaxiRide, TaxiFare, the generators and rideSourceOrTest/fareSourceOrTest come from the flink-training repository

// The main function has been abbreviated for ease of reading
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val rides = env
    .addSource(rideSourceOrTest(new TaxiRideGenerator()))
    .filter { ride => ride.isStart }
    // .keyBy { ride => ride.rideId }

  val fares = env
    .addSource(fareSourceOrTest(new TaxiFareGenerator()))

  // Must match (name and types) the descriptor used inside BroadcastJoin
  val broadcastStateDescriptor =
    new MapStateDescriptor[Long, TaxiFare]("fares_broadcast", classOf[Long], classOf[TaxiFare])
  val faresBroadcast: BroadcastStream[TaxiFare] = fares
    .broadcast(broadcastStateDescriptor)

  val result: DataStream[(TaxiRide, TaxiFare)] = rides
    .connect(faresBroadcast)
    .process(new BroadcastJoin())

  result.print()
  env.execute("Broadcast join of rides and fares")
}

// Type parameters IN1, IN2, OUT: the non-broadcast stream type, the broadcast stream type and the output type
class BroadcastJoin() extends BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)] {
  // Broadcast state descriptor (same name and types as the one passed to broadcast())
  private lazy val broadcastStateDescriptor =
    new MapStateDescriptor[Long, TaxiFare]("fares_broadcast", classOf[Long], classOf[TaxiFare])

  // Process a broadcast stream element: value is the incoming fare, and ctx gives writable access to the broadcast state
  override def processBroadcastElement(
      value: TaxiFare,
      ctx: BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)]#Context,
      out: Collector[(TaxiRide, TaxiFare)]): Unit = {
    val broadcastState: BroadcastState[Long, TaxiFare] = ctx.getBroadcastState(broadcastStateDescriptor)
    // put() overwrites an existing entry for the same rideId, so no contains()/remove() check is needed.
    // The broadcast state is held in memory on every parallel instance, and entries are never removed.
    broadcastState.put(value.rideId, value)
  }

  // Process a non-broadcast stream element: value is the incoming ride, and ctx gives read-only access to the broadcast state
  override def processElement(
      value: TaxiRide,
      ctx: BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)]#ReadOnlyContext,
      out: Collector[(TaxiRide, TaxiFare)]): Unit = {
    val broadcastState: ReadOnlyBroadcastState[Long, TaxiFare] = ctx.getBroadcastState(broadcastStateDescriptor)
    if (broadcastState.contains(value.rideId)) {
      val foundMatch = broadcastState.get(value.rideId)
      out.collect((value, foundMatch)) // emit the joined pair
    }
    // A ride whose fare has not been broadcast yet is not buffered, so it produces no output.
  }
}
```
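One way to check the logic without a cluster is to model the join in plain Scala. This is a sketch, not Flink code: a mutable `Map` stands in for the broadcast state of a single parallel instance, and `Ride`, `Fare`, `BroadcastJoinModel` and `BroadcastJoinModelDemo` are hypothetical stand-ins reduced to the fields the join uses. It illustrates one property of the function above: a ride is joined only if its fare was broadcast first.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for TaxiRide/TaxiFare, reduced to the fields the join uses
final case class Ride(rideId: Long)
final case class Fare(rideId: Long, totalFare: Float)

// Plain-Scala model of BroadcastJoin (not the Flink API): the Map plays the role of
// the MapState[Long, TaxiFare] broadcast state held by one parallel instance
class BroadcastJoinModel {
  private val fares = mutable.Map.empty[Long, Fare]

  // Like processBroadcastElement: store or overwrite the fare for its rideId
  def processBroadcastElement(fare: Fare): Unit = fares.put(fare.rideId, fare)

  // Like processElement: emit (ride, fare) only if the fare is already in the state
  def processElement(ride: Ride, out: mutable.Buffer[(Ride, Fare)]): Unit =
    fares.get(ride.rideId).foreach(fare => out += ((ride, fare)))
}

object BroadcastJoinModelDemo {
  def run(): Seq[(Ride, Fare)] = {
    val join = new BroadcastJoinModel
    val out = mutable.ArrayBuffer.empty[(Ride, Fare)]
    join.processBroadcastElement(Fare(1L, 10.0f)) // fare 1 arrives first
    join.processElement(Ride(1L), out)            // ride 1 finds its fare: joined
    join.processElement(Ride(2L), out)            // ride 2 arrives before its fare: dropped
    join.processBroadcastElement(Fare(2L, 7.5f))  // fare 2 arrives too late to be joined
    out.toSeq
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Since Flink does not guarantee the relative order of the two inputs, the keyed `RichCoFlatMapFunction` in the linked training solution keeps whichever element arrives first in keyed state until its partner shows up; the broadcast version above has no equivalent buffering.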
Posted on 2021-09-08 14:56:52
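For example, the high-volume stream might be financial transactions, and the low-volume broadcast stream might be the exchange rates of various currencies against the US dollar. Your goal might be to normalize all transactions to USD.
You'll need to describe whatever data is being broadcast as a map from keys to values.
In this case, you could broadcast a map where the key is a currency (e.g., EUR) and the value is that currency's latest exchange rate against USD. Then, in the processBroadcastElement method of the BroadcastProcessFunction, you store these rates, and in the processElement method you look up the relevant rate for each incoming transaction in the broadcast state.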
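The currency example can be sketched in plain Scala to show the division of labor between the two methods. This is a model, not the Flink API: `Transaction`, `Rate`, `UsdTransaction`, `NormalizeToUsd` and `NormalizeToUsdDemo` are hypothetical names, and a mutable `Map[String, BigDecimal]` (currency to USD rate) stands in for the broadcast `MapState`. In a real job the same two method bodies would live in a `BroadcastProcessFunction`, accessing the state through `ctx.getBroadcastState(descriptor)`.

```scala
import scala.collection.mutable

// Hypothetical event types for this sketch
final case class Transaction(id: Long, currency: String, amount: BigDecimal)
final case class Rate(currency: String, usdPerUnit: BigDecimal)
final case class UsdTransaction(id: Long, amountUsd: BigDecimal)

// Plain-Scala model of a BroadcastProcessFunction[Transaction, Rate, UsdTransaction];
// the Map stands in for the broadcast MapState (currency -> latest USD rate)
class NormalizeToUsd {
  private val rates = mutable.Map.empty[String, BigDecimal]

  // Like processBroadcastElement: store or overwrite the latest rate for a currency
  def processBroadcastElement(rate: Rate): Unit = rates.put(rate.currency, rate.usdPerUnit)

  // Like processElement: look up the transaction's currency and emit the USD amount
  def processElement(tx: Transaction, out: mutable.Buffer[UsdTransaction]): Unit =
    rates.get(tx.currency).foreach(r => out += UsdTransaction(tx.id, tx.amount * r))
}

object NormalizeToUsdDemo {
  def run(): Seq[UsdTransaction] = {
    val fn = new NormalizeToUsd
    val out = mutable.ArrayBuffer.empty[UsdTransaction]
    fn.processBroadcastElement(Rate("EUR", BigDecimal("1.18")))
    fn.processElement(Transaction(1L, "EUR", BigDecimal("100")), out) // 100 EUR -> 118 USD
    fn.processElement(Transaction(2L, "GBP", BigDecimal("50")), out)  // no GBP rate yet: skipped
    fn.processBroadcastElement(Rate("EUR", BigDecimal("1.20")))       // newer rate replaces the old one
    fn.processElement(Transaction(3L, "EUR", BigDecimal("10")), out)  // 10 EUR -> 12 USD
    out.toSeq
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because processElement only gets a read-only view of the broadcast state, every rate change has to arrive through the broadcast side; skipping transactions whose currency has no rate yet is just the simplest policy for this sketch.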
(This may not be the best way to accomplish this particular goal; it's just the first example that came to mind.)
https://stackoverflow.com/questions/69104800