操作背景
您可以将日志主题的数据投递到腾讯云 CKafka,然后用于您的实时流计算场景。如果您没有购买腾讯云 CKafka 实例,可以考虑使用日志服务(Cloud Log Service,CLS)自带的 Kafka 协议消费功能。
前提条件
已开通腾讯云消息队列 (CKafka)。
注意:
如果您的 CKafka 开启了 ACL,请注意 CKafka 的版本限制:CKafka1.X版本,需在1.1.1及以上;CKafka2.X版本,需在2.4.2及以上,其它版本开启 ACL 均会导致投递失败,可通过关闭 ACL 解决投递失败的问题。
当前仅支持实时日志投递,不支持历史日志的投递。
确保当前操作账号拥有开通投递到 CKafka 的权限。请参见 CLS 访问策略模板。
操作步骤
您需对该Topic的配置做如下修改:
CleanUp.policy:选择 delete。该参数需设置为 delete,否则会投递失败。
max.message.bytes:设置为 ≥ 8MB。该参数 default 值是1MB,当投递 CKafka 的单条 message(一组日志数据)的大小超过1MB时,无法写入 CKafka Topic,导致投递失败。
2. 前往 日志服务控制台,如下图,有两种方式,进入投递任务管理页面。
在左侧导航栏中,单击投递任务,选择地域、日志集和日志主题。
在左侧导航栏中,单击日志主题,选择需要配置投递到 CKafka 任务的日志主题,进入日志主题管理页面。单击投递到 CKafka 页签。
3. 单击右侧的编辑,开启投递到 CKafka 开关,选择相应的 CKafka 实例以及对应的 Topic,开始投递。您可以选择以原始内容或者 JSON 投递日志。
3.1 以原始内容投递
原始内容投递配置项说明如下:
配置项 | 解释说明 | 规则 | 是否必填 |
CKafka 实例 | 与当前日志主题同地域的 CKafka Topic 作为投递目标 。 | 列表选择 | 必填 |
投递数据格式 | 选项【原始内容】,投递用户的原始日志。 | 列表选择 | 必填 |
数据压缩格式 | 支持 Snappy\\LZ4。 | 列表选择 | 必填 |
投递日志预览 | 预览您投递的日志数据。 | - | - |
3.2 以 JSON 投递
JSON 投递配置项说明如下:
配置项 | 解释说明 | 规则 | 是否必填 |
CKafka 实例 | 与当前日志主题同地域的 CKafka Topic 作为投递目标 | 列表选择 | 必填 |
投递数据格式 | 选项【JSON】,以 JSON 的数据格式投递日志
| 列表选择 | 必填 |
JSON 格式的转义/不转义 | 转义,将 JSON 第一层节点的值转为 String,如果您的第一层节点的值是 Struct,在下游入库或者计算时,需要提前将该Struct 转为 String,可以选这个选项。示例: 日志原文:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}} 投递到 CKafka :{"a":"aa","b":"{\\"b1\\":\\"b1b1\\", \\"c1\\":\\"c1c1\\"}"} 不转义,不对您的 JSON 结构和层级做修改,日志格式和采集侧保持一致。示例: 日志原文:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}} 投递到 CKafka:{"a":"aa", "b":{"b1":"b1b1", "c1":"c1c1"}} | 列表选择 | 必填 |
__TAG__元信息 | 将__TAG__元信息平铺或者不平铺,请按照您的实际业务场景选择。示例: __TAG__元信息:{"__TAG__":{"fieldA":200, "fieldB":"text"}} 平铺:{"__TAG__.fieldA":200,"__TAG__.fieldB":"text"} 不平铺:{"__TAG__":{"fieldA":200, "fieldB":"text"}} | | |
数据压缩格式 | 支持 Snappy\\ LZ4 | 列表选择 | 必填 |
投递日志预览 | 预览您投递的日志数据。 | - | - |
4. 单击确定,启动投递到 CKafka 。
注意
常见问题
提示没有读写 CKafka Topic 的权限,怎么办?
如果您直接使用 API 接口投递数据到 CKafka,可能会存在读写 CKafka Topic 的权限问题。因为,如果您在控制台使用该功能,系统会引导您完成相关授权;如果您直接调用 API 投递,则需要手动授权。具体的排查和解决方案请参见 投递任务角色授权。