我可以消费来自Kafka的消息,只有当我使用它们的严格名称来指定主题时,比如'some_topic',但我想消费与特定正则表达式匹配的主题:
<?php
$conf = new RdKafka\Conf();
//$conf->set('log_level', 1);
//$conf->set('debug', 'all');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");
$topic = $rk->newTopic("^postgres.public.table_[0-9]+_");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$msg = $topic->consume(0, 1000);
if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
continue;
} elseif ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload;
echo "\n\n";
}
}正如您所看到的,主题名称是^postgres.public.table_[0-9]+_。然而,它不起作用。我在控制台中收到以下错误消息:
%3|1579347443.452|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: ^postgres.public.object_[0-9]+_ [0]: topic does not exist (Broker: Invalid topic)这有什么问题?我该如何修复它?
发布于 2020-01-19 00:12:36
newTopic()用于创建RdKafka\Topic对象的新实例。它不是用来加入主题的。
使用subscribe()方法根据模式连接主题,在本例中为正则表达式。因此,您可以使用类似以下内容:
$rk->subscribe('^postgres.public.table_[0-9]+_');https://stackoverflow.com/questions/59800303
复制相似问题