I have created a build of https://github.com/mongodb/mongo-kafka.
But how does this connect with my running Kafka instance?
However silly this question may sound, there seems to be no documentation available on making this work with a locally running replica set
of MongoDB.
All the blogs point to using MongoDB Atlas instead.
If you have a good resource, please guide me towards it.
Update 1
Used the connector jar from Maven - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
Put it into the Kafka plugins directory and restarted Kafka.
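For reference, Kafka Connect only picks up a connector jar if the directory it sits in is listed under plugin.path in the worker configuration; a minimal sketch, assuming the jar was copied into /usr/local/kafka/plugins (adjust to wherever the jar actually lives):
# config/connect-standalone.properties (excerpt)
# plugin.path must point at the directory that holds the mongo-kafka-connect jar
plugin.path=/usr/local/kafka/plugins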
Update 2 -- How do I make MongoDB a source for Kafka?
Used https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties
as the configuration for Kafka:
bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties
Update 3 -- The approach above did not work. Going back to the blog, it does not mention what port 8083 is.
Installed Confluent and confluent-hub, but still could not get the Mongo connector working with Kafka.
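For completeness, once confluent-hub is installed the connector can also be pulled in with it instead of downloading the jar from Maven; a sketch, using the component name MongoDB publishes on Confluent Hub:
confluent-hub install mongodb/kafka-connect-mongodb:latest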
Update 4 --
Zookeeper, the Kafka server, and Kafka Connect are now running.
With the commands below my source started working -
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
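A quick way to confirm the source connector is actually publishing change events is to list the topics and tail one of them; a sketch, using one of the topic names from the Logstash input below:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users --from-beginning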
Using the Logstash configuration below, I was able to push the data into Elasticsearch -
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}
So now each MongoSourceConnector.properties file holds the name of a single collection it reads from, and I would have to run Kafka Connect with a different properties file for every collection.
Also, my Logstash pushes new documents into Elasticsearch instead of updating existing ones, and it does not create an index per collection name. The goal is to keep Elasticsearch perfectly in sync with my MongoDB database.
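On the one-properties-file-per-collection point, connect-standalone.sh accepts several connector property files in a single invocation, so one worker can host all the source connectors; a sketch with hypothetical per-collection filenames:
bin/connect-standalone.sh config/connect-standalone.properties config/users.properties config/organisations.properties config/skills.properties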
Final update - everything is now working smoothly,
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    decorate_events => true
    topics => ["users","organisations","skills"]
  }
}
filter {
  json {
    source => "message"
    target => "json_payload"
  }
  json {
    source => "[json_payload][payload]"
    target => "payload"
  }
  mutate {
    add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
    rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id" }
    rename => { "[payload][fullDocument]" => "document" }
    remove_field => ["message","json_payload","payload"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{es_index}"
    action => "update"
    doc_as_upsert => true
    document_id => "%{mongo_id}"
  }
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
}
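To make the two json filters and the rename paths above easier to follow, this is roughly the message shape the filter expects: the outer object is Kafka Connect's JSON envelope, and its payload field is itself a JSON string holding the change event with fullDocument (all values here are invented placeholders):
{
  "schema": { "type": "string", "optional": false },
  "payload": "{ \"operationType\": \"insert\", \"fullDocument\": { \"_id\": { \"$oid\": \"000000000000000000000000\" }, \"name\": \"example\" } }"
}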
Posted on 2020-12-23 22:43:12
Steps to successfully implement MongoDB-to-Elasticsearch sync -
// Make sure no mongod daemon instance is running
//To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN
//Kill the process Id of mongo instance
sudo kill 775
//Deploy replicaset
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
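// If the replica set has never been initiated, run this once from a mongo shell
// (not part of the original steps; assumes a single-node replica set on localhost)
mongo --eval 'rs.initiate()'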
//dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
//Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
// Kafka Server
bin/kafka-server-start.sh config/server.properties
// Kafka Connect
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
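// Optional sanity check (not part of the original steps): confirm change events arrive on the topic;
// the topic is named <database>.<collection> because topic.prefix is empty
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dummydatabase.dummycollection --from-beginning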
// /etc/logstash/conf.d/apache.conf <- File
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    decorate_events => true
    topics => ["dummydatabase.dummycollection"]
  }
}
filter {
  json {
    source => "message"
    target => "json_payload"
  }
  json {
    source => "[json_payload][payload]"
    target => "payload"
  }
  mutate {
    add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
    rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id" }
    rename => { "[payload][fullDocument]" => "document" }
    remove_field => ["message","json_payload","payload"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{es_index}"
    action => "update"
    doc_as_upsert => true
    document_id => "%{mongo_id}"
  }
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
}
sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash
Open MongoDB Compass, and review the indices in Elasticsearch.
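If Kibana is not handy, the indices can also be checked directly against Elasticsearch's standard _cat endpoint:
curl 'localhost:9200/_cat/indices?v'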
Posted on 2020-12-22 11:49:27
Port 8083 is Kafka Connect, which you start with one of the connect-*.sh
scripts.
It is standalone from the broker, and its properties are not set via kafka-server-start.
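A quick way to see what is answering on 8083 is the Connect worker's REST API, for example listing the deployed connectors:
curl http://localhost:8083/connectors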
https://stackoverflow.com/questions/65404914