如何在CVM上同步自建数据库的数据?

简介

Transporter是一种用于在不同数据存储之间移动数据的开源工具。开发人员经常为诸如跨数据库移动数据,将数据从文件移动到数据库或反之亦然等任务编写一次性脚本,但使用像Transporter这样的工具有几个优点。

在Transporter中,您构建通道,这些通道定义从源(读取数据的位置)到接收器(写入数据的位置)的数据流。源和接收器可以是SQL或NoSQL数据库,flat 数据或其他数据。Transporter使用可插拔扩展的适配器与这些资源进行通信,默认情况下,该项目包括几个适用于常用数据库的适配器。

除了移动数据之外,Transporter还允许您在使用变换器通过通道时更改数据。与适配器一样,默认情况下包含多个变换器。您也可以编写自己的变换器来自定义数据修改。

在本教程中,我们将介绍使用Transporter的内置适配器和用JavaScript编写的自定义转换器将数据从MongoDB数据库移动和处理到Elasticsearch的示例。

准备

要学习本教程,您需要:

  • 购买腾讯云CVM Ubuntu 16.04服务器,可以使用sudo命令的非root账户。
  • 在购买好的服务器上安装好MongoDB、Elasticsearch,相关安装教程可以参考腾讯云开发者实验室
  • Transporter通道是用JavaScript编写的,但是您不需要任何JavaScript知识或经验来学习本教程。

第一步、安装Transporter

Transporter为大多数常见操作系统提供二进制文件。Ubuntu的安装过程包括两个步骤:

  • 下载Linux二进制文件
  • 想办法使其可执行

首先,从GartHub上的Transporter项目页面获取最新版本的链接。复制以-linux-amd6结尾的链接。本教程使用v0.5.2,这是编写本文时最新的版本。

将二进制文件下载到您的主目录中。

cd
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64

将其移动到/usr/local/bin或者您的安装目录中。

mv transporter-*-linux-amd64 /usr/local/bin/transporter 

接下来赋予权限,让其可执行

chmod +x /usr/local/bin/transporter

您可以通过运行二进制文件来测试是否正确设置了Transporter

transporter

您会看到帮助输出和版本号:

USAGE
  transporter <command> [flags]

COMMANDS
  run       run pipeline loaded from a file
  . . .

VERSION
  0.5.2

为了使用Transporter将数据从MongoDB移动到Elasticsearch,我们需要准备两个工作:MongoDB中有我们想要移动的数据和告诉Transporter如何移动它的通道。下一步创建一些示例数据,但如果您已经有一个想要移动的MongoDB数据库,则可以跳过下一步并直接进入步骤3。

第二步、向MongoDB添加示例数据(可选)

在此步骤中,我们将在MongoDB中创建一个包含单个集合的示例数据库,并向该集合添加一些文档。然后,在本教程的其余部分中,我们将使用Transporter通道迁移和转换此示例数据。

首先,连接到MongoDB数据库。

mongo

这会将您的命令提示符会自动更改为mongo>,表示您正在使用MongoDB shell。

从这里,选择要处理的数据库。我们为其命名为my_application

use my_application

MongoDB中,您不需要创建数据库或集合。一旦开始将数据添加到您按名称选择的数据库,就会自动创建该数据库。

