首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >如何运行mongo连接器作为kafka的源,并将其与logstash输入集成,从而使用elasticsearch作为接收器?

如何运行mongo连接器作为kafka的源,并将其与logstash输入集成,从而使用elasticsearch作为接收器?
EN

Stack Overflow用户
提问于 2020-12-21 23:47:14
回答 2查看 915关注 0票数 1

我创建了一个https://github.com/mongodb/mongo-kafka构建

但这是如何与我运行的卡夫卡实例连接起来的。

即使这个问题听起来有多蠢。但是,似乎没有文档可以使它与本地运行的replicaset of mongodb一起工作。

所有的博客都指向使用芒果地图集。

如果你有一个好的资源,请引导我走向它。

更新1

用过的maven插件- https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect

把它放入卡夫卡插件,重新启动卡夫卡。

更新2 --如何使mongodb成为kafka的源?

https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties

文件用作Kafka的配置。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties

更新3-上面的方法没有工作回到博客,它没有提到端口8083是什么。

安装合流和汇流-集线器,仍然不确定蒙戈连接器与卡夫卡工作。

更新4-

动物园管理员,Kafka服务器,Kafka连接运行

蒙戈卡夫卡图书馆档案 卡夫卡连接Avro连接器库文件

用下面的命令我的源开始工作了-

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties

使用下面的logstash配置,我可以将数据推入elasticsearch -

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
input {
  kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
        hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

因此,现在有一个MongoSourceConnector.properties保存了它读取的一个集合名称,我需要为每个集合运行kafka连接不同的属性文件。

我的Logstash将新数据推入elasticsearch,而不是更新旧数据。此外,它没有按照集合的名称创建索引。这个想法应该能够与我的MongoDB数据库进行完美的同步。

最后更新-现在一切都很顺利,

  • 为kafka连接创建多个属性文件
  • 最新的logstash实际上按照主题名称创建索引,并相应地更新索引。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","skills"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec =>
        rubydebug {
            metadata => true
        }
    }
}

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-12-23 22:43:12

使用弹性搜索成功地实现MongoDb同步的步骤-

  • 首先部署mongodb副本-
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//Make sure no mongo deamon instance is running
//To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN 

//Kill the process Id of mongo instance
sudo kill 775

//Deploy replicaset
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
  • 为Kafka创建配置属性
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
  • 确保以下urls中的JAR文件可用于您的kafka插件-

Maven中央存储库搜索

卡夫卡连接阿夫罗转换器

  • 部署kafka
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

//Kaka Server
bin/kafka-server-start.sh config/server.properties

//Kaka Connect
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
  • 配置Logstash -
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// /etc/logstash/conf.d/apache.conf  <- File 
input {
  kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["dummydatabase.dummycollection"]
  }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
      codec =>
        rubydebug {
            metadata => true
        }
    }
}
  • 启动ElasticSearch、Kibana和Logstash
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash
  • 测试

打开蒙戈罗盘,和

  • 创建集合,在logstash主题中提到这些集合,并为Kafka创建属性文件。
  • 向它添加数据
  • 更新数据

弹性搜索中的评审索引

票数 2
EN

Stack Overflow用户

发布于 2020-12-22 11:49:27

端口8083是Kafka,您可以从其中一个connect-*.sh脚本开始。

它是独立于代理的,属性不会从kafka-server-start中设置。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65404914

