如何在Ubuntu 14.04上使用Transporter将转换后的数据从MongoDB同步到Elasticsearch

介绍

Elasticsearch有助于对数据进行全文搜索,而MongoDB则擅长存储数据。使用MongoDB存储数据和使用Elasticsearch进行搜索是一种常见的体系结构。

很多时候,您可能会发现需要将数据从MongoDB批量迁移到Elasticsearch。为此编写自己的程序虽然是一项很好的练习,但却是一项繁琐的工作。有一个很棒的开源实用程序叫做Transporter,由Compose(数据库的云平台)开发,可以非常高效地完成这项任务。

本教程将向您展示如何使用开源实用程序Transporter通过自定义转换将数据从MongoDB快速复制到Elasticsearch。

目标

在本文中,我们将介绍如何使用Transporter实用程序将数据从MongoDB复制到Ubuntu 14.04上的Elasticsearch 。

我们将从快速概述开始,向您展示如何安装MongoDB和Elasticsearch,尽管我们不会详细介绍两个系统中的数据建模。如果您已经安装了这两个步骤,请随意快速浏览安装步骤。

然后我们将转移到Transporter。

其他版本的Ubuntu以及其他Linux发行版的说明类似。

先决条件

请完成以下先决条件。

第1步 - 安装MongoDB

导入MongoDB存储库的公钥。

sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10

为MongoDB创建一个列表文件。

echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list

重新加载本地包数据库。

sudo apt-get update

安装MongoDB包:

sudo apt-get install -y mongodb-org

请注意,每个包都包含关联的版本号。

安装完成后,您可以启动,停止和检查服务的状态。它将在安装后自动启动。

尝试连接到作为服务运行的MongoDB实例:

mongo

如果它已启动并运行,您将看到如下内容:

MongoDB shell version: 2.6.9
connecting to: test
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
    http://docs.mongodb.org/
Questions? Try the support group
    http://groups.google.com/group/mongodb-user

这意味着数据库服务器正在运行!你现在可以退出:

exit

第2步 - 安装Java

Java是Elasticsearch的先决条件。我们现在安装它。

首先,添加存储库:

sudo apt-add-repository ppa:webupd8team/java

再次更新您的包列表:

sudo apt-get update

安装Java:

sudo apt-get install oracle-java8-installer

当提示您接受许可时,请选择<Ok>然后<Yes>

第3步 - 安装Elasticsearch

现在我们将安装Elasticsearch。

首先,创建一个新目录,您将在其中安装搜索软件,然后进入该目录。

mkdir ~/utils
cd ~/utils

访问Elasticsearch的下载页面以查看最新版本。

现在下载最新版本的Elasticsearch。在撰写本文时,最新版本为1.5.0。

wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.5.0.zip

安装解压缩:

sudo apt-get install unzip

解压缩存档:

unzip elasticsearch-1.5.0.zip

导航到您解压缩的目录:

cd elasticsearch-1.5.0

通过发出以下命令启动Elasticsearch:

bin/elasticsearch

Elasticsearch启动需要几秒钟。你会看到一些启动日志。Elasticsearch现在将在终端窗口中运行。

注意:在某些时候,您可能希望将Elasticsearch作为服务运行,以便您可以使用sudo service elasticsearch restart和类似的命令来控制它; 有关提示,请参阅本教程有关Upstart的信息。或者,您可以从Ubuntu的存储库安装Elasticsearch,尽管您可能会获得旧版本。

保持此终端打开。在另一个终端窗口中建立与服务器的另一个SSH连接,并检查您的实例是否已启动并运行:

curl -XGET http://localhost:9200

9200是Elasticsearch的默认端口。如果一切顺利,您将看到类似于以下所示的输出:

{
  "status" : 200,
  "name" : "Northstar",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "1.5.0",
    "build_hash" : "927caff6f05403e936c20bf4529f144f0c89fd8c",
    "build_timestamp" : "2015-03-23T14:30:58Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.4"
  },
  "tagline" : "You Know, for Search"
}

注意:对于本文的后半部分,当您要复制数据时,请确保Elasticsearch正在运行(并在端口9200上)。

第4步 - 安装Mercurial

接下来我们将安装修订控制工具Mercurial。

sudo apt-get install mercurial

验证是否正确安装了Mercurial:

hg

如果安装正确,您将获得以下输出:

Mercurial Distributed SCM
​
basic commands:
​
. . .