因此,要创建数据库my\_application,请将两个文档保存到users`集合中:一个代表Sammy Shark,一个代表Gilly Glowfish。这将是我们的测试数据。

db.users.save({"firstName": "Sammy", "lastName": "Shark"});
db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});

添加文档后,您可以查询集合users以查看记录。

db.users.find().pretty();

输出看起来类似于下面的输出,但_id列是不同的。MongoDB自动添加对象ID以唯一标识集合中的文档。

{
  "_id" : ObjectId("59299ac7f80b31254a916456"),
  "firstName" : "Sammy",
  "lastName" : "Shark"
}
{
  "_id" : ObjectId("59299ac7f80b31254a916457"),
  "firstName" : "Gilly",
  "lastName" : "Glowfish"
}

CTRL+C退出MongoDB shell。

接下来,让我们创建一个Transporter通道,将这些数据从MongoDB移动到Elasticsearch。

第三步、创建基本通道

Transporter中的通道默认由命名为pipeline.js的JavaScript文件来定义。在给定源和接收器的情况下,内置的init命令在COR中创建基本配置文件。

使用MongoDB的pipeline.js作为源,将Elasticsearch作为接收器。

transporter init mongodb elasticsearch

您将看到以下输出:

Writing pipeline.js...

这次您不需要修改pipeline.js,但让我们看看它是如何工作的。

该文件看起来是这样,但你也可以通过cat pipeline.js,less pipeline.js(按q退出)命令来查看文件的内容)。

var source = mongodb({
  "uri": "${MONGODB_URI}"
  // "timeout": "30s",
  // "tail": false,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"],
  // "wc": 1,
  // "fsync": false,
  // "bulk": false,
  // "collection_filters": "{}",
  // "read_preference": "Primary"
})

var sink = elasticsearch({
  "uri": "${ELASTICSEARCH_URI}"
  // "timeout": "10s", // defaults to 30s
  // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
  // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
  // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

分别以var sourcevar sink开头MongoDB定义JavaScript变量和Elasticsearch适配器。我们将定义MONGODB\_URIELASTICSEARCH\_URI环境变量,以便在这一步里面适配器后续来使用。

//开头的行是注释行。它们突出显示了您可以为通道设置的一些常见配置选项,这次我们默认不打开。

最后一行连接源和接收器。变量transportert让我们访问我们的通道。我们使用.Source().Save()函数在文件中增加源和接收器,这些源和接收器是提前在文件中用sourcesink变量定义的。

SoCube()SaveE()函数的第三个参数是namespace。传递/.*/最后一个参数意味着我们希望将所有数据从MangGDB传输,并将其保存在RealStCype中的同一命名空间中。

在我们运行此通道之前,我们需要为MongoDB URI和Elasticsearch URI设置环境变量。在我们使用的示例中,两者都使用默认设置在本地托管,但如果您使用的是现有MongoDB或Elasticsearch实例,请确保自定义这些选项。

export MONGODB_URI='mongodb://localhost/my_application'
export ELASTICSEARCH_URI='http://localhost:9200/my_application'

现在我们准备好运行通道了。

transporter run pipeline.js

你会看到输出结束如下:

. . .
INFO[0001] metrics source records: 2                     path=source ts=1522942118483391242
INFO[0001] metrics source/sink records: 2                path="source/sink" ts=1522942118483395960
INFO[0001] exit map[source:mongodb sink:elasticsearch]   ts=1522942118483396878

在第二行和第三行到最后行中,该输出指示源中存在2条记录,并且2条记录被移动到接收器。

为了确认两个记录都被处理,您可以查询my_application数据库的内容进行搜索,而MySQL应用程序数据库现在应该存在新的数据。

curl $ELASTICSEARCH_URI/_search?pretty=true 

加?pretty=true参数使输出更易于阅读

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e9c6687d9f638ced4fe",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Gilly",
          "lastName" : "Glowfish"
        }
      },
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e986687d9f638ced4fd",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Sammy",
          "lastName" : "Shark"
        }
      }
    ]
  }
}

MongoDB中的数据库和集合类似于Elasticsearch中的索引和类型。考虑到这一点,你应该看到:

  • _index字段转向my\_applicationMongoDB数据库的名称。
  • _type字段转向usersMongoDB集合的名称。
  • firstNamelastName字段分别填写了"Sammy","Shark"和"Gilly""Glowfish"。

这证实了来自MongoDB的记录都通过Transporter成功处理并加载到Elasticsearch。为了构建这个基本通道,我们将添加一个可以转换输入数据的中间处理步骤。

第四步、创建变换器

顾名思义,变换器在将源数据加载到接收器之前修改源数据。例如,它们允许您添加新字段,删除字段或更改字段的数据。Transporter附带一些预定义的变换器以及对定制变换器的支持。

通常,自定义转换器编写为JavaScript函数并保存在单独的文件中。要使用它们,请在pipeline.js中添加对变换器文件的引用。Transporter包括Otto和Goja JavaScript引擎。因为Goja更新快,我们将在这里使用它。唯一的差异是语法。

创建一个名为transform.js的文件,我们将用它来编写转换函数。

nano transform.js

下面是我们将使用的函数,它将创建一个名为FulnNew的新字段,其功能将是firstNamelastName字段连接在一起 ,然后用空格分割,代码如下:

function transform(msg) {
    msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
    return msg
}

让我们来看看这个代码:

  • 第一行,function transform(msg),是函数定义。
  • MSG是一个JavaScript对象,包含源文档的详细信息。我们使用这个对象来访问通过通道的数据。
  • 函数的第一行连接两个现有字段,并将该值分配给新的fullName字段。
  • 函数的最后一行返回新修改的MSG对象,以便使用其余的通道。

保存并关闭文件。

接下来,我们需要修改通道以使用此转换器。打开pipeline.js文件进行编辑。

nano pipeline.js 

最后,我们需要给转换函数添加一个调用Transform(),以将转换器添加到Source()Save() 之间的通道中,像这样

. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")

传递给Transform()的参数是转换的类型,在这种情况下是Goja。使用goja函数,我们使用其相对路径指定变换器的文件名。

保存并关闭文件。在我们重新运行通道以测试变换器之前,让我们从之前的测试中清除Elasticsearch中的现有数据。

curl -XDELETE $ELASTICSEARCH_URI

您将看到正确的输出。

{"acknowledged":true}

现在重新运行通道。

transporter run pipeline.js 

输出看起来与之前的测试非常相似,您可以在最后几行看到通道是否像以前一样成功完成。我们可以再次运行Elasticsearch查看数据是否是我们想要的格式。

curl $ELASTICSEARCH_URI/_search?pretty=true 

您可以在新输出中看到fullName字段:

{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e9c6687d9f638ced4fe",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Gilly",
          "fullName" : "Gilly Glowfish",
          "lastName" : "Glowfish"
        }
      },
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e986687d9f638ced4fd",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Sammy",
          "fullName" : "Sammy Shark",
          "lastName" : "Shark"
        }
      }
    ]
  }
}

fullName已在两个文档中添加了正确设置值的字段。现在我们知道如何向Transporter管道添加自定义转换。

结论

您已经构建了一个带有转换器的基本Transporter通道,用于将数据从MongoDB复制和修改到Elasticsearch。您可以以相同的方式应用更复杂的转换,在同一通道中链接多个转换等等。MongoDB和Elasticsearch只是Transporter支持的两个适配器。它还支持flat 数据或Postgres等SQL数据库以及许多其他数据源。

购买两台服务器试试吧:https://cloud.tencent.com/product/cvm,很简单哦~


参考文献:《How To Sync Transformed Data from MongoDB to Elasticsearch with Transporter on Ubuntu 16.04 》

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏技术碎碎念

OS存储器管理(二)

离散分配 分页(Paging),分段,段页式 一、分页 一个进程的物理地址可以是非连续的; 将物理内存分成固定大小的块,称为块(frame); 将逻辑内存分为同...

3358
来自专栏友弟技术工作室

vim精简版教程

vim编辑器 ? vim trree 编辑器的分类 文本编辑器,ASCII码 字处理器:word 全称 vi:Visual interface vim: Vis...

2275
来自专栏用户2442861的专栏

牛人整理分享的面试知识:操作系统、计算机网络、设计模式、Linux编程,数据结构总结

牛人整理分享的面试知识:操作系统、计算机网络、设计模式、Linux编程,数据结构总结

2744
来自专栏潇涧技术专栏

Head First Systrace

深入浅出systrace(1)systrace的简单介绍和systrace工具源码分析。

1751
来自专栏智能大石头

简易远程消息交换协议SRMP

经过十多年实战经验积累以及多方共同讨论,新生命团队(https://github.com/newlifex)制订了一种简单而又具有较好扩展性的RPC(Remot...

856
来自专栏贾老师の博客

【笔记】ejoy2d —— spritepack

932
来自专栏玄魂工作室

看代码学PHP渗透(3) - 实例化任意对象漏洞

大家好,我们是红日安全-代码审计小组。最近我们小组正在做一个PHP代码审计的项目,供大家学习交流,我们给这个项目起了一个名字叫 PHP-Audit-Labs 。...

5811
来自专栏闻道于事

vim命令

751
来自专栏崔庆才的专栏

用Flask+Aiohttp+Redis维护动态代理池

4605
来自专栏非典型程序猿

从源码透析gRPC调用原理

gRPC是如何work的,清楚的理解其调用逻辑,对于我们更好、更深入的使用gRPC很有必要。因此我们必须深度解析下gRPC的实现逻辑,在本文中,将分别从客户端和...

3.1K8

扫码关注云+社区