前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >5 分钟内造个物联网 Kafka 管道

5 分钟内造个物联网 Kafka 管道

作者头像
未来守护者
发布2018-04-25 11:50:37
2.1K0
发布2018-04-25 11:50:37
举报
文章被收录于专栏:安全领域安全领域

原文作者:Seth Luersen

原文地址:https://dzone.com/articles/creating-an-iot-kafka-pipeline-in-under-five-minutes

在此前的 MemSQL 直播里,我们讨论了现代企业能如何轻松地采用新的数据管理工具来管理数据的规模、增长以及复杂性。然后我们演示了基于 Apache Kafka 和 MemSQL 来构建实时的、交互式的数据管道的方法。这些数据管道能为数百万用户采集、处理,并输出海量的数据。

在直播期间,我们还分享了这些方法:

  • 使用新型工具构建数据管道
  • 让数据工作流能够为基于数据管道的机器学习和预测分析提供支持
  • 在 5 分钟内用 Apache Kafka 和 MemSQL Pipelines 构建实时的数据管道

我们收到了直播观众发来的其他问题。在此我们也想分享一下这些问题的答案。

问题:MemSQL 是 NoSQL 数据库还是关系数据库管理系统(RDBMS)?

MemSQL 是一个新式的、实现了内存级别的优化的、能进行大规模并行处理的,无共享的实时数据库。MemSQL 将数据存储在表里面,并支持了标准的 SQL 数据类型。地理空间和 JSON 数据类型是 MemSQL 中的一等公民。MemSQL 能用来存储和查询那些结构化、半结构化或非结构化的数据。

问题:MemSQL 的最低内存要求是多少?

MemSQL 是一个由一个或多个节点组成的分布式系统。你可以在我们的文档中找到更多和系统和硬件要求有关的信息。

问题:将 JSON 加载到 MemSQL 里的方法是否跟 MongoDB 相似?

MongoDB 会在底层以一种基于二进制编码的格式(称为 BSON)来表示 JSON 文档。BSON 也是 MongoDB 的消息传输格式(wire format)。与此同时,JSON 是 MemSQL 的一等公民。MemSQL 也会用 JSON 标准的数据类型来验证 JSON。MemSQL 会在底层将验证过的 JSON 存储为文本。如果要在 MemSQL 里面保存 BSON 格式的数据,那么相应的可用 SQL 数据类型有 VARBINARY 或其变体:LONGBLOB、MEDIUMBLOB、BLOB 还有 TINYBLOB。

问题:运行 MemSQL 和 Apache Kafka 需要什么样的基础设施?

MemSQL 跟 Apache Kafka 一样是个分布式系统,由一个或多个节点组成集群来运行。因此,怎么说也要一个独立的 Apache Kafka 生产者以及中介者,以及由一个主汇聚器和一个叶节点组成的独立 MemSQL 集群来作为这个系统的基础设施。

MemSQL Pipeline 在默认情况下会将从 Apache Kafka 的某个订阅主题那里获取的流数据导入到 MemSQL 的叶节点里。MemSQL 叶节点会包含单独的数据库分区。每个数据库分区都会把从 Kafka 流获得的数据存储到由数据指定的目标表中。针对特定订阅主题的 MemSQL 数据库分区数量与 Kafka 中介者的分区数量之间的对应关系决定了最佳的性能。

问题:MemSQL 中是否有处理从 Apache Kafka 获得的数据的消费者的概念?

Apache Kafka 采用了更传统的,并且为大多数消息传递系统所共享的一种设计方式。在这一方式里,数据会被生产者推送给中介者,接着消费者会从中介者处获得数据。在这种基于推送的系统中,当消费者处理数据的速度一时跟不上生产者产生速度的速度时,消费者也能慢慢赶上。一个接入到 Apache Kafka 的 MemSQL 管道会为 Kafka 用上一个管道提取器。这个提取器就是一个 Kafka 的消费者。

问题:使用 Apache Kafka 提取器的 MemSQL 管道是否仅能把数据导入到一个 “行存储” 表里面?

MemSQL Pipeline 可以将数据并行地大量导入到分布式的表中。在 MemSQL 中,表可以是分布式的,也可以是非分布式的(即引用表)。表的存储类型有两种:内存级别的行存储以及列存储。所有列存储表都有一个隐藏的,存储在内存的行存储表。MemSQL 会自动地将内存里的行存储里面的行分开存储到列存储里面。所有列存储表的数据,包括隐藏的行存储表,都是可查询的。

问题:是否可以将数据从内存中的行存储表移动到列存储表中?

可以,并且用一行简单的 SQL 语句就能搞定:

代码语言:sql
复制
INSERT INTO columnstore_table SELECT * FROM rowstore_table;

问题:在本次在线研讨会的演示里面用到的数据是在哪导入的?它们是不是预先生成的数据?

