前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何在CVM上同步自建数据库的数据?

如何在CVM上同步自建数据库的数据?

原创
作者头像
爆栈工程师
修改2018-07-18 14:50:10
1.5K0
修改2018-07-18 14:50:10
举报

简介

Transporter是一种用于在不同数据存储之间移动数据的开源工具。开发人员经常为诸如跨数据库移动数据,将数据从文件移动到数据库或反之亦然等任务编写一次性脚本,但使用像Transporter这样的工具有几个优点。

在Transporter中,您构建通道,这些通道定义从源(读取数据的位置)到接收器(写入数据的位置)的数据流。源和接收器可以是SQL或NoSQL数据库,flat 数据或其他数据。Transporter使用可插拔扩展的适配器与这些资源进行通信,默认情况下,该项目包括几个适用于常用数据库的适配器。

除了移动数据之外,Transporter还允许您在使用变换器通过通道时更改数据。与适配器一样,默认情况下包含多个变换器。您也可以编写自己的变换器来自定义数据修改。

在本教程中,我们将介绍使用Transporter的内置适配器和用JavaScript编写的自定义转换器将数据从MongoDB数据库移动和处理到Elasticsearch的示例。

准备

要学习本教程,您需要:

  • 购买腾讯云CVM Ubuntu 16.04服务器,可以使用sudo命令的非root账户。
  • 在购买好的服务器上安装好MongoDB、Elasticsearch,相关安装教程可以参考腾讯云开发者实验室
  • Transporter通道是用JavaScript编写的,但是您不需要任何JavaScript知识或经验来学习本教程。

第一步、安装Transporter

Transporter为大多数常见操作系统提供二进制文件。Ubuntu的安装过程包括两个步骤:

  • 下载Linux二进制文件
  • 想办法使其可执行

首先,从GartHub上的Transporter项目页面获取最新版本的链接。复制以-linux-amd6结尾的链接。本教程使用v0.5.2,这是编写本文时最新的版本。

将二进制文件下载到您的主目录中。

代码语言:txt
复制
cd
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64

将其移动到/usr/local/bin或者您的安装目录中。

代码语言:txt
复制
mv transporter-*-linux-amd64 /usr/local/bin/transporter 

接下来赋予权限,让其可执行

代码语言:txt
复制
chmod +x /usr/local/bin/transporter

您可以通过运行二进制文件来测试是否正确设置了Transporter

代码语言:txt
复制
transporter

您会看到帮助输出和版本号:

代码语言:txt
复制
USAGE
  transporter <command> [flags]

COMMANDS
  run       run pipeline loaded from a file
  . . .

VERSION
  0.5.2

为了使用Transporter将数据从MongoDB移动到Elasticsearch,我们需要准备两个工作:MongoDB中有我们想要移动的数据和告诉Transporter如何移动它的通道。下一步创建一些示例数据,但如果您已经有一个想要移动的MongoDB数据库,则可以跳过下一步并直接进入步骤3。

第二步、向MongoDB添加示例数据(可选)

在此步骤中,我们将在MongoDB中创建一个包含单个集合的示例数据库,并向该集合添加一些文档。然后,在本教程的其余部分中,我们将使用Transporter通道迁移和转换此示例数据。

首先,连接到MongoDB数据库。

代码语言:txt
复制
mongo

这会将您的命令提示符会自动更改为mongo>,表示您正在使用MongoDB shell。

从这里,选择要处理的数据库。我们为其命名为my_application

代码语言:txt
复制
use my_application

MongoDB中,您不需要创建数据库或集合。一旦开始将数据添加到您按名称选择的数据库,就会自动创建该数据库。

