Single Message Transforms(SMT)是Kafka Connect的一项功能,支持单条消息的转换。例如聚合或连接数据流这样的复杂操作,都应差遣Kafka Streams完成——但简单的转换,Kafka Connect自身就能完成,且无需一行代码。
SMT作用于经过Kafka Connect的消息流;在流入消息到达Kafka前修改消息,在流出时或Kafka中留存消息尚未处理时修改消息(译注:适用于消息到达Kafka或输出到其他接收系统前,必须执行一些转换操作的场景,如过滤某些类型的事件或敏感信息)。
为连接器设置数据Key
下面是SMT实践范例。和
教程1
的例子一样,数据从JDBC Source 连接器进入,默认key是null(无论源数据库中定义了什么key)。将key应用于实践很有用,如支持Kafka分区策略的设计,或保障流出数据的逻辑key在目标存储(例如Elasticsearch)中被持久化。
让我们回忆下从Kafka JDBC连接器流向Kafka的消息是啥样的:
行首的那些null就是消息的key。这里,我们希望key包含c1列的整数值,因此我们将把两个SMT链接在一起。首先将消息的c1字段复制到消息key,然后提取该字段整数部分。SMT需要对每个连接器进行设置,因此,连接器配置要这样修改(或新建):
使配置变更生效:
./bin/confluent config jdbc_source_mysql_foobar_01 -d /tmp/kafka-connect-jdbc-source-with-smt.json
连接器将自动切换到新的配置,在MySQL的foobar表中插入新数据(这里使用直接管道而不是交互式会话):
echo "insert into foobar (c1,c2) values (100,'bar');"|mysql --user=rmoff --password=pw demo
在avro consumer控制台上会出现:
100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts": 1501799535000,"update_ts":1501799535000}
注意到key(行首)和c1的值相同吧,这是我们在配置文件transforms部分定义的,用于获取消息key的列。
有了这一特性,我们可以利用Kafka消息里的key来维护Elasticsearch源表中的静态视图。这是因为Elasticsearch的行(Elasticsearch术语是"document ")key以幂等方式定义。要做到这点,只需使用SMT确保Kafka消息中有key,并在Elasticsearch sink 连接器上设置"key.ignore": "false"(或直接删除配置行,因默认是false)。
注意,如果你遵循了教程1的示例,还可能会在Kafka topic上收到null key消息,可修改下topic并重启source连接器。如果试图从包含null key 消息的topic向Elasticsearch写入数据,并且连接器没有设置"key.ignore": "true"参数,那么sink任务将失败并报错:ConnectException: Key is used as document id and can not be null。
为数据血缘定义元数据
下面两种SMT会很有用。
第一个(InsertTopic)将topic名嵌入到消息中——如果你有着复杂的管道,又想保留消息及其源topic的血缘,非常有用;
第二个(InsertSourceDetails)将自定义静态信息作为字段添加到消息中。如下例中的源数据库名。或用途、环境名(开发/测试/生产)、分布式系统的地理位置,等等。
感谢Confluent Schema Registry的发明,使得这些新列自动添加到定义中,并在流向Elasticsearch的数据中反映——无需进行任何更改。实际上,在我做这些的时候,管道一直在运行,只是列被添加到Elasticseach mapping中罢了。很巧妙!
字段遮盖与黑白名单
SMT还提供了遮盖或完全删除字段值的功能,下面是字段遮盖的配置:
转换原始消息,遮盖字段c2:
{"c1":{"int":22},"c2":{"string":"foo"}
完全删除c2的内容:
{"c1":{"int":22},"c2":{"string":""}}
还能完全删除字段:
最后只剩:
{"c1":{"int":22}}
Kafka Connect SMT消息路由
通过几行额外的配置,流经Kafka Connect的数据就能得到修改和丰富。接下来的示例,将演示SMT把消息路由到不同topic,实现数据管道的灵活性。
基于正则的Topic路由
Kafka Topic通常用于定义数据存储中的目标对象名称(如Elasticsearch),另外还通过各种前后缀来表示数据中心、环境等。所以,你希望Elasticsearch索引(或其他)的名称是FOO而非DC1-TEST-FOO,那就必须在配置中手工重新映射。如果这事可以自动完成,何须手工操作呢?
下面是更新后,Elasticsearch连接器和SMT的配置:
可以确定,Elasticsearch上的数据现在路由到了foobar索引:
注意,使用Elasticsearch sink可能会遇到ConnectException: Cannot create mapping——请前往GitHub详细了解。另外,作为变通方案,可以显式创建Elasticsearch索引:
curl -XPUT 'http://localhost:9200/foobar'
基于时间戳的Topic路由
除了根据正则重新映射topic外,SMT还可以根据时间戳将消息路由到topic。
有两个配置参数,topic.format和timestamp.format。以便用户指定topic和时间戳格式,例如:
多种路由转换可混合使用:
总结
而本文中,对Kafka Connect进行了更深入的探讨,了解了Single Message Transforms是如何在消息流经Kafka Connect时操作它们的。遮盖数据、添加血缘以及路由topic,SMT都能完成。对了,这里甚至没有提SMT是开放API,因此也是完全可扩展的!有关何时使用SMT的更多细节,请移步confluent.io。
关于译者: 十年网络安全行业,八年Java EE研发、架构、管理,三年开源对象数据库社区运营,一年大数据、机器学习产线管理,知乎、微信@rosenjiang。
领取专属 10元无门槛券
私享最新 技术干货