My goal is to set up a connector between a RabbitMQ exchange queue and a Kafka topic.
I followed this guide to set up the connector: https://camel.apache.org/camel-kafka-connector/latest/try-it-out-locally.html. I downloaded and installed the connector from source (https://github.com/apache/camel-kafka-connector), built it for camel-rabbitmq-kafka-connector and unzipped the files. I also pointed plugin.path, in camel-rabbitmq-kafka-connector-Standalone.properties, to the folder where I unzipped the jars.
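For reference, plugin.path is a regular Kafka Connect worker property; a minimal sketch of that setting, with a placeholder extraction path (not taken from the original post), looks like:

# Kafka Connect worker configuration: point plugin.path at the folder containing the extracted connector jars
plugin.path=/path/to/extracted/camel-rabbitmq-kafka-connector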
The properties I used for the CamelRabbitmqSourceConnector are the following:
name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1
# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# comma separated topics to send messages into
topics=mytopic
# mandatory properties (for a complete properties list see the connector documentation):
# The exchange name determines the exchange to which the produced messages will be sent. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.path.exchangeName=myexchange
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
My docker run command for RabbitMQ looks like this: docker run --rm -it --hostname myhostname -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3-management. For Kafka I used the standard "Getting Started" guide.
Sending a message with the Python Pika library:
import pika

# Connect to the local RabbitMQ broker and open a channel
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the queue and publish through the default exchange (exchange='')
channel.queue_declare(queue='myqueue', durable=True, auto_delete=True)
channel.basic_publish(exchange='', routing_key='myqueue', body='some body...')
As you can see, I send the message without specifying an exchange in channel.basic_publish. If I set it to the camel.source.path.exchangeName value instead, my messages get lost somewhere in between, so I am probably missing something here.
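For comparison, the usual Pika pattern for publishing through a named exchange is to declare the exchange, bind the queue to it, and then publish to the exchange. A minimal sketch, reusing the myexchange/myqueue names from the connector properties and assuming a fanout exchange type (an assumption, not something stated in the question):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the exchange the connector is configured to read from (the type here is an assumption)
channel.exchange_declare(exchange='myexchange', exchange_type='fanout')

# Declare the queue and bind it to the exchange so published messages are routed into it
channel.queue_declare(queue='myqueue', durable=True, auto_delete=True)
channel.queue_bind(queue='myqueue', exchange='myexchange')

# Publish to the named exchange; with a fanout exchange the routing key is ignored
channel.basic_publish(exchange='myexchange', routing_key='', body='some body...')

connection.close()

Note that if the exchange or queue is re-declared with flags different from an existing declaration, RabbitMQ rejects it with a PRECONDITION_FAILED channel error, which is worth checking when messages seem to disappear.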
Posted on 2020-09-28 21:39:13
I was able to get around this problem by switching my client to Java (https://www.rabbitmq.com/tutorials/tutorial-one-java.html) instead of Python.
Posted on 2020-10-03 04:58:40
I was able to get it working with the following properties:
name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1
# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# comma separated topics to send messages into
topics=mytopic
# mandatory properties (for a complete properties list see the connector documentation):
# The exchange name determines the exchange to which the produced messages will be sent. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
camel.source.endpoint.autoDelete=false
camel.source.endpoint.skipExchangeDeclare=true
camel.source.endpoint.skipQueueBind=true
https://stackoverflow.com/questions/64053714
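With skipExchangeDeclare=true and skipQueueBind=true, the connector no longer declares an exchange or binds the queue itself, so it effectively consumes from an already-existing queue. A minimal Pika publisher matching that configuration might look like the sketch below; the auto_delete=False flag mirrors camel.source.endpoint.autoDelete=false and is my reading of the properties, not something stated in the answer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the queue with flags that match the connector endpoint (durable, not auto-delete)
channel.queue_declare(queue='myqueue', durable=True, auto_delete=False)

# Publish through the default exchange; a routing key equal to the queue name delivers straight to it
channel.basic_publish(exchange='', routing_key='myqueue', body='some body...')

connection.close()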