这次在线研讨会主要演示的是 Apache Kafka 的 MemSQL 管道。其中 Apache Kafka 集群会由 MemSQL 来主管。其中会有个 Python 程序来生成数据并将其写入到一个 Kafka 生产者里,后者会基于 adtech 这一订阅主题来发送消息。然后 MemSQL 管道会消费从 Kafka 中介者端点的这个订阅主题得到的数据。

问题:若要调整或更改数据模式,那会发生什么情况?

你可以用数据定义语言(DDL)ALTER TABLE ... 语句修改 MemSQL 里的表。在默认情况下,MemSQL 会支持在线执行 ALTER 语句。由于 MemSQL 管道也是用 DDL 编写的,因此你也能用 ALTER PIPELINE ... 语句来改造 MemSQL 管道。在通常情况下,处理模式更改的过程只用这些语句就足够了:

代码语言:sql
复制
STOP PIPELINE mypipeline;
ALTER TABLE mytable… ;
ALTER PIPELINE mypipeline…;
TEST PIPELINE mypipeline;
START PIPELINE mypipeline;

问题:能不能给段代码来说明 Apache Kafka 的 MemSQL 管道转换 JSON 消息的方式?

这里以下面这个 Kafka 消息中的一个简单的 JSON 为例:

代码语言:json
复制
{ 
    "id": 1,
    "item": "cherry",
    "quantity": 1
}

下面就是一个用 Python 写的转换脚本。它会从 JSON 里面提取 id 属性:

代码语言:python
复制
#!/usr/bin/env python 
import struct
import sys
import json
def transform_records():
    while True:
        byte_len = sys.stdin.read(8) 
        if len(byte_len) == 8:
            byte_len = struct.unpack("L", byte_len)[0] 
            result = sys.stdin.read(byte_len)
            yield result
        else: 
            return
for l in transform_records():
    parsed_json = json.loads(l)
    sys.stdout.write("%s\t%s\n" % (parsed_json["id"], l))

问题:如何使用 MemSQL 管道将复杂的、一对多的、有很多外键的那种记录保存起来?

MemSQL 6 不会执行外键约束,也不会为触发器提供支持。MemSQL 管道也仅支持将数据加载到单个表里面。不过,最近MemSQL 的工程师演示了 MemSQL 6.5 的 beta 版本,其中 MemSQL Pipeline 可以将数据加载到存储过程(stored procedure)里面!新版本的架构也能让存储过程中的条件逻辑来处理复杂场景,例如将数据导入到相关的表里面。请参阅回顾使用 MemSQL 来开发的那一夜这篇博客来了解更多关于使用 MemSQL 管道将流数据传输到存储过程的细节。

问题:Apache Kafka 中的数据常用二进制形式(比如 Apache Avro)来表示,对此 MemSQL 又如何支持由用户定义的解码?

MemSQL 管道支持导入 CSV 或 TSV 格式的数据。导入从 Kafka 的某个订阅主题拿到的 Avro 压缩数据的一种方法是用 Apache Spark 来创建一个数据管道。Spark 的流处理功能能让 Spark 直接消费 Kafka 的某个订阅主题下的消息。然后再用上 MemSQL Spark 连接器就可以解码二进制格式的数据并将数据直接保存到 MemSQL 中。不妨在我们的 MemSQL Spark 连接器指南中了解有关使用 Spark 的更多信息。

另一种方法是使用 Avro to JSON 转换器。转换之后的 Kafka 消息基本上是一个二进制 JSON 对象。在 MemSQL 管道中还能使用很多由 Linux 提供的能高效解析 JSON 的 API 来转换 JSON。MemSQL 管道还支持使用 jq —— 一种轻量级且灵活的命令行 JSON 处理器。不妨阅读 JSON 流处理和数据提取的未来以了解更多信息。

JSON 是 MemSQL 的一等公民。凭借内置的 JSON 函数,MemSQL 可以将 JSON 键值对解析为持久化存储的计算列(computing column)。MemSQL 也支持对计算列的索引。使用 MemSQL,你就可以使用标准的 SQL 语句来轻松地定位并解析 JSON 了。

问题:MemSQL 能不能自动处理背压问题?

在生产环境中的大型 Apache Kafka 集群能够以按每秒数百万条消息的高速度有序地传递消息。在这种数据数量多,传递速度快的情景下,许多由 API 驱动的用户处理起数据来都不能跟上这种速度,因此未处理的数据就会越积越多。这种现象就被称为背压(back-pressure)。

MemSQL 是一个新式的、实现了内存级别的优化的、能进行大规模并行处理的,无共享的实时数据库,MemSQL Pipeline 和 Apache Kafka 可以以极高的容量和极高的速率轻松地消费并导入消息。MemSQL Pipeline 在默认情况下会将从 Apache Kafka 的某个订阅主题那里获取的流数据导入到 MemSQL 的叶节点里。MemSQL 叶节点会包含单独的数据库分区。每个数据库分区都会把从 Kafka 流获得的数据存储到由数据指定的目标表中。