第5步 - 安装Go

Transporter是用Go语言编写的。因此,您需要在系统上安装golang

sudo apt-get install golang

要使Go正常工作,您需要设置以下环境变量:

从您的$HOME目录为Go创建一个文件夹:

mkdir ~/go; echo "export GOPATH=$HOME/go" >> ~/.bashrc

更新你的路径:

echo "export PATH=$PATH:$HOME/go/bin:/usr/local/go/bin" >> ~/.bashrc

注销当前的SSH会话并再次登录。您可以只关闭您工作的会话并保持Elasticsearch会话的运行。此步骤对于环境变量的更新至关重要。再次登录,并验证是否已添加您的变量:

echo $GOPATH

这应该显示Go的新路径。在我们的例子中,它将是:

/home/sammy/go

如果它没有正确显示路径,请仔细检查本节中的步骤。

一旦我们$GOPATH设置正确,我们需要通过构建一个简单的程序来检查Go是否正确安装。

创建一个名为hello.go的文件并将以下程序放入其中。您可以使用任何所需的文本编辑器。我们将在本文中使用nano文本编辑器。键入以下命令以创建新文件:

nano ~/hello.go

现在将下面这个简短的“Hello,world”程序复制到新打开的文件中。此文件的全部内容是帮助我们验证Go是否正常工作。

package main;
import "fmt"
​
func main() {
    fmt.Printf("Hello, world\n")
}

完成后,按CTRL+X退出文件。它会提示您保存文件。按Y,然后按ENTER。它会询问您是否要更改文件名。再按ENTER一次保存当前文件。

然后,从您的主目录,使用Go运行该文件:

go run hello.go

你应该看到这个输出:

Hello, world

如果您看到“Hello,world”消息,则Go已正确安装。

现在转到$GOPATH目录并创建子目录srcpkgbin。这些目录构成Go的工作空间。

cd $GOPATH
mkdir src pkg bin
  • src 包含组织到包中的Go源文件(每个目录一个包)
  • pkg 包含包对象
  • bin 包含可执行命令

第6步 - 安装Git

我们将使用Git来安装Transporter。使用以下命令安装Git:

sudo apt-get install git

第7步 - 安装Transporter

现在创建并移动到Transporter的新目录。由于该实用程序是由Compose开发的,我们将为该目录取名为compose

mkdir -p $GOPATH/src/github.com/compose
cd $GOPATH/src/github.com/compose

这是compose/transporter将安装的地方。

克隆Transporter GitHub存储库:

git clone https://github.com/compose/transporter.git

进入新目录:

cd transporter

取得/usr/lib/go目录的所有权:

sudo chown -R $USER /usr/lib/go

确保为GCC安装了build-essential

sudo apt-get install build-essential

运行该go get命令以获取所有依赖项:

go get -a ./cmd/...

这一步可能需要一段时间,所以请耐心等待。完成后,您可以构建Transporter。

go build -a ./cmd/...

如果一切顺利,它将完成,没有任何错误或警告。运行此命令检查Transporter是否已正确安装:

transporter

你应该看到这样的输出:

usage: transporter [--version] [--help] <command> [<args>]

Available commands are:
    about    Show information about database adaptors
    eval     Eval javascript to build and run a transporter application

. . .

所以安装完成了。现在,我们需要在MongoDB中使用一些我们要同步到Elasticsearch的测试数据。

故障排除:

如果您收到以下错误:

transporter: command not found

这意味着您的$GOPATH未添加到PATH变量中。检查您是否正确执行了该命令:

echo "export PATH=$PATH:$HOME/go/bin:/usr/local/go/bin" >> ~/.bashrc

尝试注销并再次登录。如果错误仍然存在,请改用以下命令:

$GOPATH/bin/transporter

第8步 - 创建示例数据

现在我们已经安装了所有东西,我们可以继续进行数据同步部分。

连接到MongoDB:

mongo

您现在应该看到MongoDB提示符>。创建一个名为foo的数据库。

use foo

将一些示例文档插入名为的集合中bar

db.bar.save({"firstName": "Robert", "lastName": "Baratheon"});
db.bar.save({"firstName": "John", "lastName": "Snow"});

选择刚刚输入的内容:

db.bar.find().pretty();

这应显示如下所示的结果(在您的机器上ObjectId会有所不同):

