首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >在Kafka Streams中聚合多个分区

在Kafka Streams中聚合多个分区
EN

Stack Overflow用户
提问于 2018-06-04 03:53:35
回答 1查看 3.7K关注 0票数 6

在某种程度上,这是Aggregation over a specific partition in Apache Kafka Streams的后续

假设我有一个名为"events“的主题,其中有3个分区,我在其中发送字符串->整数数据,如下所示:

(Bob,3)在分区1上

(Sally,4)在分区2上

(Bob,2)在分区3上

..。

我想要聚合所有分区上的值(在本例中,只是一个简单的总和),以得到如下所示的KTable

(Sally,4)

(鲍勃,5岁)

正如我在上面链接的问题的答案中提到的,直接进行这种跨分区聚合是不可能的。然而,回答者提到,如果消息具有相同的密钥是可能的(在这种情况下是真的)。如何才能做到这一点?

我还希望能够从跨Kafka Streams应用程序的每个实例复制的“全局”状态存储中查询这些聚合值。

我的第一个想法是使用GlobalKTable (我相信,根据this page的说法,这应该是我需要的)。但是,此状态存储的changelog主题具有与原始“事件”主题相同的分区数量,并且仅在每个分区的基础上进行聚合,而不是跨所有分区进行聚合。

这是我的应用程序的精简版本--我真的不确定下一步该怎么做:

代码语言:javascript
复制
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
        .groupByKey()
        .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);

aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.cleanUp();
streams.start();

builder.globalTable(METRIC_CHANGES_TOPIC, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE_NAME));

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close();
}));
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50670436

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档