我有两个流做间隔连接,streamA是左流,而streamB是右流,代码基如下所示:
streamA
.keyBy((a: EventA) => a.common_key)
.intervalJoin(
streamB
.keyBy((b: EventB) => b.common_key)
)
.between(Time.seconds(0), Time.minutes(5))
.process(new ProcessJoinFunction<PojoA, PojoB, Result>() {
@Override
public void processElement(PojoA left, PojoB right, Context ctx, Collector<Result> out) throws Exception {
out.collect(Result.build(left, right));
}
})我得到了一个pojo result由PojoA和PojoB在间隔后加入。result包含一些pojoA和pojoB维度和度量字段,如下所示:
class result {
long userId; // it's common key
String name; //from pojoA
long number; // from pojoA
String shop; // from pojoB
long orders; // from pojoA
double price: //from pojoA
}情况是一个streamA可能匹配多个streamB,所以在加入之后,我需要聚合为sum orders和price进行连接流,然后回到pojo result.for示例,有两个连接记录:
joined record 1: (123, "nameA", 455, "shop", 3, 4.2)
joined record 2: (123, "nameA", 455, "shop", 6, 4.8)
after processing, should be: (123, "nameA", 455, "shop", 9(3+6), 1(4.2+4.8)/(3+6))如何编写聚合函数来实现它?
发布于 2022-07-25 22:18:53
您可以在约简函数之后执行一个简单的.keyBy,如下所示:
.keyBy(r -> r.getUserId())
.reduce(new YourReduceFunction))其中YourReduceFunction看起来类似于:
public class YourReduceFunction implements ReduceFunction<result> {
result reduce(result v1, result v2) {
// calculate the sum of orders, sum of prices, and average price
// Note you need a new sumOfPrices field to correctly calc average
// price.
}
}https://stackoverflow.com/questions/73088196
复制相似问题