首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何自动调用kafka-stream-processor中的process()方法?

如何自动调用kafka-stream-processor中的process()方法?
EN

Stack Overflow用户
提问于 2019-06-05 00:57:51
回答 1查看 171关注 0票数 0

我正在学习kafka streams,并写了一个简单的应用程序,代码片段如下:

MainApp:

代码语言:javascript
复制
        Topology topology = new Topology();

        topology.addSource("SOURCE", "source-topic");
        topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
        topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
        topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
        topology.addSink("SINK", "sink-topic", "Processor3");

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

单个流处理器的片段:

代码语言:javascript
复制
public class Processor1 implements Processor<String, String> {

   // Rest of code

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        context.forward(key, value);
    }

我知道我们需要创建Topology,然后调用streams.start();来启动它

我不能理解process()方法是如何被自动调用的,是谁在调用它?

EN

回答 1

Stack Overflow用户

发布于 2019-06-05 03:33:26

ProcessorContextImpl类在特定拓扑节点的每个传入消息上自动调用的Processor process()方法。对于构建的拓扑,当消息到达传入主题时,SOURCE节点使用它,并通过内部调用forward方法将消息转发(传播)到子节点(您可以调试/查看来自类ProcessorContextImpl的代码)。在本例中,SOURCE节点将键和值转发给子节点Processor1。在那之后,来自类Processor1process()方法被触发。当代码到达context.forward()时,消息转发到下一个子节点Processor2。之后,消息以类似的方式传播到Processor3SINK节点,最后,消息生成到出站主题。用于特定消息的这种管道在单个线程上执行(如果您为配置num.stream.threads = 1设置了默认值,则所有消息都将在每个应用程序实例的单个线程上处理)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56448317

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档