首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >后间隔连接做聚合

后间隔连接做聚合
EN

Stack Overflow用户
提问于 2022-07-23 05:14:42
回答 1查看 51关注 0票数 0

我有两个流做间隔连接,streamA是左流,而streamB是右流,代码基如下所示:

代码语言:javascript
运行
复制
streamA
  .keyBy((a: EventA) => a.common_key)
  .intervalJoin(
      streamB
        .keyBy((b: EventB) => b.common_key)
    )
  .between(Time.seconds(0), Time.minutes(5))
  .process(new ProcessJoinFunction<PojoA, PojoB, Result>() {

                    @Override
                    public void processElement(PojoA left, PojoB right, Context ctx, Collector<Result> out) throws Exception {
                        out.collect(Result.build(left, right));
                    }
                })

我得到了一个pojo result由PojoA和PojoB在间隔后加入。result包含一些pojoA和pojoB维度和度量字段,如下所示:

代码语言:javascript
运行
复制
class result {
   long userId; // it's common key
   String name; //from pojoA
   long number; // from pojoA
   String shop; // from pojoB
   long orders; // from pojoA
   double price: //from pojoA
    
   
}

情况是一个streamA可能匹配多个streamB,所以在加入之后,我需要聚合为sum ordersprice进行连接流,然后回到pojo result.for示例,有两个连接记录:

代码语言:javascript
运行
复制
joined record 1: (123, "nameA", 455, "shop", 3, 4.2)
joined record 2: (123, "nameA", 455, "shop", 6, 4.8)

after processing, should be: (123, "nameA", 455, "shop", 9(3+6), 1(4.2+4.8)/(3+6))

如何编写聚合函数来实现它?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-25 22:18:53

您可以在约简函数之后执行一个简单的.keyBy,如下所示:

代码语言:javascript
运行
复制
  .keyBy(r -> r.getUserId())
  .reduce(new YourReduceFunction))

其中YourReduceFunction看起来类似于:

代码语言:javascript
运行
复制
public class YourReduceFunction implements ReduceFunction<result> {

    result reduce(result v1, result v2) {
        // calculate the sum of orders, sum of prices, and average price
        // Note you need a new sumOfPrices field to correctly calc average
        // price.
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73088196

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档