正如问题How to manually control the offset commit with camel-kafka?中所问的,我希望使用camel-kafka手动提交偏移量。我的路线: .from(kafka:topic1)
.aggregate(new GroupByExchangeStrategy())
.to(kafka:topic2)
.process(new ManualCommitProcessor()) ,其中ManualCommitProcessor将在将消息发送到另一个主题后进行承诺。 问题是聚合器和kafka生产者是在独立的线程中工作的,kafka消费者负责
我有两个主题,每个主题上有10个分区,我使用下面的代码来收听消息。这里将建立多少个连接?是20吗?每个连接是在分区级别还是在命名空间(bootstarap服务器)级别?
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Object, Objec
我有一个关于管理多个CG的问题,创建了三个消费者组,每个CG都有自己的kafka服务、组Id和主题。
现在我收到了预期的消息,但是,我想知道是否有可能创建下一个场景:
创建三个消费者组,但只接收来自一个用户的消息,暂时暂停/暂停其他用户组,如果他的kafka服务将下降,则使用下一个消费者组的消息,与第三个用户组相同。
下面是我的代码示例:
function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SER
我已经使用python.How创建了实现的Kafka生产者-消费者消息传递主题我可以对队列做同样的事情,这样消息就只会发送给一个消费者。
这是我的生产者代码
# Import KafkaProducer from Kafka library
from kafka import KafkaProducer
# Define server with port
bootstrap_servers = ['localhost:9092']
# Define topic name where the message will publish
topicName = 'Firs