CKafka 触发器

最近更新时间:2018-08-28 11:42:37

用户可以编写 SCF 函数来处理 CKafka 中收取到的消息。SCF 后台模块可以作为消费者消费 CKafka 中的消息,并将消息传递给 SCF 函数。

CKafka 触发器具有以下特点:

  • Pull 模型:SCF 的后台模块作为消费者,连接 CKafka 实例并消费消息。在后台模块获取到消息后,会将消息封装到数据结构中并调用指定的函数,将消息数据传递给云函数。
  • 异步调用: CKafka 触发器始终使用异步调用类型来调用函数,结果不会返回给调用方。有关调用类型的更多信息,请参阅 调用类型

CKafka 触发器属性

  • CKafka 实例:配置连接的 CKafka 实例,仅支持选择同地域下的实例。
  • Topic:支持在 CKafka 实例中已经创建的 Topic。
  • 最大批量消息数:在拉取并批量投递给当前云函数时的最大批量数。根据消息大小,写入速度,每次触发云函数并投递时的消息,数量不一定均能达到最大消息数,而是一个变动值,消息个数在 1 ~ 最大消息数之间。

CKafka 消费及消息传递方法

由于 CKafka 消息无主动推送能力,需要消费方通过拉取方式,拉取到消息并进行消费。因此,在配置 CKafka 触发器后,SCF 云函数后台会通过启动 CKafka 消费模块,作为消费者,在 CKafka 中创立独立的消费组进行消费。

SCF 云函数后台的消费模块,在消费到消息后,会根据一定的超时时间、累积消息数量及大小、最大批量消息数等信息,组合为事件结构并传递给云函数。在这个过程中,每次组合的消息数量不一定相同,每个事件结构内的消息个数在 1 ~ 配置的最大消息个数 之间。如果配置的最大消息数过大,有可能出现事件结构内的消息个数始终不会达到最大消息数的情况。

在云函数中获取到事件内容后,可以通过循环处理的方式,确保每一条消息都得到处理,而不应假定每次传递的消息个数均是恒定的。

CKafka 触发器的事件消息结构

在指定的 CKafka Topic 接收到消息时,云函数的后台消费者模块会消费到消息,并将消息组装为类似以下的 JSON 格式事件,触发绑定的函数并将数据内容作为入参传递给函数。

{
  "Records": [
    {
      "Ckafka": {
        "topic": "test-topic",
        "partition":1,
        "offset":36,
        "msgKey": "None",
        "msgBody": "Hello from Ckafka!"
      }
    },
    {
      "Ckafka": {
        "topic": "test-topic",
        "partition":1,
        "offset":37,
        "msgKey": "None",
        "msgBody": "Hello from Ckafka again!"
      }
    }
  ]
}

数据结构内容详细说明如下:

结构名 内容
Records 列表结构,可能有多条消息合并在列表中
Ckafka 标识事件来源为 CKafka
topic 消息来源 Topic
partition 消息来源的分区 ID
offset 消费偏移编号
msgKey 消息 key
msgBody 消息内容