存储 CKafka 消息到 COS

最近更新时间:2020-06-01 09:31:26

操作场景

消息队列 CKafka 支持用户存储消息的能力,您可以将消息存储到 COS 中,并下载分析。

说明:

该白名单功能即将下线,您可以使用新版的 消息转储,将消息转储在 COS 中 。

操作步骤

  1. 登录 消息队列 CKafka 控制台
  2. 在实例列表页,单击目标实例 ID,进入topic 管理标签页。
  3. 在 topic 管理标签页,单击操作列的【存储消息到 COS】。
  4. 单击启用图标,开启开启存储消息到 COS 功能。
    • 时间粒度:根据消息量的大小,选取汇聚消息的时间间隔,时间间隔为5 - 60分钟不等。
    • 存放 Bucket:针对不同的 topic,选取相应的 COS 中 Bucket,则请求消息会自动在 Bucket 下创建 instance ID + topic ID 为名称的文件夹进行存储。选取完成后,单击 Bucket 地址可以直接跳转到文件下载页面。

如果您还未创建对象存储的 Bucket,请在 新建 Bucket 后选取相应的存储位置。

后置条件

开启【存储消息到 COS】功能后,CKafka 服务会在【访问管理】>【角色】中增加一个【cosCkafka_QCSRole】角色用来授权消息存储到 COS 服务。

  • 如果您不再需要此项功能,请在 CKafka 控制台 >【实例列表】>【topic 管理】中,单击操作列的【存储消息到 COS】,禁用此功能并删除其角色。

  • 如果您需要一直使用此功能,但误删除了【cosCkafka_QCSRole】角色,将会影响消息存储到 COS,请及时重新创建角色。

具体创建步骤如下:

  1. 主账号登录 访问管理控制台,在左侧导航栏中选择【角色】>【新建角色】>【腾讯云账号】,填写其他账号 ID:91000000031。
  2. 搜索策略:QcloudCOSAccessForCkafkaRole,选中后单击【下一步】。
  3. 填写角色名称和描述。
    角色名称:cosCkafka_QCSRole
    角色描述: 消息服务(CKafka)对对象存储服务(COS)的跨业务访问权限
  4. 单击【完成】,创建的角色将显示在角色列表中。
  5. 在 CKafka 控制台中,观察 Consumer Group 数据消费是否正常。
注意:

子账号创建的实例或者 topic 使用消息存储到 COS,主帐号需要授权子帐号可以传递指定角色(Pass Role)给腾讯云 CKafka 服务(如果没有此步骤,则子帐号无法通过 CKafka 服务访问 COS 资源)。

您可以创建自定义策略并授权子账号,PassRole 策略语法如下:

{
    "version": "2.0",
    "statement": [
        {
            "effect": "allow",
            "action": "cam:PassRole",
            "resource": "qcs::cam::uin/${roleOwnerUin}:roleName/cosCkafka_QCSRole"
        }
    ]
}

产品限制和费用计算

  • 该功能适用于少量数据备份到 COS 的场景,不保证数据能100%成功同步到 COS 中。
  • 当前 COS 文件聚合粒度为5 - 60分钟不等,允许用户指定。
  • 数据的传输会有一定的延迟。
  • 当前仅支持和 CKafka 实例同个地域的 COS 进行消息存储,为保证延时,不支持跨地域存储。
  • object 权限用 COS 默认的私有读写权限。
  • 转储服务会占用一个 Group ID。
  • 文件名为存放的 timestamp,存放路径为 instance ID/topic ID
  • 文件内容是 CKafka 消息里的 value 用 utf-8 String 序列化拼接而成,暂不支持二进制的数据格式。
  • 当前 CKafka 消息到 COS 服务免费,COS 存储可享受一定 免费额度,提供50GB免费存储空间。如您的消息量级较大,请及时清理数据。
  • 开启转 COS 的操作人必须对目标 COS Bucket 具备写权限。
  • 开启转发前,积压 Ckafka 消息不会被转存到 COS。
  • 实例到期后转发 COS 也会中断,实例续费后会自动恢复转发。

旧版 COS 转储能力迁移说明

为提供更好的服务并全面支持更多 Ckafka 相关转储能力,我们将在近日对旧版白名单 COS 转储功能进行下线处理,并正式推出全新商业化 COS 转储能力。请及时做好相关迁移工作。

新版优势

新版正式商业化 COS 转储能力有以下优势:

  • 灵活转储:新版 COS 转储功能提供更加灵活的转储配置,创建完成后可在云函数控制台自定义较为特殊的转储逻辑,如设置特殊换行符,日志过滤等。亦可快捷方便的使用默认配置。
  • 消费能力:新版转储消费能力相较旧版 COS 转储消费能力提升50%,是自建单节点 logstash 消费能力的15倍。
  • 文件存储:新版 Ckafka 转储 COS 的单个文件最大500MB,如超过该数值,会自动分包上传。相较于之前旧版20MB分包,新版单包大小聚合能力提升25倍。
  • 功能特性:新版 Ckafka 转储 COS 新增 "起始位置"特性,用户可自行选择历史消息的处理方式,更加方便的使用 to COS 转储功能。
  • 更多支持:新版转储能力新增"通用转储",可支持并自定义 Elasticsearch、MySQL、PostgreSQL 等通用场景的转储能力。

注意事项

关于旧版 COS 转储能力迁移,您需要关注以下几点:

  • 存储路径:为保证转储文件的可读性,新版 COS 转储存储路径将修改为 instance-id/topic-id/date/timestamp 。

    说明:

    如相关路径如无法满足业务需要,请创建完成后在云函数控制台修改 CkafkaToCosConsumer 函数,参考下文迁移步骤。

  • 时间聚合:新版 Ckafka 转储 COS 为了保证转储服务的可用性防止消息堆积,故支持1 - 15分钟粒度。

  • 费用相关:新版转储功能基于云函数 SCF 服务提供。SCF 为用户提供了一定量 免费额度 ,超额部分产生的收费,请以 SCF 服务的 计费规则 为准。

更多说明参考 消息转储文档

迁移步骤

  1. 创建新版消息转储。
    登录 消息队列 CKafka 控制台,在目标实例的topic 管理页,单击操作列的【消息转储】。详细操作可参考 Ckafka 转储对象存储(COS)
  2. 设置时间粒度,选择与之前相同的 Bucket 信息。
    说明:

    新版 COS 转储新增“起始位置”,可根据迁移需求自行选择 Topic 消费位置。

  3. 如新版相关存储路径无法满足业务需要,可单击页面右侧的函数管理链接。

    在云函数控制台中,切换到函数代码标签页,单击管理内的 CkafkaToCosConsumer 函数进行修改。

    将 CkafkaToCosConsumer 函数第49行 -56行内容替换为如下代码,单击【保存】即可与旧版存储路径保持一致。
     # Generating file name. 生成写入文件名
     def object_key_generate(self):
         logger.info("start to generate key name")
         file_name = str(int(round(time.time())))
         dir_name = "{}/{}".format(str(self.kafka_instance_id), str(self.topic_id))
         object_key = '{}/{}'.format(dir_name, file_name)
         return object_key
  4. 为避免重复存储数据,请在 CKafka 控制台中关闭旧版 COS 转储功能。
目录