我在我的spring boot应用程序中创建了一个kafka消费者,它侦听一个主题- my_topic,在读取事件my_event时,我执行了一些操作。现在我是这样做的:
@KafkaListener(topics = "my_topic",
containerFactory = "my_kafka_container_factory")
public void handleMyKafkaEvents(String eventJson) {
MyDTO my_dto = gson.fromJson(eventJson, MyDTO.class);
String event_type = my_dto.getEventType();
if (event_type != null && event_type.equals("my_event")) {
// do something with my_dto
}
}
// dto object
public class MyDTO {
private String status;
private String eventType;
private String propName;
// some other parameters
// getters and setters
}
我的kafka主题中的对象示例如下:
{
"eventType": "my_event",
"propName": "prop_value",
"status": "DONE",
//some_other_key_value_pairs_required_in_my_DTO
//some_other_key_value_pairs_not_required_in_my_DTO
}
由于我的监听器正在监听推送到kafka主题的所有数据,所以在读取每条记录后,我必须添加一个条件,即如果它的eventType是我需要的,那么我将对它执行一些操作。
到目前为止,这是有效的。因为还会有其他数据被推送到其中,这些数据的eventType不是我需要的,所以这些数据将被忽略,但只有在读取它们之后才会被忽略,因为我不知道如何根据这个eventType进行过滤。
所以我的问题是,当推送到kafka主题的事件数量突然激增时,不仅是我的eventType,还有其他人,它会影响我的服务的性能吗?
我可以在这里改进什么,这样其他的eventTypes就会被忽略,我的监听器甚至不需要知道它们。
发布于 2019-06-05 03:28:22
一些方法可能适合您的用例,也可能不适合,但可能会有所帮助:
在key中
在kafka key
中包含一些特定的代码,因此您不需要阅读payload
来判断是否必须处理消息。
这只是一个愚蠢的例子:
key payload
-----------------
10_ev xxx
08_ev yyy
...
在这个简单的例子中,前两个数字决定了事件的类型。为每个使用者组分配一个特定的事件进行处理,并丢弃其他事件。小心!为此,您需要启动与事件类型一样多的消费者组,这样就不会丢失消息,或者将特定的事件范围分配给所有消费者(f.e,消费者0处理0-9的事件类型,消费者1处理10-19的事件类型,...)
取决于event的
您可以告诉生产者(应该知道他正在生成的事件的类型)对消息进行分区(将消息发送到特定分区到主题中),这样您就可以知道,例如,所有类型为0的事件都在分区0上,并考虑到这一点进行消费。
无论如何,拥有过多的事件类型可能会减少可用的选项。您可以基于范围进行分区(在分区0上,事件类型从0到9,等等),但是,可能会有一些令人头疼的问题。
根据event type的不同主题的
好吧,这肯定是最简单的一个,但是如果你有很多事件类型(如数千,...),可能会有问题。:)
希望能有所帮助。这里有一些关于你的用例here的有趣信息。
https://stackoverflow.com/questions/56449704
复制相似问题