原文作者:Seth Luersen
原文地址:https://dzone.com/articles/creating-an-iot-kafka-pipeline-in-under-five-minutes
在此前的 MemSQL 直播里,我们讨论了现代企业能如何轻松地采用新的数据管理工具来管理数据的规模、增长以及复杂性。然后我们演示了基于 Apache Kafka 和 MemSQL 来构建实时的、交互式的数据管道的方法。这些数据管道能为数百万用户采集、处理,并输出海量的数据。
在直播期间,我们还分享了这些方法:
我们收到了直播观众发来的其他问题。在此我们也想分享一下这些问题的答案。
MemSQL 是一个新式的、实现了内存级别的优化的、能进行大规模并行处理的,无共享的实时数据库。MemSQL 将数据存储在表里面,并支持了标准的 SQL 数据类型。地理空间和 JSON 数据类型是 MemSQL 中的一等公民。MemSQL 能用来存储和查询那些结构化、半结构化或非结构化的数据。
MemSQL 是一个由一个或多个节点组成的分布式系统。你可以在我们的文档中找到更多和系统和硬件要求有关的信息。
MongoDB 会在底层以一种基于二进制编码的格式(称为 BSON)来表示 JSON 文档。BSON 也是 MongoDB 的消息传输格式(wire format)。与此同时,JSON 是 MemSQL 的一等公民。MemSQL 也会用 JSON 标准的数据类型来验证 JSON。MemSQL 会在底层将验证过的 JSON 存储为文本。如果要在 MemSQL 里面保存 BSON 格式的数据,那么相应的可用 SQL 数据类型有 VARBINARY 或其变体:LONGBLOB、MEDIUMBLOB、BLOB 还有 TINYBLOB。
MemSQL 跟 Apache Kafka 一样是个分布式系统,由一个或多个节点组成集群来运行。因此,怎么说也要一个独立的 Apache Kafka 生产者以及中介者,以及由一个主汇聚器和一个叶节点组成的独立 MemSQL 集群来作为这个系统的基础设施。
MemSQL Pipeline 在默认情况下会将从 Apache Kafka 的某个订阅主题那里获取的流数据导入到 MemSQL 的叶节点里。MemSQL 叶节点会包含单独的数据库分区。每个数据库分区都会把从 Kafka 流获得的数据存储到由数据指定的目标表中。针对特定订阅主题的 MemSQL 数据库分区数量与 Kafka 中介者的分区数量之间的对应关系决定了最佳的性能。
Apache Kafka 采用了更传统的,并且为大多数消息传递系统所共享的一种设计方式。在这一方式里,数据会被生产者推送给中介者,接着消费者会从中介者处获得数据。在这种基于推送的系统中,当消费者处理数据的速度一时跟不上生产者产生速度的速度时,消费者也能慢慢赶上。一个接入到 Apache Kafka 的 MemSQL 管道会为 Kafka 用上一个管道提取器。这个提取器就是一个 Kafka 的消费者。
MemSQL Pipeline 可以将数据并行地大量导入到分布式的表中。在 MemSQL 中,表可以是分布式的,也可以是非分布式的(即引用表)。表的存储类型有两种:内存级别的行存储以及列存储。所有列存储表都有一个隐藏的,存储在内存的行存储表。MemSQL 会自动地将内存里的行存储里面的行分开存储到列存储里面。所有列存储表的数据,包括隐藏的行存储表,都是可查询的。
可以,并且用一行简单的 SQL 语句就能搞定:
INSERT INTO columnstore_table SELECT * FROM rowstore_table;
这次在线研讨会主要演示的是 Apache Kafka 的 MemSQL 管道。其中 Apache Kafka 集群会由 MemSQL 来主管。其中会有个 Python 程序来生成数据并将其写入到一个 Kafka 生产者里,后者会基于 adtech 这一订阅主题来发送消息。然后 MemSQL 管道会消费从 Kafka 中介者端点的这个订阅主题得到的数据。
你可以用数据定义语言(DDL)ALTER TABLE ...
语句修改 MemSQL 里的表。在默认情况下,MemSQL 会支持在线执行 ALTER
语句。由于 MemSQL 管道也是用 DDL 编写的,因此你也能用 ALTER PIPELINE ...
语句来改造 MemSQL 管道。在通常情况下,处理模式更改的过程只用这些语句就足够了:
STOP PIPELINE mypipeline;
ALTER TABLE mytable… ;
ALTER PIPELINE mypipeline…;
TEST PIPELINE mypipeline;
START PIPELINE mypipeline;
问题:能不能给段代码来说明 Apache Kafka 的 MemSQL 管道转换 JSON 消息的方式?
这里以下面这个 Kafka 消息中的一个简单的 JSON 为例:
{
"id": 1,
"item": "cherry",
"quantity": 1
}
下面就是一个用 Python 写的转换脚本。它会从 JSON 里面提取 id 属性:
#!/usr/bin/env python
import struct
import sys
import json
def transform_records():
while True:
byte_len = sys.stdin.read(8)
if len(byte_len) == 8:
byte_len = struct.unpack("L", byte_len)[0]
result = sys.stdin.read(byte_len)
yield result
else:
return
for l in transform_records():
parsed_json = json.loads(l)
sys.stdout.write("%s\t%s\n" % (parsed_json["id"], l))
MemSQL 6 不会执行外键约束,也不会为触发器提供支持。MemSQL 管道也仅支持将数据加载到单个表里面。不过,最近MemSQL 的工程师演示了 MemSQL 6.5 的 beta 版本,其中 MemSQL Pipeline 可以将数据加载到存储过程(stored procedure)里面!新版本的架构也能让存储过程中的条件逻辑来处理复杂场景,例如将数据导入到相关的表里面。请参阅回顾使用 MemSQL 来开发的那一夜这篇博客来了解更多关于使用 MemSQL 管道将流数据传输到存储过程的细节。
MemSQL 管道支持导入 CSV 或 TSV 格式的数据。导入从 Kafka 的某个订阅主题拿到的 Avro 压缩数据的一种方法是用 Apache Spark 来创建一个数据管道。Spark 的流处理功能能让 Spark 直接消费 Kafka 的某个订阅主题下的消息。然后再用上 MemSQL Spark 连接器就可以解码二进制格式的数据并将数据直接保存到 MemSQL 中。不妨在我们的 MemSQL Spark 连接器指南中了解有关使用 Spark 的更多信息。
另一种方法是使用 Avro to JSON 转换器。转换之后的 Kafka 消息基本上是一个二进制 JSON 对象。在 MemSQL 管道中还能使用很多由 Linux 提供的能高效解析 JSON 的 API 来转换 JSON。MemSQL 管道还支持使用 jq —— 一种轻量级且灵活的命令行 JSON 处理器。不妨阅读 JSON 流处理和数据提取的未来以了解更多信息。
JSON 是 MemSQL 的一等公民。凭借内置的 JSON 函数,MemSQL 可以将 JSON 键值对解析为持久化存储的计算列(computing column)。MemSQL 也支持对计算列的索引。使用 MemSQL,你就可以使用标准的 SQL 语句来轻松地定位并解析 JSON 了。
在生产环境中的大型 Apache Kafka 集群能够以按每秒数百万条消息的高速度有序地传递消息。在这种数据数量多,传递速度快的情景下,许多由 API 驱动的用户处理起数据来都不能跟上这种速度,因此未处理的数据就会越积越多。这种现象就被称为背压(back-pressure)。
MemSQL 是一个新式的、实现了内存级别的优化的、能进行大规模并行处理的,无共享的实时数据库,MemSQL Pipeline 和 Apache Kafka 可以以极高的容量和极高的速率轻松地消费并导入消息。MemSQL Pipeline 在默认情况下会将从 Apache Kafka 的某个订阅主题那里获取的流数据导入到 MemSQL 的叶节点里。MemSQL 叶节点会包含单独的数据库分区。每个数据库分区都会把从 Kafka 流获得的数据存储到由数据指定的目标表中。
给定主题的 MemSQL 数据库分区数量与 Kafka 代理分区数量之间的并行性决定了最佳性能,因为这一并行性决定了总批量大小。
针对特定订阅主题的 MemSQL 数据库分区数量与 Kafka 中介者的分区数量之间的对应关系决定了最佳性能,因为这一对应关系会决定系统总共能处理多大批量的数据。MemSQL 会记录 Kafka 最早还有最近传递数据速度相对处理数据速度的偏移量,然后将结果记录在 information_schema.PIPELINES_BATCHES
这个表里。
Apache Kafka 是一个新型的分布式消息传递系统。Amazon S3 是用于存储和找回文件的一种云对象存储系统。MemSQL 管道为 Apache Kafka 和 Amazon S3 都提供了相应的管道提取器。对这两种提取器,数据导入的并行程度都由 MemSQL 中的数据库分区数决定。
就 S3 来说,MemSQL 中的数据库分区数等于每次在管道中处理的数据批次中的文件数。每个数据库分区会从 S3 存储桶中的文件夹里面提取特定的 S3 文件。这些文件是能被压缩的。现在已知的 Amazon S3 对 GET 请求速度的限制是从每秒 100 个请求开始算起的。至于 S3 的定价模型则是以数据输出量为基础的。请查看 13 亿行纽约出租车数据这篇博客来详细地了解 MemSQL S3 管道的规模。
Apache Kafka 集群能支持每秒数百万次的读写操作。当然,这也很依赖于集群所处理的数据包的大小。随着数据包大小的增加,消息传递的吞吐量也会下降。即便如此,作为一个分布式的系统,Apache Kafka 是可以根据实际需求来扩展的。
我们的 O'Reilly 电子书是能下载得到的。它们可以在 MemSQL O'Reilly Trilogy 这一网站里面找到,这三步曲的内容涵盖了预测分析还有人工智能等主题。
MemSQL 群集可以用 Docker,Amazon Web Services 或 Microsoft Azure 来独立部署。不妨看看我们的快速上手教程来了解一下。
要想了解快速构建 MemSQL Pipeline 以及 Apache Kafka 环境的方法,可以回顾一下我们的快速上手 Kafka 管道 教程,也可以点击这个链接来看一看在 5 分钟内造个 Kafka 管道的视频直播录像。如果想要一个将 MemSQL、MemSQL Pipelines,还有 Apache Kafka 结合起来的一个 demo,那可以在 MemSQL Demo 这里登记并获取。