因此,要创建数据库my\_application,请将两个文档保存到users`集合中:一个代表Sammy Shark,一个代表Gilly Glowfish。这将是我们的测试数据。

代码语言:txt
复制
db.users.save({"firstName": "Sammy", "lastName": "Shark"});
db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});

添加文档后,您可以查询集合users以查看记录。

代码语言:txt
复制
db.users.find().pretty();

输出看起来类似于下面的输出,但_id列是不同的。MongoDB自动添加对象ID以唯一标识集合中的文档。

代码语言:txt
复制
{
  "_id" : ObjectId("59299ac7f80b31254a916456"),
  "firstName" : "Sammy",
  "lastName" : "Shark"
}
{
  "_id" : ObjectId("59299ac7f80b31254a916457"),
  "firstName" : "Gilly",
  "lastName" : "Glowfish"
}

CTRL+C退出MongoDB shell。

接下来,让我们创建一个Transporter通道,将这些数据从MongoDB移动到Elasticsearch。

第三步、创建基本通道

Transporter中的通道默认由命名为pipeline.js的JavaScript文件来定义。在给定源和接收器的情况下,内置的init命令在COR中创建基本配置文件。

使用MongoDB的pipeline.js作为源,将Elasticsearch作为接收器。

代码语言:txt
复制
transporter init mongodb elasticsearch

您将看到以下输出:

代码语言:txt
复制
Writing pipeline.js...

这次您不需要修改pipeline.js,但让我们看看它是如何工作的。

该文件看起来是这样,但你也可以通过cat pipeline.js,less pipeline.js(按q退出)命令来查看文件的内容)。

代码语言:txt
复制
var source = mongodb({
  "uri": "${MONGODB_URI}"
  // "timeout": "30s",
  // "tail": false,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"],
  // "wc": 1,
  // "fsync": false,
  // "bulk": false,
  // "collection_filters": "{}",
  // "read_preference": "Primary"
})

var sink = elasticsearch({
  "uri": "${ELASTICSEARCH_URI}"
  // "timeout": "10s", // defaults to 30s
  // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
  // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
  // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

分别以var sourcevar sink开头MongoDB定义JavaScript变量和Elasticsearch适配器。我们将定义MONGODB\_URIELASTICSEARCH\_URI环境变量,以便在这一步里面适配器后续来使用。

//开头的行是注释行。它们突出显示了您可以为通道设置的一些常见配置选项,这次我们默认不打开。

最后一行连接源和接收器。变量transportert让我们访问我们的通道。我们使用.Source().Save()函数在文件中增加源和接收器,这些源和接收器是提前在文件中用sourcesink变量定义的。

SoCube()SaveE()函数的第三个参数是namespace。传递/.*/最后一个参数意味着我们希望将所有数据从MangGDB传输,并将其保存在RealStCype中的同一命名空间中。

在我们运行此通道之前,我们需要为MongoDB URI和Elasticsearch URI设置环境变量。在我们使用的示例中,两者都使用默认设置在本地托管,但如果您使用的是现有MongoDB或Elasticsearch实例,请确保自定义这些选项。

代码语言:txt
复制
export MONGODB_URI='mongodb://localhost/my_application'
export ELASTICSEARCH_URI='http://localhost:9200/my_application'

现在我们准备好运行通道了。

代码语言:txt
复制
transporter run pipeline.js

你会看到输出结束如下:

代码语言:txt
复制
. . .
INFO[0001] metrics source records: 2                     path=source ts=1522942118483391242
INFO[0001] metrics source/sink records: 2                path="source/sink" ts=1522942118483395960
INFO[0001] exit map[source:mongodb sink:elasticsearch]   ts=1522942118483396878

在第二行和第三行到最后行中,该输出指示源中存在2条记录,并且2条记录被移动到接收器。

为了确认两个记录都被处理,您可以查询my_application数据库的内容进行搜索,而MySQL应用程序数据库现在应该存在新的数据。

代码语言:txt
复制
curl $ELASTICSEARCH_URI/_search?pretty=true 

加?pretty=true参数使输出更易于阅读

代码语言:txt
复制
{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e9c6687d9f638ced4fe",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Gilly",
          "lastName" : "Glowfish"
        }
      },
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e986687d9f638ced4fd",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Sammy",
          "lastName" : "Shark"
        }
      }
    ]
  }
}

MongoDB中的数据库和集合类似于Elasticsearch中的索引和类型。考虑到这一点,你应该看到:

  • _index字段转向my\_applicationMongoDB数据库的名称。
  • _type字段转向usersMongoDB集合的名称。
  • firstNamelastName字段分别填写了"Sammy","Shark"和"Gilly""Glowfish"。

这证实了来自MongoDB的记录都通过Transporter成功处理并加载到Elasticsearch。为了构建这个基本通道,我们将添加一个可以转换输入数据的中间处理步骤。

第四步、创建变换器

顾名思义,变换器在将源数据加载到接收器之前修改源数据。例如,它们允许您添加新字段,删除字段或更改字段的数据。Transporter附带一些预定义的变换器以及对定制变换器的支持。

通常,自定义转换器编写为JavaScript函数并保存在单独的文件中。要使用它们,请在pipeline.js中添加对变换器文件的引用。Transporter包括Otto和Goja JavaScript引擎。因为Goja更新快,我们将在这里使用它。唯一的差异是语法。

创建一个名为transform.js的文件,我们将用它来编写转换函数。

代码语言:txt
复制
nano transform.js

下面是我们将使用的函数,它将创建一个名为FulnNew的新字段,其功能将是firstNamelastName字段连接在一起 ,然后用空格分割,代码如下:

代码语言:txt
复制
function transform(msg) {
    msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
    return msg
}

让我们来看看这个代码:

  • 第一行,function transform(msg),是函数定义。
  • MSG是一个JavaScript对象,包含源文档的详细信息。我们使用这个对象来访问通过通道的数据。
  • 函数的第一行连接两个现有字段,并将该值分配给新的fullName字段。
  • 函数的最后一行返回新修改的MSG对象,以便使用其余的通道。

保存并关闭文件。

接下来,我们需要修改通道以使用此转换器。打开pipeline.js文件进行编辑。

代码语言:txt
复制
nano pipeline.js 

最后,我们需要给转换函数添加一个调用Transform(),以将转换器添加到Source()Save() 之间的通道中,像这样

代码语言:txt
复制
. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")

传递给Transform()的参数是转换的类型,在这种情况下是Goja。使用goja函数,我们使用其相对路径指定变换器的文件名。

保存并关闭文件。在我们重新运行通道以测试变换器之前,让我们从之前的测试中清除Elasticsearch中的现有数据。

代码语言:txt
复制
curl -XDELETE $ELASTICSEARCH_URI

您将看到正确的输出。

代码语言:txt
复制
{"acknowledged":true}

现在重新运行通道。

代码语言:txt
复制
transporter run pipeline.js 

输出看起来与之前的测试非常相似,您可以在最后几行看到通道是否像以前一样成功完成。我们可以再次运行Elasticsearch查看数据是否是我们想要的格式。

代码语言:txt
复制
curl $ELASTICSEARCH_URI/_search?pretty=true 

您可以在新输出中看到fullName字段:

代码语言:txt
复制
{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e9c6687d9f638ced4fe",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Gilly",
          "fullName" : "Gilly Glowfish",
          "lastName" : "Glowfish"
        }
      },
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e986687d9f638ced4fd",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Sammy",
          "fullName" : "Sammy Shark",
          "lastName" : "Shark"
        }
      }
    ]
  }
}

fullName已在两个文档中添加了正确设置值的字段。现在我们知道如何向Transporter管道添加自定义转换。

结论

您已经构建了一个带有转换器的基本Transporter通道,用于将数据从MongoDB复制和修改到Elasticsearch。您可以以相同的方式应用更复杂的转换,在同一通道中链接多个转换等等。MongoDB和Elasticsearch只是Transporter支持的两个适配器。它还支持flat 数据或Postgres等SQL数据库以及许多其他数据源。

购买两台服务器试试吧:https://cloud.tencent.com/product/cvm,很简单哦~


参考文献:《How To Sync Transformed Data from MongoDB to Elasticsearch with Transporter on Ubuntu 16.04 》

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 准备
  • 第一步、安装Transporter
  • 第二步、向MongoDB添加示例数据(可选)
  • 第三步、创建基本通道
  • 第四步、创建变换器
  • 结论
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档