有奖捉虫:云通信与企业服务文档专题,速来> HOT
用户可以编写云函数来处理 CKafka 中收取到的消息。云函数后台模块可以作为消费者消费 CKafka 中的消息,并将消息传递给云函数。
CKafka 触发器具有以下特点:
Pull 模型:云函数的后台模块作为消费者,连接 CKafka 实例并消费消息。在后台模块获取到消息后,会将消息封装到数据结构中并调用指定的函数,将消息数据传递给云函数。
同步调用: CKafka 触发器使用同步调用类型来调用函数。有关调用类型的更多信息,请参见 调用类型
说明
对于运行错误(含用户代码错误和 Runtime 错误),CKafka 触发器会按照您配置的重试次数进行重试。默认重试10000次。
对于系统错误,CKafka 触发器会采用指数退避的方式持续重试,直至成功为止。

CKafka 触发器属性

CKafka 实例:配置连接的 CKafka 实例,仅支持选择同地域下的实例。
Topic:支持在 CKafka 实例中已经创建的 Topic(仅支持未创建 ACL 策略的 Topic)。
最大批量消息数:在拉取并批量投递给当前云函数时的最大消息数,目前支持最高配置为10000。结合消息大小、写入速度等因素影响,每次触发云函数并投递的消息数量不一定能达到最大值,而是处在1 - 最大消息数之间的一个变动值。
起始位置:触发器消费消息的起始位置,默认从最新位置开始消费。支持最新、最开始、按指定时间点三种配置。
重试次数:函数发生运行错误(含用户代码错误和 Runtime 错误)时的最大重试次数。

CKafka 消费及消息传递

由于 CKafka 消息无主动推送能力,需要消费方通过拉取的方式,拉取到消息并进行消费。因此,在配置 CKafka 触发器后,云函数后台会通过启动 CKafka 消费模块,作为消费者,并在 CKafka 中创立独立的消费组进行消费。
云函数后台的消费模块在消费到消息后,会根据一定的超时时间累积消息数量大小最大批量消息数等信息,组合为事件结构并发起函数调用(同步调用)。相关限制说明如下:
超时时间:目前云函数后台的消费模块的超时时间为60秒,避免时延过长才进行消费。例如,Ckafka Topic 的消息写入很少,消费模块在60秒内没有凑够最大批量消息数的消息,则依然会发起函数调用。
同步调用的事件大小限制:6MB,详情请参见 限制说明。如果 Ckafka Topic 的消息很大,例如单条消息就已经达到6MB,那么由于同步调用的6MB限制,所以传递给云函数的事件结构中只会有一条消息,而不是用户配置的最大消息个数。
最大批量消息数:同 CKafka 触发器属性,由用户设置,目前支持最高配置为10000。
云函数后台的消费模块会循环这个过程,且会保证消息消费的顺序性,即前一批消息消费完(同步调用),再进行下一批消息的消费。
说明
在这个过程中,每次组合的消息数量不一定相同,即每个事件结构内的消息个数在1 - 配置的最大消息个数之间。如果配置的最大消息数过大,有可能出现事件结构内的消息个数始终不会达到最大消息数的情况。
在云函数中获取到事件内容后,可选择循环处理的方式,确保每一条消息都得到处理,而不应假定每次传递的消息个数均是恒定的。

CKafka 触发器的事件消息结构

在指定的 CKafka Topic 接收到消息时,云函数的后台消费者模块会消费到消息,并将消息组装为类似以下的 JSON 格式事件,触发绑定的函数并将数据内容作为入参传递给函数。
{
"Records": [
{
"Ckafka": {
"topic": "test-topic",
"Partition":1,
"offset":36,
"msgKey": "None",
"msgBody": "Hello from Ckafka!"
}
},
{
"Ckafka": {
"topic": "test-topic",
"Partition":1,
"offset":37,
"msgKey": "None",
"msgBody": "Hello from Ckafka again!"
}
}
]
}
数据结构内容详细说明如下:
结构名
内容
Records
列表结构,可能有多条消息合并在列表中
Ckafka
标识事件来源为 CKafka
topic
消息来源 Topic
partition
消息来源的分区 ID
offset
消费偏移编号
msgKey
消息 key
msgBody
消息内容

常见问题

CKafka 消息堆积了很多该如何处理?

在您配置 CKafka 触发器后,云函数后台会通过启动 CKafka 消费模块作为消费者,在 CKafka 中创立独立的消费组进行消费,且消费模块的数量等于 Ckafka Topic 的分区(partition)数量。
如果堆积了很多 Ckafka 消息,则需要提升消费能力。提升消费能力有以下方法:
增加 Ckafka Topic 的分区数。云函数的消费能力正比于分区数量,云函数后台的 CKafka 消费模块会自动匹配 Ckafka Topic 分区数,即可以通过增加分区来提升消费能力。
优化云函数的运行时间。云函数的运行时间越短,消费能力就越强。若云函数的运行时间变长(例如,云函数内需要写 DB,DB 的响应变慢),则消费速度就会下降。