{
    "_id" : ObjectId("549c3ef5a0152464dde10bc4"),
    "firstName" : "Robert",
    "lastName" : "Baratheon"
}
{
    "_id" : ObjectId("549c3f03a0152464dde10bc5"),
    "firstName" : "John",
    "lastName" : "Snow"
}

现在您可以退出数据库了:

exit

一些术语:

  • MongoDB中的数据库类似于Elasticsearch中的索引
  • MongoDB中的集合类似于Elasticsearch中的类型

我们的最终目标是将来自MongoDB 的foo数据库的bar集合中的数据同步到Elasticsearch 中的foo索引的bar类型。

第9步 - 配置Transporter

现在,我们可以继续进行配置更改,将我们的数据从MongoDB迁移到Elasticsearch。Transporter需要配置文件(config.yaml),转换文件(myTransformation.js)和应用程序文件(application.js

  • 配置文件指定节点,类型和URI
  • 应用程序文件指定从源到目标的数据流以及可选的转换步骤
  • 转换文件将转换应用于数据

注意: 本节中的所有命令都假定您正在执行transporter目录中的命令。

移至transporter目录:

cd ~/go/src/github.com/compose/transporter

配置文件

如果您愿意,可以查看示例文件config.yaml。我们将备份原件,然后用我们自己的内容替换它。

mv test/config.yaml test/config.yaml.00

新文件类似,但更新了一些URI和一些其他设置以匹配我们服务器上的内容。让我们从这里复制内容并粘贴到新config.yaml文件中。再次使用nano编辑器。

nano test/config.yaml

将以下内容复制到文件中。完成后,如前所述保存文件。

# api:
#   interval: 60s
#   uri: "http://requestb.in/13gerls1"
#   key: "48593282-b38d-4bf5-af58-f7327271e73d"
#   pid: "something-static"
nodes:
  localmongo:
    type: mongo
    uri: mongodb://localhost/foo
  es:
    type: elasticsearch
    uri: http://localhost:9200/
  timeseries:
    type: influx
    uri: influxdb://root:root@localhost:8086/compose
  debug:
    type: file
    uri: stdout://
  foofile:
    type: file
    uri: file:///tmp/foo

请注意该nodes部分。与原始文件相比,我们稍微调整了localmongoes节点。节点是各种数据源和目的地。Type定义节点的类型。例如,

  • mongo 意味着它是一个MongoDB实例/集群
  • elasticsearch 意味着它是一个Elasticsearch节点
  • file 意味着它是一个纯文本文件

uri将使API端点与节点连接。如果未指定,默认端口将用于MongoDB(27017)。由于我们需要从MongoDB 的foo数据库中捕获数据,因此URI应如下所示:

mongodb://localhost/foo

同样,Elasticsearch的URI将如下所示:

http://localhost:9200/

保存config.yaml文件。您无需进行任何其他更改。

应用文件

现在,打开test目录中的application.js文件。

nano test/application.js

使用下面显示的内容替换文件的示例内容:

Source({name:"localmongo", namespace:"foo.bar"})
.transform({filename: "transformers/addFullName.js"})
.save({name:"es", namespace:"foo.bar"});

保存文件并退出。以下是我们pipeline的简要说明。

  • Source(options) 标识从中获取数据的源
  • transform 指定要对每条记录应用的转换
  • save(options) 识别保存数据的位置

选项包括:

  • name:``config.yaml文件中显示的节点名称
  • namespace:标识数据库和表名; 它必须用点()限定

转换文件

现在,最后一块拼图就是转型。如果你还记得,我们用firstNamelastName存储了MongoDB中的两条记录。在将数据从MongoDB同步到Elasticsearch时,您可以在这里看到转换数据的真正力量。

假设我们希望存储在Elasticsearch中的文档有另一个名叫fullName的字段。为此,我们需要创建一个新的转换文件test/transformers/addFullName.js

nano test/transformers/addFullName.js

将下面的内容粘贴到文件中。如前所述保存并退出。

module.exports = function(doc) {
  doc._id = doc._id['$oid']; 
  doc["fullName"] = doc["firstName"] + " " + doc["lastName"];
  return doc
}

第一行是解决Transporter处理MongoDB ObjectId()字段的方式所必需的。第二行告诉Transporter连接firstNamelastName来形成fullName

这是一个简单的转换示例,但是使用一点JavaScript,您可以在准备搜索数据时执行更复杂的数据操作。

第10步 - 执行转换

现在我们完成了设置,现在是时候同步和转换我们的数据了。

确保Elasticsearch正在运行!如果不是,请在新的终端窗口中再次启动它:

~/utils/elasticsearch-1.5.0/bin/elasticsearch

原始终端中,请确保您位于以下transporter目录中:

cd ~/go/src/github.com/compose/transporter

执行以下命令以复制数据:

transporter run --config ./test/config.yaml ./test/application.js

Transporter 的run命令需要两个参数。首先是配置文件,第二个是应用程序文件。如果一切顺利,命令将完成而没有任何错误。

通过我们的转换,检查Elasticsearch以验证数据是否已被复制:

curl -XGET localhost:9200/foo/bar/_search?pretty=true

你会得到这样的结果:

{
  "took" : 10,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "foo",
      "_type" : "bar_full_name",
      "_id" : "549c3ef5a0152464dde10bc4",
      "_score" : 1.0,
      "_source":{"_id":"549c3ef5a0152464dde10bc4","firstName":"Robert","fullName":"Robert Baratheon","lastName":"Baratheon"}
    }, {
      "_index" : "foo",
      "_type" : "bar_full_name",
      "_id" : "549c3f03a0152464dde10bc5",
      "_score" : 1.0,
      "_source":{"_id":"549c3f03a0152464dde10bc5","firstName":"John","fullName":"John Snow","lastName":"Snow"}
    } ]
  }
}

