首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

这可能是北半球最暖的Kafka Connect数据管道教程:3

Single Message Transforms(SMT)是Kafka Connect的一项功能,支持单条消息的转换。例如聚合或连接数据流这样的复杂操作,都应差遣Kafka Streams完成——但简单的转换,Kafka Connect自身就能完成,且无需一行代码。

SMT作用于经过Kafka Connect的消息流;在流入消息到达Kafka前修改消息,在流出时或Kafka中留存消息尚未处理时修改消息(译注:适用于消息到达Kafka或输出到其他接收系统前,必须执行一些转换操作的场景,如过滤某些类型的事件或敏感信息)。

为连接器设置数据Key

下面是SMT实践范例。和

教程1

的例子一样,数据从JDBC Source 连接器进入,默认key是null(无论源数据库中定义了什么key)。将key应用于实践很有用,如支持Kafka分区策略的设计,或保障流出数据的逻辑key在目标存储(例如Elasticsearch)中被持久化。

让我们回忆下从Kafka JDBC连接器流向Kafka的消息是啥样的:

行首的那些null就是消息的key。这里,我们希望key包含c1列的整数值,因此我们将把两个SMT链接在一起。首先将消息的c1字段复制到消息key,然后提取该字段整数部分。SMT需要对每个连接器进行设置,因此,连接器配置要这样修改(或新建):

使配置变更生效:

./bin/confluent config jdbc_source_mysql_foobar_01 -d /tmp/kafka-connect-jdbc-source-with-smt.json

连接器将自动切换到新的配置,在MySQL的foobar表中插入新数据(这里使用直接管道而不是交互式会话):

echo "insert into foobar (c1,c2) values (100,'bar');"|mysql --user=rmoff --password=pw demo

在avro consumer控制台上会出现:

100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts": 1501799535000,"update_ts":1501799535000}

注意到key(行首)和c1的值相同吧,这是我们在配置文件transforms部分定义的,用于获取消息key的列。

有了这一特性,我们可以利用Kafka消息里的key来维护Elasticsearch源表中的静态视图。这是因为Elasticsearch的行(Elasticsearch术语是"document ")key以幂等方式定义。要做到这点,只需使用SMT确保Kafka消息中有key,并在Elasticsearch sink 连接器上设置"key.ignore": "false"(或直接删除配置行,因默认是false)。

注意,如果你遵循了教程1的示例,还可能会在Kafka topic上收到null key消息,可修改下topic并重启source连接器。如果试图从包含null key 消息的topic向Elasticsearch写入数据,并且连接器没有设置"key.ignore": "true"参数,那么sink任务将失败并报错:ConnectException: Key is used as document id and can not be null。

为数据血缘定义元数据

下面两种SMT会很有用。

第一个(InsertTopic)将topic名嵌入到消息中——如果你有着复杂的管道,又想保留消息及其源topic的血缘,非常有用;

第二个(InsertSourceDetails)将自定义静态信息作为字段添加到消息中。如下例中的源数据库名。或用途、环境名(开发/测试/生产)、分布式系统的地理位置,等等。

感谢Confluent Schema Registry的发明,使得这些新列自动添加到定义中,并在流向Elasticsearch的数据中反映——无需进行任何更改。实际上,在我做这些的时候,管道一直在运行,只是列被添加到Elasticseach mapping中罢了。很巧妙!

字段遮盖与黑白名单

SMT还提供了遮盖或完全删除字段值的功能,下面是字段遮盖的配置:

转换原始消息,遮盖字段c2:

{"c1":{"int":22},"c2":{"string":"foo"}

完全删除c2的内容:

{"c1":{"int":22},"c2":{"string":""}}

还能完全删除字段:

最后只剩:

{"c1":{"int":22}}

Kafka Connect SMT消息路由

通过几行额外的配置,流经Kafka Connect的数据就能得到修改和丰富。接下来的示例,将演示SMT把消息路由到不同topic,实现数据管道的灵活性。

基于正则的Topic路由

Kafka Topic通常用于定义数据存储中的目标对象名称(如Elasticsearch),另外还通过各种前后缀来表示数据中心、环境等。所以,你希望Elasticsearch索引(或其他)的名称是FOO而非DC1-TEST-FOO,那就必须在配置中手工重新映射。如果这事可以自动完成,何须手工操作呢?

下面是更新后,Elasticsearch连接器和SMT的配置:

可以确定,Elasticsearch上的数据现在路由到了foobar索引:

注意,使用Elasticsearch sink可能会遇到ConnectException: Cannot create mapping——请前往GitHub详细了解。另外,作为变通方案,可以显式创建Elasticsearch索引:

curl -XPUT 'http://localhost:9200/foobar'

基于时间戳的Topic路由

除了根据正则重新映射topic外,SMT还可以根据时间戳将消息路由到topic。

有两个配置参数,topic.format和timestamp.format。以便用户指定topic和时间戳格式,例如:

多种路由转换可混合使用:

总结

而本文中,对Kafka Connect进行了更深入的探讨,了解了Single Message Transforms是如何在消息流经Kafka Connect时操作它们的。遮盖数据、添加血缘以及路由topic,SMT都能完成。对了,这里甚至没有提SMT是开放API,因此也是完全可扩展的!有关何时使用SMT的更多细节,请移步confluent.io。

关于译者: 十年网络安全行业,八年Java EE研发、架构、管理,三年开源对象数据库社区运营,一年大数据、机器学习产线管理,知乎、微信@rosenjiang。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190202G0PRCD00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券