给定主题的 MemSQL 数据库分区数量与 Kafka 代理分区数量之间的并行性决定了最佳性能,因为这一并行性决定了总批量大小。

针对特定订阅主题的 MemSQL 数据库分区数量与 Kafka 中介者的分区数量之间的对应关系决定了最佳性能,因为这一对应关系会决定系统总共能处理多大批量的数据。MemSQL 会记录 Kafka 最早还有最近传递数据速度相对处理数据速度的偏移量,然后将结果记录在 information_schema.PIPELINES_BATCHES 这个表里。

问题:Apache Kafka 相比 Amazon S3 有什么优势?

Apache Kafka 是一个新型的分布式消息传递系统。Amazon S3 是用于存储和找回文件的一种云对象存储系统。MemSQL 管道为 Apache Kafka 和 Amazon S3 都提供了相应的管道提取器。对这两种提取器,数据导入的并行程度都由 MemSQL 中的数据库分区数决定。

就 S3 来说,MemSQL 中的数据库分区数等于每次在管道中处理的数据批次中的文件数。每个数据库分区会从 S3 存储桶中的文件夹里面提取特定的 S3 文件。这些文件是能被压缩的。现在已知的 Amazon S3 对 GET 请求速度的限制是从每秒 100 个请求开始算起的。至于 S3 的定价模型则是以数据输出量为基础的。请查看 13 亿行纽约出租车数据这篇博客来详细地了解 MemSQL S3 管道的规模。

Apache Kafka 集群能支持每秒数百万次的读写操作。当然,这也很依赖于集群所处理的数据包的大小。随着数据包大小的增加,消息传递的吞吐量也会下降。即便如此,作为一个分布式的系统,Apache Kafka 是可以根据实际需求来扩展的。

问题:如何获取 MemSQL 的 O'Reilly eBook 三部曲?

我们的 O'Reilly 电子书是能下载得到的。它们可以在 MemSQL O'Reilly Trilogy 这一网站里面找到,这三步曲的内容涵盖了预测分析还有人工智能等主题。

问题:我需要做些什么来上手 Apache Kafka 和 MemSQL?

MemSQL 群集可以用 Docker,Amazon Web Services 或 Microsoft Azure 来独立部署。不妨看看我们的快速上手教程来了解一下。

要想了解快速构建 MemSQL Pipeline 以及 Apache Kafka 环境的方法,可以回顾一下我们的快速上手 Kafka 管道 教程,也可以点击这个链接来看一看在 5 分钟内造个 Kafka 管道的视频直播录像。如果想要一个将 MemSQL、MemSQL Pipelines,还有 Apache Kafka 结合起来的一个 demo,那可以在 MemSQL Demo 这里登记并获取。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题:MemSQL 是 NoSQL 数据库还是关系数据库管理系统(RDBMS)?
  • 问题:MemSQL 的最低内存要求是多少?
  • 问题:将 JSON 加载到 MemSQL 里的方法是否跟 MongoDB 相似?
  • 问题:运行 MemSQL 和 Apache Kafka 需要什么样的基础设施?
  • 问题:MemSQL 中是否有处理从 Apache Kafka 获得的数据的消费者的概念?
  • 问题:使用 Apache Kafka 提取器的 MemSQL 管道是否仅能把数据导入到一个 “行存储” 表里面?
  • 问题:是否可以将数据从内存中的行存储表移动到列存储表中?
  • 问题:在本次在线研讨会的演示里面用到的数据是在哪导入的?它们是不是预先生成的数据?
  • 问题:若要调整或更改数据模式,那会发生什么情况?
  • 问题:如何使用 MemSQL 管道将复杂的、一对多的、有很多外键的那种记录保存起来?
  • 问题:Apache Kafka 中的数据常用二进制形式(比如 Apache Avro)来表示,对此 MemSQL 又如何支持由用户定义的解码?
  • 问题:MemSQL 能不能自动处理背压问题?
  • 问题:Apache Kafka 相比 Amazon S3 有什么优势?
  • 问题:如何获取 MemSQL 的 O'Reilly eBook 三部曲?
  • 问题:我需要做些什么来上手 Apache Kafka 和 MemSQL?
相关产品与服务
物联网
腾讯连连是腾讯云物联网全新商业品牌,它涵盖一站式物联网平台 IoT Explorer,连连官方微信小程序和配套的小程序 SDK、插件和开源 App,并整合腾讯云内优势产品能力,如大数据、音视频、AI等。同时,它打通腾讯系 C 端内容资源,如QQ音乐、微信支付、微保、微众银行、医疗健康等生态应用入口。提供覆盖“云-管-边-端”的物联网基础设施,面向“消费物联”和 “产业物联”两大赛道提供全方位的物联网产品和解决方案,助力企业高效实现数字化转型。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档