请注意 f字段 fullName, 其中包含了用空格将他们之间的空格连接起来的firstNamelastName,这也就是我们的转换生效的地方。

结论

现在我们知道如何使用Transporter将数据从MongoDB复制到Elasticsearch,以及如何在同步时将转换应用于我们的数据。您可以以相同的方式应用更复杂的转换。此外,您可以在管道中链接多个转换。

如果您正在进行多次转换,请将它们保存在单独的文件中,并将它们链接起来,这是一种很好的做法。这样,您可以使每个转换都可以独立使用。

所以,这就是它。您可以在GitHub上查看Transporter项目,以便及时了解API中的最新更改。

更多Ubuntu教程请前往腾讯云+社区学习更多知识。


参考文献:《How To Sync Transformed Data from MongoDB to Elasticsearch with Transporter on Ubuntu 14.04》

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏算法修养

Flask 学习篇二:学习Flask过程中的记录

Flask学习笔记: GitHub上面的Flask实践项目 https://github.com/SilentCC/FlaskWeb 1.Applicati...

33390
来自专栏技术博客

ExtJs十二(ExtJs Mvc图片管理之二)

接着图片管理一http://www.cnblogs.com/aehyok/archive/2013/04/27/3048278.html。

14410
来自专栏名山丶深处

springboot集成schedule(深度理解)

76850
来自专栏烂笔头

Python爬虫—破解JS加密的Cookie

目录[-] 前言 在GitHub上维护了一个代理池的项目,代理来源是抓取一些免费的代理发布网站。上午有个小哥告诉我说有个代理抓取接口不能用了,返回状态521...

75670
来自专栏运维

yum安装rabbitmq3.6.11与erlange20配置及优化

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件...

19410
来自专栏Python中文社区

Python爬虫—破解JS加密的Cookie

專 欄 ❈Jerry,Python中文社区专栏作者。 blog:https://my.oschina.net/jhao104/blog github:htt...

68780
来自专栏小白鼠

ZookeeperZNode基本命令四字命令SessionWatcherACLZookeeper集群Paxos算法ZAB协议Curator分布式锁

在Zookeeper中,ZNode可以分为持久节点和临时节点两类。所谓持久节点是指一旦这个ZNode被创建了,除非主动进行ZNode的移除操作,否则这个ZNod...

12630
来自专栏野路子程序员

【CentOS】Apache多虚拟主机多版本PHP(5.3+5.6+N)共存运行配置全过程

41750
来自专栏SpringBoot 核心技术

SpringCloud组件 & 源码剖析:Eureka服务注册方式流程全面分析

在SpringCloud组件:Eureka服务注册是采用主机名还是IP地址?文章中我们讲到了服务注册的几种注册方式,那么这几种注册方式的源码是怎么实现的呢?我们...

10310
来自专栏Danny的专栏

【SSH快速进阶】——探索Hibernate对象的三种状态:Transient、Persistent、Detached

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

11320

扫码关注云+社区

领取腾讯云代金券