Kafka流中多个分区的聚合

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (112)

正如我在上面链接到的问题的答案中提到的,不可能直接进行这种跨分区聚合。但是,答复者提到,如果消息具有相同的键(在本例中为真),则有可能。如何做到这一点?

我还希望能够从一个“全局”状态存储中查询这些聚合值,该存储被复制到KafkaStreams应用程序的每个实例中。

我的第一个想法是用GlobalKTable。但是,此状态存储的Changelog主题具有与原始“Events”主题相同的分区数,并且只是在每个分区的基础上进行聚合,而不是跨所有分区进行聚合。

这是我的应用程序的精简版本-不太确定从哪里开始:

final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
        .groupByKey()
        .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);

aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.cleanUp();
streams.start();

builder.globalTable(METRIC_CHANGES_TOPIC, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE_NAME));

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close();
}));
提问于
用户回答回答于

KafkaStreams假设输入主题是按键划分的。这个假设不适用于你的情况。

在你的特殊情况下,可以替换groupByKey带着groupBy()

KTable<String, Double> aggregatedMetrics = eventStream
    .groupBy((k,v) -> k)
    .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);

lambda是一个不修改键的虚拟对象,但是它是Kafka流在进行聚合之前根据键重新划分数据的一个提示。

关于GlobalKTable这是一种特殊类型的表,它不是聚合的结果,而是从变更主题中填充的。代码似乎已经做了正确的事情:将聚合结果写入主题,并将主题重新读为GlobalKTable

所属标签

可能回答问题的人

  • 应用案例分享

    1 粉丝490 提问5 回答
  • uncle_light

    5 粉丝518 提问4 回答
  • o o

    4 粉丝495 提问4 回答
  • 学生

    8 粉丝476 提问3 回答

扫码关注云+社区

领取腾讯云代金券

玩转腾讯云 有奖征文活动