复制
相关文章
Eclipse保存文件时出现字符编码错误
eclipse 由于开源所以支持了比较杂的编码方式,而这些一个工程导入时添加了不少的外来程序,由于不是同一工程一次编码带来了其中含有 GBK 或 UTF8 或 UTF16 或 ASCII 等文件编译时就会出现错误警告。
黑泽君
2018/10/11
3.4K0
【SAP SD系列】销售订单创建保存时,出现错误
在创建销售订单时,保存之后,出现如下显示: 点放大镜出现: 出现上面红色错误的时候 怎么解决 解决办法如下: 这个是开启配置了销售订单的合法控制   属于外贸关税的配置里面的   要关闭订单类型的合法控制检查即可 路径:销售分销-外贸关税-合法控制 凭证控制 把合法控制 由 C 改为 空
matinal
2020/11/17
4.6K0
【SAP SD系列】销售订单创建保存时,出现错误
office打开文件时出现向程序发送命令时出现问题_向文件发送命令时错误
大家好,我是架构君,一个会写代码吟诗的架构师。今天说一说office打开文件时出现向程序发送命令时出现问题_向文件发送命令时错误,希望能够帮助大家进步!!!
Java架构师必看
2022/10/24
8K0
SQL Server 2008新特性——SSMS增强
SQL Server 2008已经发布很多天了,今天来总结一下我们最常用的工具SSMS在SQL 2008中的一些改进:
深蓝studyzy
2022/06/16
1.6K0
SQL Server 2008新特性——SSMS增强
VS解决BEX错误但不能关闭DEP保存
问题签名: 问题事件名称: BEX 应用程序名: Auth.exe 应用程序版本号: 0.0.0.0 应用程序时间戳: 546d9e0c 故障模块名称: Auth.exe 故障模块版本号: 0.0.0.0 故障模块时间戳: 546d9e0c 异常偏移: 00137ec6 异常代码: c0000417
全栈程序员站长
2022/07/06
1K0
VS解决BEX错误但不能关闭DEP保存
Emacs 保存 gpg 文件时卡住
在最近的 gnupg 版本中,出现了 breaking changes 导致 Emacs 保存文件时会卡住,涉及的版本有:
飞驰的西瓜
2023/09/06
2080
Emacs 保存 gpg 文件时卡住
Hadoop搭建,上传文件时出现错误,没有到主机的路由
解决方案: (1)从namenode主机ping其它slaves节点的主机名(注意是slaves节点的主机名),如果ping不通,原因可能是namenode节点的/etc/hosts 未配置主机名与IP地址的映射关系,补全主机名与IP地址的映射关系。 (2)从datanode主机ping master节点的主机名(注意也是节点的主机名),如果ping不通,原因可能是datenode节点的/etc/hosts 未配置主机名与IP地址的映射关系,补全主机名与IP地址的映射关系。 (3)查看各机器节点的防火墙是否关闭(或者设置防火墙开启,但对我们的指定端口开放,最好是关闭防火墙)
hankleo
2020/09/17
2.6K0
机器人自主完成手术
据美国科学促进会网站2016年5月5日报道,研究人员首次利用一台受监控的自主机器人成功完成软组织手术。该机器人在猪肠道手术中的表现战胜了外科专家和目前的机器人辅助手术技术。自主机器人在无需人工干预的情况下便可减少软组织手术的并发症并提高安全性和有效性,其中美国每年完成的此类手术约有4500万例。目前机器人辅助手术依赖于外科医生的手动控制,效果取决于手术医生个人的训练水平和经验。自动化手术领域的研究使得硬组织手术取得了进展,例如骨切割,但这对于软组织手术而言则很具挑战性,因为软组织具有延展性和移动性,不确定性
人工智能快报
2018/03/13
1K0
Typecho评论时可能出现的错误
根据URL得知应该是在发评论的时候出现的错误。很奇怪,因为很久之前这位博友jiuki's blog也给我反馈过一次。说明这问题不是偶然出现的,所以必须要解决才行。
Melody132
2020/03/13
1.5K0
【SQL】用SSMS连接Oracle手记
我想在A机用ssms连C机的oracle,一番了解,普遍做法是装ODAC xcopy包,里面有oledb组件,然后就可以在sqlserver的链接服务器里添加oracle数据库。如图:
AhDung
2020/04/10
2.3K0
【SQL】用SSMS连接Oracle手记
navicat连接mysql时出现2003(10060)错误
参考http://jingyan.baidu.com/article/95c9d20dac9040ec4f75617a.html,发现是防火墙未关闭;
全栈程序员站长
2021/06/21
8.1K1
navicat连接mysql时出现2003(10060)错误
file_put_contents 保存文件时乱码
tomcat让人抓狂,后台java写的一个应用程序生成的静态html居然是ANSI编码格式的文件,前台首页点击查看页面时直接乱码了…
meteoric
2018/11/15
1.7K0
访问网时出现403 Forbidden错误的原因:
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/113700.html原文链接:https://javaforall.cn
全栈程序员站长
2022/07/07
10.1K1
上传文件时出现跨域问题
所以啊,这根本不是跨域的问题,Tomcat默认上传的文件大小就是1MB,你上传的文件超过而已。
乐心湖
2020/07/31
3.6K0
上传文件时出现跨域问题
IDEA远程提交hadoop任务时出现的错误
上述问题解决:开启historyserver服务 mr-jobhistory-daemon.sh start historyserver
ZONGLYN
2019/08/08
2.3K0
Android studio 出现文件错误或者错乱
Android studio经常用着用着,出现Java显示乱码,xml显示Java,各种文件显示不全,或者build.gradle 显示成java的文件等
包子388321
2020/06/16
1.5K0
使用Androidkiller或APKIDE编译APK文件时出现libpng error: Not a PNG file的错误
>W: libpng error: Not a PNG file >W: ERROR: Failure processing PNG image D:\xin\AndroidKiller_v1.3.12018\projects\CFF_100\Project\res\mipmap-xxhdpi-v4\ic_launcher.png
全栈程序员站长
2022/11/17
4.4K0
使用Androidkiller或APKIDE编译APK文件时出现libpng error: Not a PNG file的错误
解决在打开word时,出现 “word 在试图打开文件时遇到错误” 的问题(亲测有效)[通俗易懂]
最近在网上查找期刊论文的模板时,发现从期刊官网下载下来的论文格式模板,在本地用word打开时,出现错误,情况如下
全栈程序员站长
2022/09/03
4.2K0
解决在打开word时,出现 “word 在试图打开文件时遇到错误” 的问题(亲测有效)[通俗易懂]
compile 时只保存项目内的文件
Emacs 中有一个非常好用的 compile 模块,可以非常方便的编译代码、运行测试等。不熟悉的读者可以参考: Compiling and running scripts in Emacs - Mastering Emacs[1] 。
飞驰的西瓜
2022/12/14
3790
点击加载更多

相似问题

警告: EPERM。咕噜声不能完成手术

23

使用SSMS连接到本地实例时出现SQL连接错误

23

手术无法完成

11

手术无法完成。(OSStatus错误-54.)

42

手术无法完成。(CHSErrorDomain错误1300.)

36
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文