前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用云函数快速批量处理COS里面的日志

用云函数快速批量处理COS里面的日志

原创
作者头像
黄希彤
修改2022-03-10 15:13:20
5490
修改2022-03-10 15:13:20
举报
文章被收录于专栏:黄希彤的专栏黄希彤的专栏

本来CLS日志服务可以直接触发云函数来快速处理的,不过这样触发次数就有点多了,比如说被处理的日志本来就是云函数生成的,那么函数触发次数就直接要翻番,如果日志不是需要及时处理的话,可以让它在CLS里面累计上几分钟,然后用定时器触发一个云函数,通过日志服务查询接口 SearchLog 来实现批量处理。

但是这样做有个坑,如果这几分钟的log条数超过100条的话,我们可以把Limit放到最大1000,如果超过1000条的话,按照文档的说明,应该通过传递Context的方式来实现翻页,最多可以翻出来10000条。然而,文档上说的操作根本是无法实现的,因为如果你翻页查询的时候传递了Query参数,那就会被认为你要进行一次新的查询,然后给你返回第一页,即使Query参数和上一次查询一模一样也没有用。如果你不传Query参数只传Context参数呢,那你只会收到一个缺少Query参数的错误,因为Query是必选参数。

就算你用SQL查询和SQL翻页的方式(通过给SQL传递offset和limit),如果这几分钟的log超过了一万条,你还是没辙。所以更好的方式可能是每分钟通过 日志下载接口 来下载指定时间段的日志处理,或者把日志自动投递到COS,用COS的创建文件事件做触发器来触发云函数执行,然后把日志文件下载过来批量处理。

当日志非常多的时候,通过日志下载接口需要自行处理分包的问题,用投递COS的方式处理的话分包也是自动处理的,代码逻辑会更简单一些。但是CLS投递到COS的最短周期是5分钟,但是实际上一个日志从生成到打包、上传,触发处理,可能要经过接近10分钟,如果需要更及时的处理数据的话,只要确保文件不会大到需要分包,用定时器来触发可能更合适。

这样一个日志文件可能会非常大,如果整个文件读到内存中处理的话需要给云函数申请足够多的内存。更好的方式是用流的方式来处理。因为COS的sdk可以把文件读成流,日志下载接口生成的日志文件也可以用request读成流。这样即使文件非常大,也可以通过流处理的方式进行实时解压(因为日志打包的时候会被强制自动压缩),并对流进行实时解析,实现高效的日志处理:

代码语言:javascript
复制
'use strict';
const zlib = require("zlib"),
	readLine = require('readline'),
	COS = require('cos-nodejs-sdk-v5'),
	cos = new COS({
		SecretId: process.env.SECRET_ID,
		SecretKey: process.env.SECRET_KEY,
		KeepAlive: false
	}),
	tencentcloud = require("/opt/node_modules/tencentcloud-sdk-nodejs"), //node14之前的版本的内置SDK不支持cls,需要下载新的sdk用层的方式覆盖进去并用这个方式引用
	ClsClient = tencentcloud.cls.v20201016.Client,
	clientConfig = {
		credential: {
			secretId: process.env.SECRET_ID,
			secretKey: process.env.SECRET_KEY,
		},
		region: process.env.TENCENTCLOUD_REGION,
		profile: {
			httpProfile: {
				endpoint: "cls.tencentcloudapi.com",
			},
		},
	},
	downloadCycle = 60000, //下载周期
	request = require('request');
exports.main_handler = (event, context, callback) => {
	let gunzip = zlib.createGunzip();
	let jsonCount = 0,
		invalidLines = 0;
	let rl = readLine.createInterface(gunzip);
	rl.on('line', function(line) {
		if (/^\s*\{.*\}\s*$/.test(line)) {
			jsonCount++; //收到的行数据似乎是JSON数据
		} else {
			invalidLines++; //收到的一行似乎不是JSON数据
		}
	})
	rl.on('close', () => {
		//因为是demo,这里没有等待所有的可能的并发流都处理完再回调
		//实际使用的时候应该Promise.all或者用异步方式逐个流处理完再回调。
		callback(null, "有效行数: " + jsonCount + " ,无效行数:" + invalidLines)
	})
	if ("Type" in event && event.Type == "Timer") { //定时器触发
		let client = new ClsClient(clientConfig);
		let params = {
			"TopicId": process.env.TopicId,
			"Query": "SCF_Namespace:\"" + process.env.SCF_NAMESPACE + "\"",
			"Count": 10000000, //最大一次下载1000万条。如果一个周期内有可能超过的话需要考虑分包或者该用cos投递
			"From": (Math.floor(Date.now() / downloadCycle - 1) * downloadCycle),
			"To": (Math.floor(Date.now() / downloadCycle) * downloadCycle) //触发点之前的一个完整的时间周期
		};
		//生成日志打包任务,定时器下次触发的时候下载处理
		client.CreateExport(params).then(res => {
			if ("ExportId" in res) {
				console.log("导出任务发起成功,生成的文件等待定时器下次触发的时候处理");
				client.DescribeExports({
					"TopicId": process.env.TopicId
				}).then(res => {
					if ("Exports" in res && res.Exports instanceof Array) {
						for (let i = 0; i < res.Exports.length; i++) {
							let e = res.Exports[i];
							if (e.Status == "Completed") {
								request(e.CosPath).pipe(gunzip);
								client.DeleteExport({"ExportId": e.ExportId});//如果要更保险一点,放到readline的close后面删除更好
							}
						}
					}
				})
			} else {
				console.log("导出任务发起失败,需要告警并走重试逻辑");
			}
		})
	} else if ("Records" in event && event.Records instanceof Array) { //COS触发
		let records = event.Records;
		for (let i = 0; i < records.length; i++) {
			let c = records[i].cos;
			console.log("下载并分析日志文件 " + c.cosObject.key)
			cos.getObject({
				Bucket: c.cosBucket.name + "-" + c.cosBucket.appid,
				Region: c.cosBucket.cosRegion,
				Key: "/" + c.cosObject.key.split("/").slice(3).join("/"),
				Output: gunzip
				//            Output: "/tmp/test.gz"
			}, function(err, data) {
				if (err) console.log("err:" + JSON.stringify(err))
			});
		}
	} else {
		return "未知触发器"
	}
};

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档