首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在Java中使用Kafka Streams创建不同的流?

如何在Java中使用Kafka Streams创建不同的流?
EN

Stack Overflow用户
提问于 2019-07-03 14:46:30
回答 1查看 191关注 0票数 0

我正在尝试从Kafka Java中的KStream创建一个“不同”的流。

我有一个输入流,其中的值是一组Double V0…Vn。输出流应计算V0 - 0、V1 - V0、V2 - V1…之间的差异Vn -Vn-1。

我的第一个想法是这样做:

代码语言:javascript
运行
复制
    KStream<String, Double> stream = builder.stream(TOPIC)

    KTable<String, Double> difference = stream.groupByKey().reduce(
            (oldValue, newValue) -> {
              return newValue - oldValue
            }
    ).toStream()

假设我有一个具有下列值的KStream输入:

代码语言:javascript
运行
复制
Key  -> Value
"A1" -> 2 
"B2" -> 4
"A1" -> 6
"A1" -> 10
"B2" -> 13 
"A1" -> 7

我想用下面的值创建一个新的Stream输出:

代码语言:javascript
运行
复制
Key  -> Value
"A1" ->  2  (2-0  =  2) 
"B2" ->  4  (4-0  =  4)
"A1" ->  4  (6-2  =  4)
"A1" ->  4  (10-6 =  4)
"B2" ->  9  (13-4 =  9)
"A1" -> -3  (7-10 = -3)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-03 17:40:05

您可以使用类似于

代码语言:javascript
运行
复制
        stream.groupByKey().aggregate(Diff::new, new Aggregator<String, Double, Diff>() {

        @Override
        public Diff apply(String key, Double newValue, Diff aggregate) {
            Double difference = newValue - aggregate.getLastValue();
            aggregate.setDifference(difference);
            aggregate.setLastValue(newValue);
            return aggregate;
        }
        }).mapValues(new ValueMapper<Diff, Double>() {

        @Override
        public Double apply(Diff value) {
            return value.getDifference();
        }

    }).toStream().to("diff");

哪里

代码语言:javascript
运行
复制
public class Diff {

  private Double lastValue = 0d;

  private Double difference = 0d;
  //getters and setters
  // ...
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56864385

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档